以下のコードはうまくいきますが、トランザクションが大量に流入すると、cassandraに書き込むのに時間がかかります。sparkストリーミング - より良い並列化のためにforeachPartitionとsaveToCassandraを使用
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val parsedStream = stream.map(_._2).map(EmpParser.parse(_)).cache()
以下のコードは、cassandraに順次書き込みを行い、1つのエグゼキュータで実行します。
parsedStream.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept"))
しかし、foreachPartitionを実行して、cassandraへの書き込みを並列化したかったのです。しかし、foreachPartitionでsaveToCassandraオプションが表示されません。
parsedStream.foreachRDD{rdd =>
rdd.foreachPartition { partition =>
partition.saveToCassandra("test", "ct_table", SomeColumns("emp_id","emp_name","emp_sal","emp_dept"))
}
}
これを実現する方法はありますか。
'saveToCassandra'は' RDD'/'DStream'レベルで定義されていますが、' partition'は単純なスケーラ 'Iterator'ですので定義されていません。 –
OK。すべての私のエグゼクティブが並行して実行したときに、どのようにしてカサンドラに書き込むことができましたか? – JKPEAK
することができます 'parseStream.repartition(num).saveToCassandra' – Knight71