2016-11-30 10 views
2

私はSparkを使用してログラインを処理する必要があります。処理のステップの1つは、外部DBの特定の値をルックアップすることです。sparkからの外部データベースでのマイクロバッチルックアップ

例: ログラインに複数のキーと値のペアが含まれています。ログに存在するキーの1つが「key1」です。ルックアップコールにはこのキーを使用する必要があります。 RDDの "key1"の各値に対して、外部DBで複数のルックアップを連続して実行したくありません.RDDにある "key1"のすべての値のリストを作成し、外部DBで単一参照呼び出しを行いたい。

ようになり、各ログ行からキーを抽出する私のコードは次のとおりです。

lines.foreachRDD{rdd => rdd.map(line => extractKey(line)) 
// next step is lookup 
// then further processing 

.MAP機能は、各ログ行のために呼ばれ、私はわからないことになるが、どのように私はのリストを作成することができます外部ルックアップに使用できるキー。

おかげ

+0

を回避します'メソッドを呼び出すと、Array [Key]またはList [Key]として返されます。 – Shankar

答えて

0

あなたはこれをしたいように見えます:

lines.groupByKey().filter() 

あなたはより多くの情報を提供してもらえますか?

+0

私が使用しようとしている "key1"は、ログ行の列の1つだけです。これは検索のための鍵です。たとえば、私はキー "key1"と2つのログラインを得た、私はこのキーで特定の値のデータベースを検索します。私は結果をログラインに追加し、処理を進めます。 基本的に、私はドライバではなくエグゼキュータ上のキーのリストを蓄積しようとしています。 – Alok

2

collectを使用してください。

lines.foreachRDD{rdd => 
    val keys = rdd.map(line => extractKey(line)).collect() 
    // here you can use keys List 

おそらくあなたもmapPartitionsを使用する必要があります:1つのパーティションあたり1回のコールがあります

lines.foreachRDD{rdd => 
    rdd.foreachPartition(iter => { 
     val keys = iter.map(line => extractKey(line)).toArray 
     // here you can use keys Array 

    } 
} 

、この方法は、あなたが `collect`または` collectAsListを使用することができ、シリアル化問題

+0

ありがとうございます。方法はありますか、パーティションごとに1つではなく1つのrddで1つのコールを作成できますか? – Alok

関連する問題