2017-08-16 5 views
3

私はthis questionからスパークノードが効果的に「直接通信」することを見てきましたが、私は理論や実装についてはあまり気にしません。 Hereページの下部にある "### Encryption"セクションに、セキュリティのためにいくつかのSSLプロトコルを使用するようにSparkを設定できることが示されています。通信のHTTP(s)。私の質問は事実上2つの部分です:Sparkノードがどのプロトコルを使って通信するのですか?この転送のためにデータはどのようにフォーマットされていますか?シャッフル中にスパークノードはどのように通信しますか?

答えて

3

スパークは、エグゼキュータプロセス間で通信するためにRPC(Netty)を使用します。 NettyRpcEndpointRefクラスを調べることで、実際の実装を確認できます。

データをシャッフルするには、データブロックの提供を担当するBlockManagerから始めます。エグゼキュータプロセスごとに1つあります。内部的には、BlockStoreShuffleReaderSerializerManagerを使用して異なるエグゼキュータからの読み取りを管理します。このマネージャはspark.serializerプロパティによって定義される実際のシリアライザ、保持:

val serializer = instantiateClassFromConf[Serializer](
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer") 
logDebug(s"Using serializer: ${serializer.getClass}") 

ブロックを読み取るBlockManager試み、その基礎となる構成からシリアライザを使用します。設定に応じて、KryoSerializerまたはJavaSerializerのいずれかになります。

ボトムライン、シャッフルされたデータの読み書き用Sparkはユーザ定義シリアライザを使用します。


タスクのシリアル化では、これは少し異なります。

スパークはclosureSerializerという変数を使用します。この変数のデフォルトはJavaSerializerInstanceで、Javaのシリアル化を意味します。

val taskBinaryBytes: Array[Byte] = stage match { 
    case stage: ShuffleMapStage => 
    JavaUtils.bufferToArray(
     closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) 
    case stage: ResultStage => 
     JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) 
} 

連載され、各エグゼキュータに送信されます実際のオブジェクトがTaskDescriptionと呼ばれている:あなたはDAGScheduler.submitMissingTasksメソッドの内部でこれを見ることができます

def encode(taskDescription: TaskDescription): ByteBuffer = { 
    val bytesOut = new ByteBufferOutputStream(4096) 
    val dataOut = new DataOutputStream(bytesOut) 

    dataOut.writeLong(taskDescription.taskId) 
    dataOut.writeInt(taskDescription.attemptNumber) 
    dataOut.writeUTF(taskDescription.executorId) 
    dataOut.writeUTF(taskDescription.name) 
    dataOut.writeInt(taskDescription.index) 

    // Write files. 
    serializeStringLongMap(taskDescription.addedFiles, dataOut) 

    // Write jars. 
    serializeStringLongMap(taskDescription.addedJars, dataOut) 

    // Write properties. 
    dataOut.writeInt(taskDescription.properties.size()) 
    taskDescription.properties.asScala.foreach { case (key, value) => 
    dataOut.writeUTF(key) 
    // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values 
    val bytes = value.getBytes(StandardCharsets.UTF_8) 
    dataOut.writeInt(bytes.length) 
    dataOut.write(bytes) 
    } 

    // Write the task. The task is already serialized, so write it directly to the byte buffer. 
    Utils.writeByteBuffer(taskDescription.serializedTask, bytesOut) 

    dataOut.close() 
    bytesOut.close() 
    bytesOut.toByteBuffer 
} 

そしてCoarseGrainedSchedulerBackend.launchTasks方法からRPC経由で送信されます:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask))) 

私がこれまで示したことは、打ち上げタスクについて語ったことです。データをシャッフルするために、Sparkは異なるエグゼキュータからの読み取りを管理するBlockStoreShuffleReaderを保持しています。

+1

ありがとうございました!より有益で直接的な答えを求めることができませんでした。 – Chance

+0

速いフォローアップとして、私は可能な限りファイルシステムから直接入力データを読み込むことを理解していますが、ドライバープログラムが最初にデータをロードして手動で作業者に渡すと、記載されている?ありがとう。 – Chance

+1

ドライバとワーカーはRPC経由で通信します。ドライバがデータ自体を読み込んでそれをワーカーに配布することは非常に珍しいことに注意してください。 –

関連する問題