2017-10-27 16 views
1

セッションウィンドウを使用し、Apache Beam 2.0.0でTextIO.write経由でファイルに書き込むとき、TextIO.write()を呼び出すと次の例外が発生します。TextIO.writeをセッションウィンドウで書き込むとGroupByKeyの消費例外が発生する

java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey

潜在的にウィンドウを消費する可能性のある間に介入がない場合でも例外が発生します。私はコードを含んでいます - 主な機能は問題を示し、2.0.0のためのヘルパーポリシーライタークラスを含んでいます。私は透かし/トリガ、タイムスタンプの組み合わせ、Window.remerge()ING、 またはビーム2.1.0を使用して(周りの意図を明確にするから何の効果も見られなかったし、ビーム2.1.0は、そのデフォルトのファイル名のポリシーが含まれて

import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.io.FileBasedSink; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.fs.ResolveOptions; 
import org.apache.beam.sdk.io.fs.ResourceId; 
import org.apache.beam.sdk.transforms.*; 
import org.apache.beam.sdk.transforms.windowing.*; 
import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.TimestampedValue; 
import org.joda.time.Duration; 
import org.joda.time.Instant; 
import org.joda.time.format.DateTimeFormatter; 
import org.joda.time.format.ISODateTimeFormat; 


public class TestSessionWindowToFile { 
    /** 
    * Support class: a filename policy for getting one file per window. 
    * See https://github.com/apache/beam/blob/release-2.0.0/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java 
    */ 
    public static class LocalPerWindowFiles extends FileBasedSink.FilenamePolicy { 
     private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute(); 
     private final String prefix; 

     public LocalPerWindowFiles(String prefix) { 
      this.prefix = prefix; 
     } 

     public String filenamePrefixForWindow(IntervalWindow window) { 
      return String.format("%s-%s-%s", 
        prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end())); 
     } 

     @Override 
     public ResourceId windowedFilename(
       ResourceId outputDirectory, WindowedContext context, String extension) { 
      IntervalWindow window = (IntervalWindow) context.getWindow(); 
      String filename = String.format(
        "%s-%s-of-%s%s", 
        filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(), 
        extension); 
      return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE); 
     } 

     @Override 
     public ResourceId unwindowedFilename(
       ResourceId outputDirectory, Context context, String extension) { 
      throw new UnsupportedOperationException("Unsupported."); 
     } 
    } 


    /** 
    * Creating a session windows and then asking TextIO to write the results leads to 
    * "java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. 
    * Invalid because: WindowFn has already been consumed by previous GroupByKey" 
    */ 
    public static void main(String[] args) { 
     Pipeline p = Pipeline.create(); 

     PCollection<String> input = p.apply(
       Create.timestamped(
         TimestampedValue.of("this", new Instant(1)), 
         TimestampedValue.of("is", new Instant(2)), 
         TimestampedValue.of("a", new Instant(3)), 
         TimestampedValue.of("test", new Instant(4)), 
         TimestampedValue.of("test", new Instant(5)), 
         TimestampedValue.of("test", new Instant(50)), 
         TimestampedValue.of("test", new Instant(51)), 
         TimestampedValue.of("test", new Instant(52)) 
       ) 
     ); 

     PCollection<String> windowedInputs = input 
       // session windowing fails: 
       .apply(Window.into(Sessions.withGapDuration(new org.joda.time.Duration(10)))); 
       // sliding windowing succeeds: 
       //.apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10)))); 

     // Invoke counting of elements so that sessioning is more clear 
     PCollection<KV<String, Long>> counts = 
       windowedInputs.apply(Count.perElement()); 
     PCollection<String> writeableStrings = counts.apply("Convert to text", 
      ParDo.of(new DoFn<KV<String, Long>, String>() { 
      @ProcessElement 
      public void processElement(ProcessContext c) { 
       String word = c.element().getKey(); 
       Long count = c.element().getValue(); 
       c.output(String.format("%s,%d", word, count)); 
      } 
     })); 

     writeableStrings 
       .apply(TextIO.write() 
         .to("i_am_ignored_given_filename_policy") 
         .withFilenamePolicy(new LocalPerWindowFiles("results/testSessionWindow")) 
         .withWindowedWrites() 
         .withNumShards(1) 
     ); 
     p.run(); 
    } 
} 

ウィンドウのないファイルだけでなくウィンドウのないファイルも書き込む方法を知っています)。

ロギングはセッションが正しく構築されていることを示している、とSlidingWindowは(Sessionsの代わりに.apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10))));ようなバリアントを使用して)出力ファイルを生成し、正常に動作します。これは、セッションwindowing + TextIO.writeの設定ミスや操作ミスを示唆しています。

キー+スタート+終了ウィンドウグループごとにテキストファイルを書き込むために、このコードをどのように修正できますか?

答えて

0

これはWriteFilesトランスフォームのバグです。私はhttps://issues.apache.org/jira/browse/BEAM-3122を提出しました。残念ながら、バグを修正することができない、回避策は考えられません。

+0

これとさらに戦う価値はないことを確認していただきありがとうございます - ありがとうございます。私は(私はこのコードベースで始まる森の中のベイビーですが)私が見たときにはっきりとしたバグを見つけることができませんでしたので、それを取る人はすべて私の最高の願いを得ます。 – ppptomlin

+0

この問題を解決するために、達成しようとしていることを明確にすることはできますか? Windowsは常にキーごとであり、いくつかのウィンドウ処理戦略はすべてのキー(例えばスライドウィンドウや固定ウィンドウ)について独立して同じことをしますが、セッションウィンドウではキーの問題があります。 1人のユーザーIDセッションごとに考えることができます。セッション内の単一の「キー」に属するデータセット全体を計算してセッションウィンドウを作成しようとしていますか?または、何らかの種類のグループ化キーを持っていますか? – jkff

+0

例を編集して実際に単語をグループ化し、出現回数を計算するようにします。例をあまりにも積極的に攻撃しています。私は実際にキーをグループ化してカウントをしようとしています。たとえば、キー「earlySession」を持つt = 1、t = 2、およびt = 10のデータ、およびキー「lateSession」を持つt = 5、t = 9およびt = 60のデータを受け取ることがあります。 t_10_earlySession(3つすべての情報)、t_9_lateSession(最初の2つの情報)、t_60_lateSession(最後の1からの情報)の3つのファイルを生成するために、25のt-単位のギャップでセッションを行うことを期待しています。 (申し訳ありませんが、私はこれを逃しました。私は今、メールを受け取ります)。 – ppptomlin

関連する問題