1
私は入力要求を処理するためにスパークストリーミングジョブを使用します。スパークストリーミング処理の並列処理
私のspark入力はファイル名を取ってデータをダウンロードし、変更を加えてデータをダウンストリームに送ります。
現在、1つのファイルを処理するのに2分かかります。
これらのファイル要求は独立した操作であり、並列処理が可能です。
現在、私がnetcatサーバーから入力した場合、各要求が最初に処理され、次の要求が処理されます。私はこの操作を並行させたい。
@timing
def sleep_func(data):
print("start file processing")
time.sleep(60)
print("end file processing")
return data
rdd = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
rdd = rdd.map(sleep_func)
final_rects = rdd.pprint()
私はこれに基づいて各エグゼキュータで処理される複数のsockettextstreamを作成しようとしています。
https://spark.apache.org/docs/2.0.2/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
rdd = [ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) for _ in range(5)]
これらの個々のストリームを個別に処理する方法はわかりません。