2017-11-27 5 views
0

HiveテーブルからデルタをKafkaに公開しようとしています。問題のテーブルは、単一パーティション、244 MBの単一ブロックファイルです。私たちのクラスタは256Mのブロックサイズに設定されているので、ここでは1つのファイルの最大値に達しています。Spark Dataframe leftanti結合に失敗する

テーブルが更新されるたびに、コピーがアーカイブされ、デルタプロセスが実行されます。

以下の関数では、異なるジョインを分離して内部ジョインが許容範囲内で実行されることを確認しましたが(約3分)、2つのアンチジョインデータフレームは完了しません - スパークジョブでは、以下のエラーを引き続き見ています。

このような結合のデータフレームサイズには実際的な制限がありますか?

private class DeltaColumnPublisher(spark: SparkSession, sink: KafkaSink, source: RegisteredDataset) 
    extends BasePublisher(spark, sink, source) with Serializable { 

    val deltaColumn = "hadoop_update_ts" // TODO: move to the dataset object 

    def publishDeltaRun(dataLocation: String, archiveLocation: String): (Long, Long) = { 

     val current = spark.read.parquet(dataLocation) 
     val previous = spark.read.parquet(archiveLocation) 

     val inserts = current.join(previous, keys, "leftanti") 
     val updates = current.join(previous, keys).where(current.col(deltaColumn) =!= previous.col(deltaColumn)) 
     val deletes = previous.join(current, keys, "leftanti") 

     val upsertCounter = spark.sparkContext.longAccumulator("upserts") 
     val deleteCounter = spark.sparkContext.longAccumulator("deletes") 

     logInfo("sending inserts to kafka") 
     sink.sendDeltasToKafka(inserts, "U", upsertCounter) 

     logInfo("sending updates to kafka") 
     sink.sendDeltasToKafka(updates, "U", upsertCounter) 

     logInfo("sending deletes to kafka") 
     sink.sendDeltasToKafka(deletes, "D", deleteCounter) 

     (upsertCounter.value, deleteCounter.value) 
    } 
    } 

私たちが見ているエラーは、ドライバが執行との接触を失っていることを示していると思われます。エグゼキュータのメモリーを24Gまで増やし、ネットワークのタイムアウトを900秒に、ハートビートの間隔を120秒まで延長しました。

後のログに
17/11/27 20:36:18 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@596e3aa6,BlockManagerId(1, server, 46292, None))] in 2 attempts 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.executor.heartbeatInterval 
    at ... 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
    at ... 

:我々は(成功せず)を操作されている

17/11/27 20:42:37 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@25d1bd5f,BlockManagerId(1, server, 46292, None))] in 3 attempts 
org.apache.spark.SparkException: Exception thrown in awaitResult 
    at ... 
Caused by: java.lang.RuntimeException: org.apache.spark.SparkException: Could not find HeartbeatReceiver. 

設定スイッチは、私が考えるに失敗したオプションは、私のドライバリソースを増やすことである--executor-memory 24G --conf spark.network.timeout=900s --conf spark.executor.heartbeatInterval=120s

答えて

0

です。私は--driver-memory 4G--driver-cores 2を追加し、私の仕事は約9分で完了しました。

これらの2つのファイルの内部結合(または組み込みのexcept()メソッドを使用)は、エグゼキュータにメモリ負担をかけるようです。キー列の1つを分割するとメモリの負荷が軽減されますが、シャッフルが多くなるため全体の時間が長くなります。

これらの2つのファイル間の左と反対の結合を行うには、より多くのドライバリソースが必要です。期待していませんでした。

関連する問題