HiveテーブルからデルタをKafkaに公開しようとしています。問題のテーブルは、単一パーティション、244 MBの単一ブロックファイルです。私たちのクラスタは256Mのブロックサイズに設定されているので、ここでは1つのファイルの最大値に達しています。Spark Dataframe leftanti結合に失敗する
テーブルが更新されるたびに、コピーがアーカイブされ、デルタプロセスが実行されます。
以下の関数では、異なるジョインを分離して内部ジョインが許容範囲内で実行されることを確認しましたが(約3分)、2つのアンチジョインデータフレームは完了しません - スパークジョブでは、以下のエラーを引き続き見ています。
このような結合のデータフレームサイズには実際的な制限がありますか?
private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset)
extends BasePublisher(spark, sink, source) with Serializable {
val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object
def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = {
val current = spark.read.parquet(dataLocation)
val previous = spark.read.parquet(archiveLocation)
val inserts = current.join(previous, keys, "leftanti")
val updates = current.join(previous, keys).where(current.col(deltaColumn) =!= previous.col(deltaColumn))
val deletes = previous.join(current, keys, "leftanti")
val upsertCounter = spark.sparkContext.longAccumulator("upserts")
val deleteCounter = spark.sparkContext.longAccumulator("deletes")
logInfo("sending inserts to kafka")
sink.sendDeltasToKafka(inserts, "U", upsertCounter)
logInfo("sending updates to kafka")
sink.sendDeltasToKafka(updates, "U", upsertCounter)
logInfo("sending deletes to kafka")
sink.sendDeltasToKafka(deletes, "D", deleteCounter)
(upsertCounter.value, deleteCounter.value)
}
}
私たちが見ているエラーは、ドライバが執行との接触を失っていることを示していると思われます。エグゼキュータのメモリーを24Gまで増やし、ネットワークのタイムアウトを900秒に、ハートビートの間隔を120秒まで延長しました。
後のログに17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at ...
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at ...
:我々は(成功せず)を操作されている
17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
at ...
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver.
設定スイッチは、私が考えるに失敗したオプションは、私のドライバリソースを増やすことである--executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s