2017-10-04 16 views
1

パイプ()をキーRDDでグループ化するには?私はこれまで、フォローのワークフローパスを行っている

1) JavaPairRDD< Integer, String > aRDD = fooRDD.mapToPair() 

2) JavaPairRDD< Integer, Iterable<String> > bRDD = aRDD.groupByKey() 

3) JavaPairRDD< Integer, List<String> > cRDD = bRDD.mapToPair() 

を今私は問題を抱えている:私はcRDD.pipe('myscript.sh')する必要があるが、私はmyscript.shが一度に各キーのすべてのリストを受信して​​いる気づきました。

長いバージョン:各行のグループを取り、データを含むPDFを作成するbashスクリプトがあります。したがって、bRDDはキーを使用して行をグループ化し、cRDDは各グループ内のいくつかの望ましくないデータを並べ替えて削除し、次の手順は各データグループに対して1つのPDFレポートを作成します。

私はグループの内容を表すList<String>を各グループの新しいJavaPairRDD< Integer, String >に変換することを考えていますが、これをどうやって行うのか、これが正しい方法であるとしてもわかりません。

例:

(1,'foo,b,tom'), (1,'bar,c,city'), (1,'fly,Marty'), (2,'newFoo,Jerry'), (2,'newBar,zed,Mark'), (2,'newFly,boring,data') (2,'jack,big,deal') 

GROUPBY後:

(1, 'foo,b,tom','bar,c,city','fly,Marty') 
(2, 'newFoo,Jerry','newBar,zed,Mark','newFly,boring,data','jack,big,deal') 

どう `myscript.shのデータを取っている(グループ全体に対して1つの文字列に注意してください):

(1,['foo,b,tom,bar,c,city,fly,Marty']) 

(2,['newFoo,Jerry,newBar,zed,Mark,newFly,boring,data,jack,big,deal']) 

私はどのように受信することを期待しています:

パーティション1または労働者1の場合:パーティション2または作業員2については

1,'foo,b,tom' 
1,'bar,c,city' 
1,'fly,Marty' 

2,'newFoo,Jerry' 
2,'newBar,zed,Mark' 
2,'newFly,boring,data' 
2,'jack,big,deal' 

だから私は一度にそれぞれの行を処理することができますが、まだグループを維持し、これを作ることを保証することができますグループ1は1つのPDFレポートに移動し、グループ2は別のレポートに移動します。大きな問題は私のデータラインがすでにカンマで区切られたデータであるため、すべてのラインがコンマで区切られたラインとしてマージされるので、新しいライン値をどこで開始するのかを決定することができません。

私はJavaを使用しています。あなたの答えはJavaでも与えてください。

答えて

1

RDD内にRDDを作成することはできません。特定のキーに属するすべてのレコードを連続して処理したい場合は、グループ化されたRDD(bRDD、cRDD)を再度フラット化しないでください。代わりに、グループ化されたRDDの(bRDD、cRDD)値の区切りを他の文字に変更することをお勧めします。

cRDD.map(s->{ 
      StringBuilder sb =new StringBuilder(); 
      Iterator<String> ite = s._2().iterator(); 
      while (ite.hasNext()){ 
       //change delimiter to colon(:) or some other character 
       sb.append(ite.next()+":"); 
      } 
      return new Tuple2<Long,String>(s._1(),sb.toString()); 
     }).pipe('myscript.sh'); 

myscript.shコロン(:)に基づく分割レコード。これが助けてくれるといいなあ

+0

良いアイデア。 Sparkがスクリプトでデータのバケットを一度にダンプしても何の問題もないでしょうか?何らかの理由で、私は一行一列の配管が一度に聖書全体を投げるより危険ではないと思っていますが、私は 'stdin'ストリームに違いはないと思っています... –

+0

考えてみましょう:データを使って報告すると、とにかくそれを一度に持っていなければなりません.... –

関連する問題