2017-01-10 1 views
0

kafkaから取得しているストリーミングデータは、hdfsファイルのパスであり、そのファイルのデータを取得する必要があります。JavaDStreamをRDDに変換するには?または、JavaDStreamのマップ関数内に新しいRDDを作成する方法はありますか?

batchInputDStream.map(new Function<Tuple2<String,String>, FreshBatchInput>() { 

      @Override 
      public String call(Tuple2<String, String> arg0) 
        throws Exception { 
       StringReader reader = new StringReader(arg0._2); 
       JAXBContext jaxbContext = JAXBContext.newInstance(FreshBatchInput.class); 

        Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller(); 
        FreshBatchInput input = (FreshBatchInput)jaxbUnmarshaller.unmarshal(reader); 


       return input.getPath();  
      } 
     }); 

ここで、input.getPath()は、ファイルのhdfsパスです。

JavaDstreamオブジェクトを収集するオプションはありません。それ以外の場合は、最初にデータを収集し、データをファイルから取得することで使用します。

Iamは新しいRDD内部マップ関数を作成できません。タスクはシリアル化できません。

他にもオプションはありますか?

答えて

0

foreachRDDを使用できます。 RDDSはドライバのみで作成することができます - それは、ドライバ上で実行されるので、RDDアクションが

transformed.foreachRDD (rdd -> { 
    String inputPath = doSomethingWithRDD(rdd) 
    rdd.sparkContext.textFile(inputPath) ... 
}); 

はあなたが変換またはアクション内のRDDを作成することができないことに注意してください許可されています。 foreachRDDの例と同様の質問はhereです。これは、マップ、フィルタまたはforeachPartition内のSparkContextを使用できないことを意味します

+0

しかし、私はこのメソッドの呼び出しでsparcontext.readFileを呼び出すことはできませんので、この解決策も同じ問題に直面しますか? –

+0

@NidhiShah変換に関するテキストが間違っている可能性があるので削除しました。foreachRDDはあなたにとっては大丈夫です –

関連する問題