私はthis questionからスパークノードが効果的に「直接通信」することを見てきましたが、私は理論や実装についてはあまり気にしません。 Hereページの下部にある "### Encryption"セクションに、セキュリティのためにいくつかのSSLプロトコルを使用するようにSparkを設定できることが示されています。通信のHTTP(s)。私の質問は事実上2つの部分です:Sparkノードがどのプロトコルを使って通信するのですか?この転送のためにデータはどのようにフォーマットされていますか?シャッフル中にスパークノードはどのように通信しますか?
答えて
スパークは、エグゼキュータプロセス間で通信するためにRPC(Netty)を使用します。 NettyRpcEndpointRef
クラスを調べることで、実際の実装を確認できます。
データをシャッフルするには、データブロックの提供を担当するBlockManager
から始めます。エグゼキュータプロセスごとに1つあります。内部的には、BlockStoreShuffleReader
はSerializerManager
を使用して異なるエグゼキュータからの読み取りを管理します。このマネージャはspark.serializer
プロパティによって定義される実際のシリアライザ、保持:
val serializer = instantiateClassFromConf[Serializer](
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
ブロックを読み取るBlockManager
試み、その基礎となる構成からシリアライザを使用します。設定に応じて、KryoSerializer
またはJavaSerializer
のいずれかになります。
ボトムライン、シャッフルされたデータの読み書き用Sparkはユーザ定義シリアライザを使用します。
タスクのシリアル化では、これは少し異なります。
スパークはclosureSerializer
という変数を使用します。この変数のデフォルトはJavaSerializerInstance
で、Javaのシリアル化を意味します。
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
連載され、各エグゼキュータに送信されます実際のオブジェクトがTaskDescription
と呼ばれている:あなたはDAGScheduler.submitMissingTasks
メソッドの内部でこれを見ることができます
def encode(taskDescription: TaskDescription): ByteBuffer = {
val bytesOut = new ByteBufferOutputStream(4096)
val dataOut = new DataOutputStream(bytesOut)
dataOut.writeLong(taskDescription.taskId)
dataOut.writeInt(taskDescription.attemptNumber)
dataOut.writeUTF(taskDescription.executorId)
dataOut.writeUTF(taskDescription.name)
dataOut.writeInt(taskDescription.index)
// Write files.
serializeStringLongMap(taskDescription.addedFiles, dataOut)
// Write jars.
serializeStringLongMap(taskDescription.addedJars, dataOut)
// Write properties.
dataOut.writeInt(taskDescription.properties.size())
taskDescription.properties.asScala.foreach { case (key, value) =>
dataOut.writeUTF(key)
// SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
val bytes = value.getBytes(StandardCharsets.UTF_8)
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
}
// Write the task. The task is already serialized, so write it directly to the byte buffer.
Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut)
dataOut.close()
bytesOut.close()
bytesOut.toByteBuffer
}
そしてCoarseGrainedSchedulerBackend.launchTasks
方法からRPC経由で送信されます:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
私がこれまで示したことは、打ち上げタスクについて語ったことです。データをシャッフルするために、Sparkは異なるエグゼキュータからの読み取りを管理するBlockStoreShuffleReader
を保持しています。
- 1. オンラインブラックジャックゲームはどのようにカードをシャッフルしますか?
- 2. CloudfrontはELBとどのように通信しますか?
- 3. Kubernetesモジュールはetcdとどのように通信しますか?
- 4. libvirtはkvmとどのように通信しますか?
- 5. チャットサーバーはサーバーファーム内でどのように通信しますか?
- 6. Firebaseはどのようにクライアントと通信しますか?
- 7. HTML通知からメインページとどのように通信しますか?
- 8. データノードはHadoopでどのように相互に通信していますか?
- 9. MS Access:フォームはどのように値を互いに通信しますか?
- 10. ルートハンドラとemberデータからのクエリはどのように通信しますか?
- 11. ディレクティブはコントローラによってどのように通信されますか?
- 12. プログラムはどのように通信するのですか?マイクロカーネルモノリシックカーネルエクソカーネル
- 13. どのようにHTTPSと通信できますか? (Basic Stuff)
- 14. GUIツールキットは、Linux上のウィンドウマネージャとどのように通信しますか?
- 15. 同じノード上のポッドはどのように通信しますか?
- 16. WCFサービスはどのように通信チャネルですか?
- 17. 相互通信マイクロサービス - どのように?
- 18. テーブルビューコントローラは詳細ビューとどのように通信しますか?
- 19. プログラミング言語/ライブラリはハードウェアとどのように通信しますか?
- 20. ビューとコントローラはMVCでどのように通信しますか?
- 21. U-BootはLinuxカーネルとどのように通信しますか?
- 22. ArduinoはどのようにFirebaseを通じてデータを受信しますか?
- 23. ブラウザは電子メールプログラムとどのように通信しますか?
- 24. どのように2つのアンドロイドアプリケーションを通信するには?
- 25. フラグメントが1つのアクティビティにあるかのように、フラグメント間でどのように通信しますか?
- 26. Facebookのような企業はどのように通知メールをユーザーに送信しますか?
- 27. 私が作成したHTTPLISTENERとどのように通信しますか?
- 28. グラフィックスドライバはCPUからGPUにプログラムによってどのように通信しますか?
- 29. どのように通知をユーザーからユーザーに送信できますか?
- 30. ブラウザ内で2つのタブはどのように通信できますか?
ありがとうございました!より有益で直接的な答えを求めることができませんでした。 – Chance
速いフォローアップとして、私は可能な限りファイルシステムから直接入力データを読み込むことを理解していますが、ドライバープログラムが最初にデータをロードして手動で作業者に渡すと、記載されている?ありがとう。 – Chance
ドライバとワーカーはRPC経由で通信します。ドライバがデータ自体を読み込んでそれをワーカーに配布することは非常に珍しいことに注意してください。 –