kafkaから取得しているストリーミングデータは、hdfsファイルのパスであり、そのファイルのデータを取得する必要があります。JavaDStreamをRDDに変換するには?または、JavaDStreamのマップ関数内に新しいRDDを作成する方法はありますか?
batchInputDStream.map(new Function<Tuple2<String,String>, FreshBatchInput>() {
@Override
public String call(Tuple2<String, String> arg0)
throws Exception {
StringReader reader = new StringReader(arg0._2);
JAXBContext jaxbContext = JAXBContext.newInstance(FreshBatchInput.class);
Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
FreshBatchInput input = (FreshBatchInput)jaxbUnmarshaller.unmarshal(reader);
return input.getPath();
}
});
ここで、input.getPath()は、ファイルのhdfsパスです。
JavaDstreamオブジェクトを収集するオプションはありません。それ以外の場合は、最初にデータを収集し、データをファイルから取得することで使用します。
Iamは新しいRDD内部マップ関数を作成できません。タスクはシリアル化できません。
他にもオプションはありますか?
しかし、私はこのメソッドの呼び出しでsparcontext.readFileを呼び出すことはできませんので、この解決策も同じ問題に直面しますか? –
@NidhiShah変換に関するテキストが間違っている可能性があるので削除しました。foreachRDDはあなたにとっては大丈夫です –