私はCassandra 2.2.5の列とメタストアでFiloDB 0.4を使用しており、Spark Streaming 1.6.1 + Jobserver 0.6.2を使用してデータを挿入しようとしています。FiloDB + Spark Streaming Data Loss
messages.foreachRDD(parseAndSaveToFiloDb)
private static Function<JavaPairRDD<String, String>, Void> parseAndSaveToFiloDb = initialRdd -> {
final List<RowWithSchema> parsedMessages = parseMessages(initialRdd.collect());
final JavaRDD<Row> rdd = javaSparkContext.parallelize(createRows(parsedMessages));
final DataFrame dataFrame = sqlContext.createDataFrame(rdd, generateSchema(rawMessages);
dataFrame.write().format("filodb.spark")
.option("database", keyspace)
.option("dataset", dataset)
.option("row_keys", rowKeys)
.option("partition_keys", partitionKeys)
.option("segment_key", segmentKey)
.mode(saveMode).save();
return null;
};
セグメント鍵は「:文字列が/ 0」、行キーがCONSTは、すべてのためのものであるカラムに設定され、各行およびパーティションキーに対する固有のものであるカラムに設定されている私は、データを挿入するために、次のコードを使用し行。言い換えれば、私のテストデータセットはすべて、単一パーティション上の単一セグメントになります。私が1つの1ノードスパークを使用している場合、すべて正常に動作し、すべてのデータが挿入されますが、同時に2つの別個の1ノードスパーク(クラスタではない)を実行すると、間隔として数秒間に1つずつメッセージを送信してもデータの% 各行に対してdataFrame.write()が実行されているので、この行の後に問題が発生することを確認しました。 セグメントキーを各行に固有の列に設定すると、すべてのデータがCassandra/FiloDBに届きます。
2つの別々のスパークを使用するシナリオの解決策を提案してください。