私のアプリケーションでは、Web UIアプリケーションが、Kafkaへのファイルアップロードプロセスを完了した後にファイルパスを送信しています。JavaSparkContextを使用して、カフカのレコードにファイル名のファイルを処理する方法は?
私は、JavaSparkContext
とJavaPairInputDStream
を使用して(それはファイルパスを受信するが、複数のファイルパスが存在する可能性がある)、KafkaからのメッセージをプルするSparkストリーミングアプリケーションを持っています。
私は並行してファイルを処理する必要があり、別のカフカストリームに結果を送信する必要があります。
SparkConf conf = new SparkConf().setAppName("Task1").setMaster("local[*]");
sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("topic1");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class,
String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
rdd.collect().forEach((t) -> {
sendMessage(sc, t._2());
});
});
ssc.start();
ssc.awaitTermination();
sendMessage
は、ファイル内のデータを送信します。
上記の実装では、foreachRDDメソッド内でJavaSparkContextを使用していますが、これはベストプラクティスではありません。私はファイルを並行して処理したい。私は純粋なカフカのプロデューサーになる機能sendMessage
(スパークの依存関係のないとし、ESP JavaSparkContext
。)を作成カフカのトピックにメッセージを送信したり取ると思い
directKafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
stringJavaRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
public void call(Iterator<String> stringIterator) throws Exception {
sendMessage(stringIterator);
}
});
}
@JacekLaskowskiこれは、このコードと質問の多くの問題の1つに過ぎないと私は思っています。現在、質問は非常に広く、必要であるか不明か、作成者にとっては明確ではないものがあります –