2016-04-15 2 views

答えて

3

は、完全な作業アプリケーションのコードで新しいカフカ入力オペレータとアペックスMalharからのファイル出力演算子を使用しています。これはバイト配列をStringに変換し、制限されたサイズ(この例では1K)のローリングファイルを使用してHDFSに書き出します。ファイルサイズが上限に達するまで、.tmp拡張子を持つ一時的な名前が付けられます。これらの2つの演算子の間に、https://stackoverflow.com/a/36666388のDevTが示唆するように追加の演算子を挿入することができます。

package com.example.myapexapp; 

import java.nio.charset.Charset; 
import java.nio.charset.StandardCharsets; 

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator; 
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; 
import org.apache.hadoop.conf.Configuration; 

import com.datatorrent.api.annotation.ApplicationAnnotation; 
import com.datatorrent.api.StreamingApplication; 
import com.datatorrent.api.DAG; 

import com.datatorrent.lib.io.ConsoleOutputOperator; 
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator; 
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; 

@ApplicationAnnotation(name="MyFirstApplication") 
public class KafkaApp implements StreamingApplication 
{ 

    @Override 
    public void populateDAG(DAG dag, Configuration conf) 
    { 
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator()); 
    in.setInitialPartitionCount(1); 
    in.setTopics("test"); 
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); 
    //in.setClusters("localhost:2181"); 
    in.setClusters("localhost:9092"); // NOTE: need broker address, not zookeeper 

    LineOutputOperator out = dag.addOperator("out", new LineOutputOperator()); 
    out.setFilePath("/tmp/FromKafka"); 
    out.setFileName("test"); 
    out.setMaxLength(1024);  // max size of rolling output file 

    // create stream connecting input adapter to output adapter 
    dag.addStream("data", in.outputPort, out.input); 
    } 
} 

/** 
* Converts each tuple to a string and writes it as a new line to the output file 
*/ 
class LineOutputOperator extends AbstractFileOutputOperator<byte[]> 
{ 
    private static final String NL = System.lineSeparator(); 
    private static final Charset CS = StandardCharsets.UTF_8; 
    private String fileName; 

    @Override 
    public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); } 

    @Override 
    protected String getFileName(byte[] tuple) { return fileName; } 

    public String getFileName() { return fileName; } 
    public void setFileName(final String v) { fileName = v; } 
} 
0

カフカから読み込んでJDBCに書き込むためのサンプルコード。上記のコードを説明する

github.com/tweise/apex-samples/tree/master/exactly-once

ブログ。

アブロオペレータが https://github.com/apache/incubator-apex-malhar/search?utf8=%E2%9C%93&q=avro

テストケースを見つけることができる

www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/は、サンプルユースケースが含ま。あなたが別のを使用している場合 - > AvroToPojo - - >寸法アグリゲータ - AbstractFileOutputOperator

KafkaSinglePortStringInputOperatorの>実装アプリケーションコードが

KafkaSinglePortStringInputOperator、のようになり、高レベルで

+0

これは役に立ちません。あなたの答えには、問題の明確な解決策を含めて、参考になるだけのリンクに頼ってください。明確な解決策を提示できない場合、おそらく問題は明らかではありませんでした。 – miken32

2

KafkaSinglePortByteArrayInputOperatorを使用するか、カスタム実装を記述することができます。

AvroToPojo - https://github.com/apache/incubator-apex-malhar/blob/5075ce0ef75afccdff2edf4c044465340176a148/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java

このオペレータは、GenericRecordをPOJOを与えられたユーザに変換します。ユーザは、他のものを反映させるべきPOJOクラスを与える必要があります。これはコンテナファイルからGenericRecordsを読み出すために使用され、カフカから読んで、あなたは())と同様の線に沿って、あなたのオペレータをモデル化し、processTuple方法で以下のように入ってくるrecords.Somethingを解析するスキーマオブジェクトを追加し、 スキーマスキーマ=新しいSchema.Parserを動作するはずです()を解析することができます。 GenericDatumReader reader =新しいGenericDatumReader(スキーマ);

寸法アグリゲータ - https://github.com/apache/incubator-apex-malhar/tree/5075ce0ef75afccdff2edf4c044465340176a148/library/src/main/java/org/apache/apex/malhar/lib/dimensionsを、または同じラインに沿って、カスタム集計を書く - あなたはここで与えられたアグリゲータのいずれかを選ぶことができます。

FileWriter - 上のポストの例から。ここで

関連する問題