2017-05-18 7 views
1

私のアプリケーションでは、Web UIアプリケーションが、Kafkaへのファイルアップロードプロセスを完了した後にファイルパスを送信しています。JavaSparkContextを使用して、カフカのレコードにファイル名のファイルを処理する方法は?

私は、JavaSparkContextJavaPairInputDStreamを使用して(それはファイルパスを受信するが、複数のファイルパスが存在する可能性がある)、KafkaからのメッセージをプルするSparkストリーミングアプリケーションを持っています。

私は並行してファイルを処理する必要があり、別のカフカストリームに結果を送信する必要があります。

SparkConf conf = new SparkConf().setAppName("Task1").setMaster("local[*]"); 
    sc = new JavaSparkContext(conf); 
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

    Map<String, String> kafkaParams = new HashMap<>(); 
    kafkaParams.put("metadata.broker.list", "localhost:9092"); 
    Set<String> topics = Collections.singleton("topic1"); 

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class, 
      String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 

    directKafkaStream.foreachRDD(rdd -> { 

     rdd.collect().forEach((t) -> { 
      sendMessage(sc, t._2()); 
     }); 
    }); 

    ssc.start(); 
    ssc.awaitTermination(); 

sendMessageは、ファイル内のデータを送信します。

上記の実装では、foreachRDDメソッド内でJavaSparkContextを使用していますが、これはベストプラクティスではありません。私はファイルを並行して処理したい。私は純粋なカフカのプロデューサーになる機能sendMessage(スパークの依存関係のないとし、ESP JavaSparkContext。)を作成カフカのトピックにメッセージを送信したり取ると思い

directKafkaStream.foreachRDD(new VoidFunction<JavaRDD<String>>() { 
    public void call(JavaRDD<String> stringJavaRDD) throws Exception { 
     stringJavaRDD.foreachPartition(new VoidFunction<Iterator<String>>() { 
      public void call(Iterator<String> stringIterator) throws Exception { 
       sendMessage(stringIterator); 
      } 
     }); 
    } 
+0

@JacekLaskowskiこれは、このコードと質問の多くの問題の1つに過ぎないと私は思っています。現在、質問は非常に広く、必要であるか不明か、作成者にとっては明確ではないものがあります –

答えて

1

送信するすべてのメッセージのイテレータ。

official documentation of Apache Kafkaを参照してください。

def sendMessage(message: String) = { 
    println(s"Sending $message to Kafka") 
} 
dstream.map(_.value).foreachRDD { rdd => 
    println(s"Received rdd: $rdd with ${rdd.count()} records") 
    // take paths from RDD that contains Kafka records with the file names 
    val files = rdd.collect() 
    files.foreach { f => 
    // read a file `f` using Spark Core's RDD API 
    rdd.sparkContext.textFile(f).map { line => 
     // do something with line 
     // this is the place for a pure Spark transformation 
     // it's as if you were outside Spark Streaming 
     println(line) 
     line 
    }.foreachPartition { linesAfterProcessingPerPartition => 
     // send lines to Kafka 
     // they have been processed using Spark 
     linesAfterProcessingPerPartition.foreach { line => 
     sendMessage(message = line) 
     } 
    } 
    } 
} 

を私は確信している:私は(コメントをインラインがすべての行で何が起こるかのようあなたにいくつかのヒントを与える必要があります)スパークストリーミングの変換で、次の操作を行いたいsendMessageとして純粋カフカのプロデューサーで

コードがさらにはっきりしてしまうかもしれませんが、それはScalaであり、Javaを使用するので、ここで中止します。


それはすぐにスパークストリーミングを交換し、スパークでのストリーミングAPIになるように、私は強くSpark SQL's Structured Streamingを使用してお勧めします。

+0

'sendMessage'のSparkContextはどうでしょうか? –

2

:たとえば

関連する問題