2016-07-21 3 views
0

以下のコードはうまくいきますが、トランザクションが大量に流入すると、cassandraに書き込むのに時間がかかります。sparkストリーミング - より良い並列化のためにforeachPartitionとsaveToCassandraを使用

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 
val parsedStream = stream.map(_._2).map(EmpParser.parse(_)).cache() 

以下のコードは、cassandraに順次書き込みを行い、1つのエグゼキュータで実行します。

parsedStream.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept")) 

しかし、foreachPartitionを実行して、cassandraへの書き込みを並列化したかったのです。しかし、foreachPartitionでsaveToCassandraオプションが表示されません。

parsedStream.foreachRDD{rdd => 
    rdd.foreachPartition { partition => 
     partition.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept")) 
    } 
} 

これを実現する方法はありますか。

+0

'saveToCassandra'は' RDD'/'DStream'レベルで定義されていますが、' partition'は単純なスケーラ 'Iterator'ですので定義されていません。 –

+0

OK。すべての私のエグゼクティブが並行して実行したときに、どのようにしてカサンドラに書き込むことができましたか? – JKPEAK

+0

することができます 'parseStream.repartition(num).saveToCassandra' – Knight71

答えて

0

すでにダイレクトストリームを使用しているため、並列性を高める方法は2つあります。

  1. カフカパーティションの数を増やします。ダイレクトストリームを使用する場合、Sparkは自動的にKafkaと同じ数のパーティションを作成します。ただし、セットアップによっては、これが実現できない場合があります。

  2. スパークスrepartitionを使用してください。ほとんどの場合、出力ではなく入力上でrepartitionが良いです。

    val num: Int = ? // Number of partitition 
    val parsedStream = stream.repartition(num).map(_._2).map(EmpParser.parse(_)) 
    parsedStream.saveToCassandra(...) 
    

あなたは一度だけparsedStreamを使用する必要がある場合は、それをキャッシュする必要はありません。

+0

私は弾性検索にも同じデータを書く必要があります。現在、私は以下のようにelasticに保存しています。 ElasticSearchに保存する並列化されたアプローチをお勧めします。 – JKPEAK

+0

私のコードがcassandraとelasticsearchの両方に書き込んでいるとき、処理時間は3秒に増えています(データサイズによって異なります)。しかし、私がcassandraだけに書き込もうとすると、処理時間は0.3〜0.7秒になります。 – JKPEAK

関連する問題