2017-11-29 9 views
0

スパークダイレクトストリーミングを使用するとき、私はズキーパーカーに私のオフセットを保存しようとしています。私はそれがAPIで利用できません見るように、我々はJavaPairInputDstreamを取得するために使用できる方法の回避策があります私はJavaInputDstream APIは、特定のオフセットから起動するオプションを持っている見るが、私はJavaPairInputDstreamのためにそれを必要とするオフセットからのJavaPairInputDstreamストリーム

JavaInputDStream<String> messages = KafkaUtils.createDirectStream(jsc, String.class, 
          String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, offsets,(messageAndMetadata) -> messageAndMetadata.message()); 

私はこのオフセットを持たないJavaPairInputDstreamを使用します

答えて

0

私は直接ストリームで変換を行い、キーと値のペアにマップしました。

final JavaPairDStream<String, String> messages2 =messages.transformToPair(pairRdd -> { 

          pairRdd.mapToPair(label->new Tuple2<>(label,label))};); 
関連する問題