2017-09-26 8 views
4

私はJavaを使用しています。カフカからレコードのファイルパスにアクセスし、データセットを作成する方法は?

kafka-messagesでファイルパスを受信して​​います。このファイルをスパークRDDにロードして処理し、HDFSにダンプする必要があります。

カフカメッセージからファイルパスを取得できました。そして、私はこのファイルの上にデータセット/ RDDを作成したいと思います。

カフカメッセージデータセットでマップ機能を実行できません。ワーカーでsparkContextを使用できないため、NPEでエラーが発生します。

私はkafkaメッセージデータセットでforeachを実行できません。メッセージでエラーが発生しました。 "ストリーミングソースを使用したクエリは、writeStream.start();で実行する必要があります。

kafkaメッセージデータセットから受け取ったデータを収集できません。メッセージでエラーが発生しました。「ストリーミングソースを使用したクエリは、writeStream.start();;で実行する必要があります。

これは非常に一般的な使用例でなければならず、多くの設定で実行されている必要があります。

カフカメッセージで受信したパスからファイルをRDDとしてロードするにはどうすればよいですか?

以下のコード:

SparkSession spark = SparkSession.builder() 
.appName("MyKafkaStreamReader") 
    .master("local[4]") 
.config("spark.executor.memory", "2g") 
.getOrCreate(); 

// Create DataSet representing the stream of input lines from kafka 
Dataset<String> kafkaValues = spark.readStream() 
.format("kafka") 
    .option("spark.streaming.receiver.writeAheadLog.enable", true) 
    .option("kafka.bootstrap.servers", Configuration.KAFKA_BROKER) 
    .option("subscribe", Configuration.KAFKA_TOPIC) 
    .option("fetchOffset.retryIntervalMs", 100) 
    .option("checkpointLocation", "file:///tmp/checkpoint") 
.load() 
    .selectExpr("CAST(value AS STRING)").as(Encoders.STRING()); 

Dataset<String> messages = kafkaValues.map(x -> { 
    ObjectMapper mapper = new ObjectMapper(); 
    String m = mapper.readValue(x.getBytes(), String.class); 
    return m; 
}, Encoders.STRING()); 

// ==================== 
// TEST 1 : FAILS 
// ====================  
// CODE TRYING TO execute MAP on the received RDD 
// This fails with a Null pointer exception because "spark" is not available on worker node 

/* 
Dataset<String> statusRDD = messages.map(message -> { 

    // BELOW STATEMENT FAILS 
    Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message); 
    Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates(); 
    dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation()); 
    return getHdfsLocation(); 

}, Encoders.STRING()); 

    StreamingQuery query2 = statusRDD.writeStream().outputMode("append").format("console").start(); 
    */ 

// ====================  
// TEST 2 : FAILS 
// ====================  
// CODE BELOW FAILS WITH EXCEPTION 
// "Queries with streaming sources must be executed with writeStream.start();;" 
// Hence, processing the deduplication on the worker side using 
/* 
JavaRDD<String> messageRDD = messages.toJavaRDD(); 

messageRDD.foreach(message -> { 

    Dataset<Row> fileDataset = spark.read().option("header", "true").csv(message); 
    Dataset<Row> dedupedFileDataset = fileDataset.dropDuplicates(); 
    dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation()); 

}); 
*/ 

// ====================  
// TEST 3 : FAILS 
// ==================== 
// CODE TRYING TO COLLECT ALSO FAILS WITH EXCEPTION 
// "Queries with streaming sources must be executed with writeStream.start();;" 
// List<String> mess = messages.collectAsList(); 

私は、ファイル・パスを作成し、ファイルの上にRDDSを作成読むことができる方法上の任意のアイデア?

+0

はまた、あなたのコードを投稿することができますあなたはどうしていますか? – nat

+0

私の試用のためのコードが追加されました。 –

+0

私はあなたが構造化されたストリーミングを使用してこのユースケースを達成できるとは思わない。代わりに 'Direct''カフカコンシューマーでSpark Streamingを使用してください。カスタムファイル読み込みロジックは、一般的な 'foreachRDD'操作の中で実装することができます。 – maasg

答えて

0

ストラクチャードストリーミングでは、データセット操作のパラメータとして使用するストリームを1つのストリームに反映させる方法はないと思います。

Sparkエコシステムでは、これはSpark StreamingとSpark SQL(データセット)を組み合わせることで可能です。 Spark Streamingを使用してKafkaトピックを消費し、Spark SQLを使用して、対応するデータをロードして、必要なプロセスを適用することができます。

このような仕事はおおよそ次のようになります(これはScalaであり、Javaコードは同じ構造に従います。実際のコードは少しより冗長であるだけという。)

// configure and create spark Session 

val spark = SparkSession 
    .builder 
    .config(...) 
    .getOrCreate() 

// create streaming context with a 30-second interval - adjust as required 
val streamingContext = new StreamingContext(spark.sparkContext, Seconds(30)) 

// this uses Kafka080 client. Kafka010 has some subscription differences 

val kafkaParams = Map[String, String](
    "metadata.broker.list" -> kafkaBootstrapServer, 
    "group.id" -> "job-group-id", 
    "auto.offset.reset" -> "largest", 
    "enable.auto.commit" -> (false: java.lang.Boolean).toString 
) 

// create a kafka direct stream 
val topics = Set("topic") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    streamingContext, kafkaParams, topics) 

// extract the values from the kafka message 
val dataStream = stream.map{case (id, data) => data}  

// process the data 
dataStream.foreachRDD { dataRDD => 
    // get all data received in the current interval 
    // We are assuming that this data fits in memory. 
    // We're not processing a million files per second, are we? 
    val files = dataRDD.collect() 
    files.foreach{ file => 
    // this is the process proposed in the question -- 
    // notice how we have access to the spark session in the context of the foreachRDD 
    val fileDataset = spark.read().option("header", "true").csv(file) 
    val dedupedFileDataset = fileDataset.dropDuplicates() 
    // this can probably be written in terms of the dataset api 
    //dedupedFileDataset.rdd().saveAsTextFile(getHdfsLocation()); 
    dedupedFileDataset.write.format("text").mode("overwrite").save(getHdfsLocation()) 
    } 
} 

// start the streaming process 
streamingContext.start() 
streamingContext.awaitTermination() 
関連する問題