2017-01-24 12 views
0

私はCassandra 2.2.5の列とメタストアでFiloDB 0.4を使用しており、Spark Streaming 1.6.1 + Jobserver 0.6.2を使用してデータを挿入しようとしています。FiloDB + Spark Streaming Data Loss

messages.foreachRDD(parseAndSaveToFiloDb) 

private static Function<JavaPairRDD<String, String>, Void> parseAndSaveToFiloDb = initialRdd -> { 
     final List<RowWithSchema> parsedMessages = parseMessages(initialRdd.collect()); 
     final JavaRDD<Row> rdd = javaSparkContext.parallelize(createRows(parsedMessages)); 
     final DataFrame dataFrame = sqlContext.createDataFrame(rdd, generateSchema(rawMessages); 

     dataFrame.write().format("filodb.spark") 
       .option("database", keyspace) 
       .option("dataset", dataset) 
       .option("row_keys", rowKeys) 
       .option("partition_keys", partitionKeys) 
       .option("segment_key", segmentKey) 
       .mode(saveMode).save(); 
     return null; 
    }; 

セグメント鍵は「:文字列が/ 0」、行キーがCONSTは、すべてのためのものであるカラムに設定され、各行およびパーティションキーに対する固有のものであるカラムに設定されている私は、データを挿入するために、次のコードを使用し行。言い換えれば、私のテストデータセットはすべて、単一パーティション上の単一セグメントになります。私が1つの1ノードスパークを使用している場合、すべて正常に動作し、すべてのデータが挿入されますが、同時に2つの別個の1ノードスパーク(クラスタではない)を実行すると、間隔として数秒間に1つずつメッセージを送信してもデータの% 各行に対してdataFrame.write()が実行されているので、この行の後に問題が発生することを確認しました。 セグメントキーを各行に固有の列に設定すると、すべてのデータがCassandra/FiloDBに届きます。

2つの別々のスパークを使用するシナリオの解決策を提案してください。

答えて

1

@psyduckこれは、各パーティションのデータが一度に1つのノード(0.4バージョンの場合)でのみ取り込まれる可能性が高いためです。したがって、現在のバージョンに固執するには、データを複数のパーティションに分割し、各ワーカーが1つのパーティションしか取得しないようにする必要があります。上記を達成する最も簡単な方法は、パーティションキーでデータを並べ替えることです。

- master(Spark 2.x/Scala 2.11)またはspark1.6ブランチ(spark 1.6/Scala 2.10)では、最新バージョンに移行することを強くお勧めします。最新バージョンには、問題を解決する0.4以外の多くの変更があります。

  • データを正しい取り込みノードに自動的にルーティングするために、Akkaクラスタを使用します。この場合、同じモデルを使用すると、データはすべて正しいノードに移動し、データ損失は発生しません。
  • TimeUUIDベースのchunkIDなので、複数のワーカー(スプリットブレインの場合)あなたが任意のセグメントキーを定義する必要はありませんので、データの損失は、読み取りと

は、私たちのメーリングリストに手を差し伸べる気軽に書き込みの両方のためのより効率的、

  • 新しい「セグメント以下」データモデル回避されますhttps://groups.google.com/forum/#!forum/filodb-discuss

  • 関連する問題