2017-06-16 5 views
0

sparkのDataset関数のほとんどは行単位の操作です。しかし、私はSparkで実行するMLタスクの実行を配布したいと思います。ほとんどのMLタスクは自然に行の関数ではなくテーブルの関数である演算です。 (私はMLLibを見てきました。その方法はあまりにも限定されており、多くの場合、単一のコアに収まる可能性のある多くのコアに分散操作を行うことによって、実行がより遅くなります)。リモートノードでテーブル操作を実行できますか? (対行操作)

MLアルゴリズムは単一行ではなく行の集まりを処理するということが重要なので、ノード上のメモリにテーブルを実現したいと考えています。 (私はそれがコアに収まると約束します)。これどうやってするの?

機能的には、私がやってみたい:

def mlsubtask(table, arg2, arg3): 
    data = table.collect() 
    ... 

sc = SparkContext(...) 
sqlctx = SQLContext(sc) 
... 
df = sqlctx.sql("SELECT ...") 
results = sc.parallelize([(df,arg2,arg3),(df,arg2,arg3),(df,arg2,arg3)]).map(mlsubtask).collect() 

このような実行を実行することができた場合:

sc = SparkContext(...) 
sqlctx = SQLContext(sc) 
... 
df = sqlctx.sql("SELECT ...") 
df = df.collect() 
results = sc.parallelize([(df,arg2,arg3),(df,arg2,arg3),(df,arg2,arg3)]).map(mlsubtask).collect() 

を...しかし、これはクライアントにデータをもたらし、ここで再シリアライズされ、非常に非効率的です。単一のタスクのために

答えて

0

def mlsubtask(iter_rows): 
    data_table = list(iter_rows) # Or other way of bringing into memory. 
    ... 

df.repartition(1).mapPartitions(mlsubtask) 
関連する問題