Datatorrent RTS(Apache Apex)プラットフォームをセットアップし、piデモを実行するだけです。 私はkafkaからの "avro"メッセージを消費し、そのデータを集めてhdfsに格納したいと思っています。 このかカフカのサンプルコードを入手できますか?avro kafkaメッセージを使用するサンプルコードを入手できますか?
答えて
は、完全な作業アプリケーションのコードで新しいカフカ入力オペレータとアペックスMalharからのファイル出力演算子を使用しています。これはバイト配列をStringに変換し、制限されたサイズ(この例では1K)のローリングファイルを使用してHDFSに書き出します。ファイルサイズが上限に達するまで、.tmp
拡張子を持つ一時的な名前が付けられます。これらの2つの演算子の間に、https://stackoverflow.com/a/36666388のDevTが示唆するように追加の演算子を挿入することができます。
package com.example.myapexapp;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
@ApplicationAnnotation(name="MyFirstApplication")
public class KafkaApp implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
in.setInitialPartitionCount(1);
in.setTopics("test");
in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
//in.setClusters("localhost:2181");
in.setClusters("localhost:9092"); // NOTE: need broker address, not zookeeper
LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
out.setFilePath("/tmp/FromKafka");
out.setFileName("test");
out.setMaxLength(1024); // max size of rolling output file
// create stream connecting input adapter to output adapter
dag.addStream("data", in.outputPort, out.input);
}
}
/**
* Converts each tuple to a string and writes it as a new line to the output file
*/
class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
private static final String NL = System.lineSeparator();
private static final Charset CS = StandardCharsets.UTF_8;
private String fileName;
@Override
public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); }
@Override
protected String getFileName(byte[] tuple) { return fileName; }
public String getFileName() { return fileName; }
public void setFileName(final String v) { fileName = v; }
}
カフカから読み込んでJDBCに書き込むためのサンプルコード。上記のコードを説明する
github.com/tweise/apex-samples/tree/master/exactly-once
ブログ。
アブロオペレータが https://github.com/apache/incubator-apex-malhar/search?utf8=%E2%9C%93&q=avro
テストケースを見つけることができる
www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/は、サンプルユースケースが含ま。あなたが別のを使用している場合 - > AvroToPojo - - >寸法アグリゲータ - AbstractFileOutputOperator
KafkaSinglePortStringInputOperatorの>実装アプリケーションコードが
KafkaSinglePortStringInputOperator、のようになり、高レベルで
KafkaSinglePortByteArrayInputOperatorを使用するか、カスタム実装を記述することができます。
このオペレータは、GenericRecordをPOJOを与えられたユーザに変換します。ユーザは、他のものを反映させるべきPOJOクラスを与える必要があります。これはコンテナファイルからGenericRecordsを読み出すために使用され、カフカから読んで、あなたは())と同様の線に沿って、あなたのオペレータをモデル化し、processTuple方法で以下のように入ってくるrecords.Somethingを解析するスキーマオブジェクトを追加し、 スキーマスキーマ=新しいSchema.Parserを動作するはずです()を解析することができます。 GenericDatumReader reader =新しいGenericDatumReader(スキーマ);
寸法アグリゲータ - https://github.com/apache/incubator-apex-malhar/tree/5075ce0ef75afccdff2edf4c044465340176a148/library/src/main/java/org/apache/apex/malhar/lib/dimensionsを、または同じラインに沿って、カスタム集計を書く - あなたはここで与えられたアグリゲータのいずれかを選ぶことができます。
FileWriter - 上のポストの例から。ここで
- 1. Avroバイナリエンコーダを使用してKafkaメッセージをエンコード/デコードする方法は?
- 2. AvroシリアライザとスキーマレジストリでKafkaにメッセージを送信する方法
- 3. go中のKafka Avroメッセージを消費する
- 4. サンプルコードを入手するにはいいですか?
- 5. confluent-kafka avro with pipをインストールする
- 6. スキーマを使用したAvroメッセージ
- 7. Kafka Avroコンシューマ(デコーダー付)
- 8. confluent-kafka python avro messages
- 9. spring-cloud-stream kafka avro
- 10. リンゴのGLImageProcessingのサンプルコードはどこから入手できますか?
- 11. Kafka Avroデコードメッセージを印刷できません
- 12. スパムストリーミングやflumeでXmlをAvroからKafka、hdfsに変換する
- 13. avroスキーマを知らなくても、Avroエンコードされたkafkaメッセージをスカラーで読む方法は?
- 14. avro&kafkaを使用してオブジェクトのリストを送信する方法
- 15. Apache Kafka Avro Deserialization:特定のタイプのメッセージを逆シリアル化またはデコードできません。
- 16. kafkaのavroエンコードメッセージ内のprotobuf
- 17. Avroシリアライザとデシリアライザkafka java api
- 18. Kafka Avroスキーマの進化
- 19. KafkaからJSONメッセージを消すことができないKafka-Pythonのデシリアライザを使用
- 20. Kerberized Kafkaにメッセージを送信します。 kafkaテンプレート(spring-kafka)バージョン1.15(Springブートを使用していません)
- 21. 何を使用すればいいですか:Kafka StreamまたはKafka consumer APIまたはKafka connect
- 22. python-kafka:コンシューマはメッセージ属性に基づいてメッセージをスキップできますか?
- 23. PythonでKafka Avro文字列をデコード/逆シリアル化する方法
- 24. Kafka Streamsがスキーマなしでavroトピックを作成する
- 25. Avroスキーマを集中化したApache Kafka
- 26. kafkaにタイムスタンプを追加する方法nodejsを使ったメッセージ
- 27. KAFKA REST APIを使用してJSONメッセージを消費する
- 28. iOSフレームワークを入手できますか?
- 29. Kafka 0.9 KafkaConsumerでオフセットを手動でコミットするときにメッセージを再消費する方法
- 30. サンプルコードをsandcastleを使用して文書に入れます
これは役に立ちません。あなたの答えには、問題の明確な解決策を含めて、参考になるだけのリンクに頼ってください。明確な解決策を提示できない場合、おそらく問題は明らかではありませんでした。 – miken32