2017-11-01 18 views
1

私は入力要求を処理するためにスパークストリーミングジョブを使用します。スパークストリーミング処理の並列処理

私のspark入力はファイル名を取ってデータをダウンロードし、変更を加えてデータをダウンストリームに送ります。

現在、1つのファイルを処理するのに2分かかります。

これらのファイル要求は独立した操作であり、並列処理が可能です。

現在、私がnetcatサーバーから入力した場合、各要求が最初に処理され、次の要求が処理されます。私はこの操作を並行させたい。

@timing 
def sleep_func(data): 
    print("start file processing")    
    time.sleep(60)  
    print("end file processing")     
    return data 

rdd = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))  
rdd = rdd.map(sleep_func)  
final_rects = rdd.pprint() 

私はこれに基づいて各エグゼキュータで処理される複数のsockettextstreamを作成しようとしています。

https://spark.apache.org/docs/2.0.2/streaming-programming-guide.html#level-of-parallelism-in-data-receiving 

rdd = [ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) for _ in range(5)] 

これらの個々のストリームを個別に処理する方法はわかりません。

答えて