2016-03-22 5 views
0

カスケードを使用してファイルを読み込み、特定のフィールドでグループ化を使用しようとしています。カスケーディング - フィールド名によるグループ化

しかし、ソースファイルのすべての行が返されます。

ソースファイル:

no,date,amt 
1,3/10/2016,1000 
1,3/10/2016,2000 
1,3/11/2016,400 
232,2/10/2016,1500 

コード:

Fields tscnFields = new Fields("no", "date", "amt"); 
FileTap tscnFileTap = new FileTap(new TextDelimited(tscnFields,true, ","), "C://Users//Test//tscn.txt"); 

    final Fields groupField = new Fields("date"); 

    Pipe pipe = new Pipe("test"); 
    pipe = new Each(pipe, new Debug()); 
    pipe = new GroupBy("group by date", pipe, groupField); 
    Fields outFields = new Fields("no", "date", "amt"); 
    FileTap sinkTap = new FileTap(new TextDelimited(outFields,true, ","), "C://Users//Test//out.txt", SinkMode.REPLACE);   Flow flow = flowConnector.connect("FlowMonitorTest", tscnFileTap, sinkTap, pipe); 
    flow.complete(); 

あなたは下記のような出力をしたい場合、私は取得しています出力は

['1', '3/10/2016', '1000'] 
['1', '3/10/2016', '2000'] 
['1', '3/11/2016', '400'] 
['232', '2/10/2016', '1500'] 
tuples count: 4 
+1

どのような出力が必要ですか? –

答えて

1

です

date 
2/10/2016 
3/10/2016 
3/10/2016 
3/11/2016 

次に、このコードをチェックする必要があります。このコードは、日付でタプルをグループ化し、日付フィールドのみのタプルを返します。以下はコードです

import java.util.Properties; 

import cascading.flow.Flow; 
import cascading.flow.FlowDef; 
import cascading.flow.local.LocalFlowConnector; 
import cascading.operation.Debug; 
import cascading.operation.Identity; 
import cascading.pipe.Each; 
import cascading.pipe.GroupBy; 
import cascading.pipe.Pipe; 
import cascading.property.AppProps; 
import cascading.scheme.local.TextDelimited; 
import cascading.tap.Tap; 
import cascading.tap.local.FileTap; 
import cascading.tuple.Fields; 

public class Main { 
    public static void main(String[] args) { 
     Tap sourceTap=new FileTap(new TextDelimited(true,","), 
       "text"); 
     Tap sinkTap=new FileTap(new TextDelimited(true,","), 
       "output"); 
     Pipe inputPipe=new Pipe("input_to_group"); 
     Fields groupField=new Fields("date"); 
     inputPipe=new Each(inputPipe,new Debug()); 
     inputPipe=new GroupBy(inputPipe,groupField); 
     inputPipe=new Each(inputPipe,new Fields("date"),new Identity()); 
     Properties properties=new Properties(); 
     AppProps.setApplicationJarClass(properties, Main.class); 
     FlowDef flowDef=FlowDef.flowDef().addSource(inputPipe, sourceTap) 
       .addTailSink(inputPipe, sinkTap); 
     Flow zodiacFlow=new LocalFlowConnector(properties).connect(flowDef); 
     zodiacFlow.complete(); 
    } 
} 

希望、この回答はあなたの要件を満たしています!
ありがとうございます

1

はい、明らかに入力と同じ出力が得られます。

'date'のフィールドをグループ化していますが、グループ化されたフィールドデータで何も処理しないので、出力と同じ入力が得られます。行われるべきである何

カスケードでデータをグループ化した後、我々は、データをグループ化されたプロセスにバッファバッファ通話ごとにアセンブリ)を呼び出す必要があります。あなたがあなたの操作を行うことができ、それらの入力で、

no,date,amt 
1,3/10/2016,1000 
1,3/10/2016,2000 

no,date,amt 
1,3/11/2016,400 

no,date,amt 
232,2/10/2016,1500 

今:上記の例で

は、あなたはすべてのバッファ動作の入力となりますので、以下の三つのグループフィールド「日付」でデータをグループ化入力を変更します。

注: Hadoopカスケードでは、各データ操作は異なる分散システムで実行されます。同じマシンで操作を確実に実行するために、バッファ操作(グループ化が必要です)を行います。

関連する問題