私はJavaを使用しています。カフカからレコードのファイルパスにアクセスし、データセットを作成する方法は?
kafka-messagesでファイルパスを受信しています。このファイルをスパークRDDにロードして処理し、HDFSにダンプする必要があります。
カフカメッセージからファイルパスを取得できました。そして、私はこのファイルの上にデータセット/ RDDを作成したいと思います。
カフカメッセージデータセットでマップ機能を実行できません。ワーカーでsparkContextを使用できないため、NPEでエラーが発生します。
私はkafkaメッセージデータセットでforeachを実行できません。メッセージでエラーが発生しました。 "ストリーミングソースを使用したクエリは、writeStream.start();で実行する必要があります。
kafkaメッセージデータセットから受け取ったデータを収集できません。メッセージでエラーが発生しました。「ストリーミングソースを使用したクエリは、writeStream.start();;で実行する必要があります。
これは非常に一般的な使用例でなければならず、多くの設定で実行されている必要があります。
カフカメッセージで受信したパスからファイルをRDDとしてロードするにはどうすればよいですか?
以下のコード:
SparkSession spark = SparkSession.builder()
.appName("MyKafkaStreamReader")
.master("local[4]")
.config("spark.executor.memory", "2g")
.getOrCreate();
// Create DataSet representing the stream of input lines from kafka
Dataset<String> kafkaValues = spark.readStream()
.format("kafka")
.option("spark.streaming.receiver.writeAheadLog.enable", true)
.option("kafka.bootstrap.servers", Configuration.KAFKA_BROKER)
.option("subscribe", Configuration.KAFKA_TOPIC)
.option("fetchOffset.retryIntervalMs", 100)
.option("checkpointLocation", "file:///tmp/checkpoint")
.load()
.selectExpr("CAST(value AS STRING)").as(Encoders.STRING());
Dataset<String> messages = kafkaValues.map(x -> {
ObjectMapper mapper = new ObjectMapper();
String m = mapper.readValue(x.getBytes(), String.class);
return m;
}, Encoders.STRING());
// ====================
// TEST 1 : FAILS
// ====================
// CODE TRYING TO execute MAP on the received RDD
// This fails with a Null pointer exception because "spark" is not available on worker node
/*
Dataset<String> statusRDD = messages.map(message -> {
// BELOW STATEMENT FAILS
Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message);
Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates();
dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation());
return getHdfsLocation();
}, Encoders.STRING());
StreamingQuery query2 = statusRDD.writeStream().outputMode("append").format("console").start();
*/
// ====================
// TEST 2 : FAILS
// ====================
// CODE BELOW FAILS WITH EXCEPTION
// "Queries with streaming sources must be executed with writeStream.start();;"
// Hence, processing the deduplication on the worker side using
/*
JavaRDD<String> messageRDD = messages.toJavaRDD();
messageRDD.foreach(message -> {
Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message);
Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates();
dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation());
});
*/
// ====================
// TEST 3 : FAILS
// ====================
// CODE TRYING TO COLLECT ALSO FAILS WITH EXCEPTION
// "Queries with streaming sources must be executed with writeStream.start();;"
// List<String> mess = messages.collectAsList();
私は、ファイル・パスを作成し、ファイルの上にRDDSを作成読むことができる方法上の任意のアイデア?
はまた、あなたのコードを投稿することができますあなたはどうしていますか? – nat
私の試用のためのコードが追加されました。 –
私はあなたが構造化されたストリーミングを使用してこのユースケースを達成できるとは思わない。代わりに 'Direct''カフカコンシューマーでSpark Streamingを使用してください。カスタムファイル読み込みロジックは、一般的な 'foreachRDD'操作の中で実装することができます。 – maasg