2017-06-28 15 views
2

外部のソースからデータを取り出してトピックに格納するカスタムKafkaコネクタ(Javaで書かれ、Kafka ConnectのJava APIを使用)を使用しています。カスタムパーティショニング戦略を設定する必要があります。 partitioner.class propertyを設定することで、のカスタム設定はのカフカプロデューサーで可能です。ただし、このプロパティはKafkaコネクタでは何も実行されていません。 Kafka Connect(私のコネクタを実行するのにconnect-standaloneスクリプトを使用しています)を設定するには、私が書いたカスタムPartitionerを使用するにはどうしたらいいですか?Kafkaコネクタのパーティションストラテジを設定する

答えて

4

ソースコネクタは、各ソースレコードが書き込まれるパーティションをSourceRecordpartitionフィールドで制御できます。これが独自のコネクタであれば、これは最も簡単です。

ただし、ソースコネクタの各レコードの分割方法を変更する場合は、ソースレコードのpartitionフィールドを上書きするSMT(Single Message Transform)を使用できます。 org.apache.kafka.connect.transforms.Transformationを実装し、独自のパーティショニングロジックを使用してカスタムSMTを作成しなければならない可能性がありますが、実際にはカスタムKafkaパーティショナーを作成するよりも簡単です。

たとえば、構成プロパティの使用方法と、希望のパーティション番号を持つ新しいSourceRecordインスタンスを作成する方法を示す概念的なカスタム変換があります。サンプルは実際にはどのような真のパーティショニングロジックもないので不完全ですが、それは良い出発点であるはずです。

 
package io.acme.example; 

import org.apache.kafka.common.config.AbstractConfig; 
import org.apache.kafka.common.config.ConfigDef; 
import org.apache.kafka.common.config.ConfigDef.Importance; 
import org.apache.kafka.common.config.ConfigDef.Type; 
import org.apache.kafka.connect.source.SourceRecord; 
import org.apache.kafka.connect.transforms.Transformation; 

import java.util.Map; 

public class CustomPartitioner implements Transformation { 

    private static final String MAX_PARTITIONS_CONFIG = "max.partitions"; 
    private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions"; 
    private static final int MAX_PARTITIONS_DEFAULT = 1; 

    /** 
    * The definition of the configurations. We just define a single configuration property here, 
    * but you can chain multiple "define" methods together. Complex configurations may warrant 
    * pulling all the config-related things into a separate class that extends {@link AbstractConfig} 
    * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the 
    * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}. 
    */ 
    private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC); 

    private int maxPartitions; 

    @Override 
    public void configure(Map configs) { 
    // store any configuration parameters as fields ... 
    AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs); 
    maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG); 
    } 

    @Override 
    public SourceRecord apply(SourceRecord record) { 
    // Compute the desired partition here 
    int actualPartition = record.kafkaPartition(); 
    int desiredPartition = ... 
    // Then create the new record with all of the existing fields except with the new partition ... 
    return record.newRecord(record.topic(), desiredPartition, 
          record.keySchema(), record.key(), 
          record.valueSchema(), record.value(), 
          record.timestamp()); 
    } 

    @Override 
    public ConfigDef config() { 
    return CONFIG_DEF; 
    } 

    @Override 
    public void close() { 
    // do nothing 
    } 
} 

ConfigDefAbstractConfig機能はかなり便利ですし、カスタムバリデータと推薦を使用して、だけでなく、他の特性に依存しているコンフィギュレーション特性を有するなど、多くの興味深い事を、行うことができます。このことについてさらに知りたい場合は、この同じフレームワークも使用する既存のKafka Connectコネクタのいくつかを確認してください。

最後の1つです。 Kafka Connectスタンドアロンまたは分散ワーカーを実行する場合は、カスタムSMTとJARファイルを含むJARファイルを指すようにCLASSPATH環境変数を設定することを忘れないでください。SMTはに依存します。ただし、Kafkaによって提供されるものはです。 connect-standalone.shconnect-distributed.shコマンドは、カフカJARをクラスパスに自動的に追加します。

関連する問題