2016-08-19 11 views
2

私は、Apache ApexアプリケーションでKafkaログを消費し、HDFSに書き込んでいます。AbstractFileOutputWriter重複したtmpファイルの生成

DAGは、 "MyWriter extends AbstractFileOutputOperator"にストリームで接続されたKafkaコンシューマー(オペレータ用に2GBのメモリ20個のパーティション)があるほど簡単です。

問題: 1.同じサイズと同じデータの複数の.tmpファイルを何度も繰り返し書き込んでいるWriterを見ています。私はWrite Operatorのメモリを増やしてみました。WriterのParitionの数を増やしました。それでもこの問題は起こり続けています。

MyWriterにrequestFinalizeを追加/削除しようとしました。まだ同じ問題。

@Override 
    public void endWindow() 
    { 
     if (null != fileName) { 
      requestFinalize(fileName); 
     } 
     super.endWindow(); 
    } 

これは、これは私がオペレータのためdt.logから取得することができたのスタックトレースである私のproperties.xmlのサブセット

<property> 
    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name> 
    <value>1000</value> 
    </property> 

    <property> 
    <name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name> 
    <value>60</value> 
    </property> 

    <property> 
    <name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name> 
    <value>60</value> 
    </property> 

<property> 
     <name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name> 
     <value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value> 
    </property> 

    <property> 
    <name>dt.application.myapp.operator.myWriter.prop.maxLength</name> 
    <value>1000000000</value> <!-- 1 GB File --> 
    </property> 

です: オペレータが異なる中で、おそらく再デプロイますcontianersは、この例外をスローし、重複したファイルを書き続けます。

java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp 
     at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418) 
     at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112) 
     at com.datatorrent.stram.engine.Node.setup(Node.java:187) 
     at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309) 
     at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130) 
     at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388) 
    Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp 
     at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211) 
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211) 
     at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411) 
     ... 5 more 
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177] 
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete. 
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request. 
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request. 
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request. 

答えて

2

ベースのオペレータ用のコードは以下のリンクであり、以下 コメントで参照されている: https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java

1ギガバイトに最大ファイルサイズを設定することで、自動的にファイルをロール可能。関連フィールドは、次のとおり

protected Long maxLength = Long.MAX_VALUE; 
protected transient boolean rollingFile = false; 

前者はLong.MAX_VALUEの既定値より小さい値を有する場合、後者はsetup()方法でtrueに設定されています。

ローリングファイルを有効にすると、ファイルのファイナライズは自動的に行われるため、requestFinalize()に電話をかけてはいけません。

第二に、あなたのMyWriterクラスで、endWindow()オーバーライドを削除し、setup()方法でオペレータIDを含んで目的のファイル名を作成し、getFileName()オーバーライドで、このファイル名を返すことを確認してください。これにより、複数のパーティションが互いに踏み込まないようにします。例:あなたはそれのためにゲッターとセッターを追加する必要があります

@NotNull 
private String fileName;   // current base file name 

private transient String fName; // per partition file name 

@Override 
public void setup(Context.OperatorContext context) 
{ 
    // create file name for this partition by appending the operator id to 
    // the base name 
    // 
    long id = context.getId(); 
    fName = fileName + "_p" + id; 
    super.setup(context); 

    LOG.debug("Leaving setup, fName = {}, id = {}", fName, id); 
} 

@Override 
protected String getFileName(Long[] tuple) 
{ 
    return fName; 
} 

ファイルのベース名(上記のコードでfileName)コードで直接設定するか、XMLファイル内のプロパティから初期化することができます(同じように)。

あなたはでの使用のこのタイプの例を見ることができます:追加の提案の https://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput

カップル:パーティションは1にカウント(またはPARTITIONER属性を設定XMLをコメントアウト設定し

  1. )、すべてが期待通りに機能することを確認してください。これにより、パーティション化に関連していない問題はなくなります。可能な場合は、最大ファイルサイズをたとえば2Kまたは4Kに減らして、テストが容易になるようにします。
  2. 単一パーティションのケースが機能したら、パーティションの数を2に増やします。これが機能すれば、任意の大きい数値(理由の中で)も機能するはずです。
関連する問題