外部のソースからデータを取り出してトピックに格納するカスタムKafkaコネクタ(Javaで書かれ、Kafka ConnectのJava APIを使用)を使用しています。カスタムパーティショニング戦略を設定する必要があります。 partitioner.class
propertyを設定することで、のカスタム設定はのカフカプロデューサーで可能です。ただし、このプロパティはKafkaコネクタでは何も実行されていません。 Kafka Connect(私のコネクタを実行するのにconnect-standalone
スクリプトを使用しています)を設定するには、私が書いたカスタムPartitioner
を使用するにはどうしたらいいですか?Kafkaコネクタのパーティションストラテジを設定する
答えて
ソースコネクタは、各ソースレコードが書き込まれるパーティションをSourceRecord
のpartition
フィールドで制御できます。これが独自のコネクタであれば、これは最も簡単です。
ただし、ソースコネクタの各レコードの分割方法を変更する場合は、ソースレコードのpartition
フィールドを上書きするSMT(Single Message Transform)を使用できます。 org.apache.kafka.connect.transforms.Transformation
を実装し、独自のパーティショニングロジックを使用してカスタムSMTを作成しなければならない可能性がありますが、実際にはカスタムKafkaパーティショナーを作成するよりも簡単です。
たとえば、構成プロパティの使用方法と、希望のパーティション番号を持つ新しいSourceRecord
インスタンスを作成する方法を示す概念的なカスタム変換があります。サンプルは実際にはどのような真のパーティショニングロジックもないので不完全ですが、それは良い出発点であるはずです。
package io.acme.example; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.transforms.Transformation; import java.util.Map; public class CustomPartitioner implements Transformation { private static final String MAX_PARTITIONS_CONFIG = "max.partitions"; private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions"; private static final int MAX_PARTITIONS_DEFAULT = 1; /** * The definition of the configurations. We just define a single configuration property here, * but you can chain multiple "define" methods together. Complex configurations may warrant * pulling all the config-related things into a separate class that extends {@link AbstractConfig} * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}. */ private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC); private int maxPartitions; @Override public void configure(Map configs) { // store any configuration parameters as fields ... AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs); maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG); } @Override public SourceRecord apply(SourceRecord record) { // Compute the desired partition here int actualPartition = record.kafkaPartition(); int desiredPartition = ... // Then create the new record with all of the existing fields except with the new partition ... return record.newRecord(record.topic(), desiredPartition, record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); } @Override public ConfigDef config() { return CONFIG_DEF; } @Override public void close() { // do nothing } }
ConfigDef
とAbstractConfig
機能はかなり便利ですし、カスタムバリデータと推薦を使用して、だけでなく、他の特性に依存しているコンフィギュレーション特性を有するなど、多くの興味深い事を、行うことができます。このことについてさらに知りたい場合は、この同じフレームワークも使用する既存のKafka Connectコネクタのいくつかを確認してください。
最後の1つです。 Kafka Connectスタンドアロンまたは分散ワーカーを実行する場合は、カスタムSMTとJARファイルを含むJARファイルを指すようにCLASSPATH環境変数を設定することを忘れないでください。SMTはに依存します。ただし、Kafkaによって提供されるものはです。 connect-standalone.sh
とconnect-distributed.sh
コマンドは、カフカJARをクラスパスに自動的に追加します。
- 1. Kafka接続JDBCコネクタ
- 2. Kafka:リモートブローカのトピックコンテンツを取得するためのkafka-connect-sinkコネクタの設定方法
- 3. Azure Blobストレージ用Kafkaコネクタ
- 4. Tomcat 8:カスタムファイルでコネクタを設定する
- 5. kafka elasticsearchコネクタのelasticsearch IDの生成
- 6. kafkaコネクタからポートにログをプッシュ
- 7. Apache-Kafka-ConnectのコンフルエントHDFSコネクタ、不明なマジックバイト、Kafka-To-Hdfs
- 8. Jetty 9 XMLコネクタ設定エラー
- 9. Arduino Uno wifi Mqttコネクタ設定
- 10. ラクダでKAFKAエンドポイントを設定する
- 11. kerberized KafkaでHortonworks NiFIを設定する
- 12. Windows 10でKafkaの設定
- 13. AWS - IP設定のKafkaブローカー
- 14. confluent-kafkaのプロデューサ設定
- 15. スパークのBigQueryコネクタ、設定EUの場所
- 16. ループバックMongoDBコネクタ固有の設定
- 17. PysparkストラクチャードストリーミングKafka設定エラー
- 18. Windows 7のコネクタ/ Jのパスを設定する
- 19. pyspark rdd kafkaのoffsetRange()関数を設定
- 20. コンフルント3.3アップグレード後にKafka-cassandraコネクタが失敗する
- 21. "mysql_options"を設定するC++コネクタのサンプルコードが必要
- 22. Tomcat用のTrustManagerコネクタをプログラムで設定する
- 23. logstashをkafkaに出力 - パーティションキーを設定
- 24. NameNode HAモードのクラスタを持つKafka HDFSコネクタ
- 25. kafka-console-consumerのSSLとACLの設定
- 26. mysqlコネクタで自動接続オプションを設定する方法C++
- 27. Spring Cloud Stream - Kafkaバインダーの統合/設定
- 28. Kafka HAの消費者設定
- 29. Spring Integration Kafka設定 - Eclipseでのエラー
- 30. camel kafka maxPollRecords設定の意味