2017-05-07 8 views
2

kafka-streamsアプリケーションを開発するのは初めてです。私のストリームプロセッサは、入力jsonメッセージのユーザキーの値に基づいてjsonメッセージをソートすることを意図しています。kafka-streamsを使用してjson入力ストリームを条件付きでソートする

Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"} 
Message 2: {"UserID": "5", "Score":"780", "meta":"mnbvs"} 
Message 3: {"UserID": "2", "Score":"0", "meta":"fghjk"} 

私はダイナミックな解決策がないことをここにDynamically connecting a Kafka input stream to multiple output streams読みました。

私の場合、私は入力ストリームをソートするために必要なユーザーキーと出力トピックを知っています。そこで私は、各プロセッサアプリケーションが異なるUserIDと一致する各ユーザーに固有のプロセッサアプリケーションを個別に作成しています。

すべての異なるストリームプロセッサアプリケーションは、kafkaの同じjson入力トピックから読み込みますが、プリセットされたユーザー条件が満たされている場合は、それぞれのユーザーが出力トピックにメッセージを書き込みます。

public class SwitchStream extends AbstractProcessor<String, String> { 
     @Override 
     public void process(String key, String value) { 
      HashMap<String, String> message = new HashMap<>(); 
      ObjectMapper mapper = new ObjectMapper(); 
      try { 
       message = mapper.readValue(value, HashMap.class); 
      } catch (IOException e){} 

      // User condition UserID = 1 
      if(message.get("UserID").equals("1")) { 
       context().forward(key, value); 
       context().commit(); 
      } 
     } 

     public static void main(String[] args) throws Exception { 
      Properties props = new Properties(); 
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sort-stream-processor"); 
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
      props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
      props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 

      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
      TopologyBuilder builder = new TopologyBuilder(); 
      builder.addSource("Source", "INPUT_TOPIC"); 
      builder.addProcessor("Process", SwitchStream::new, "Source"); 
      builder.addSink("Sink", "OUTPUT_TOPIC", "Process"); 

      KafkaStreams streams = new KafkaStreams(builder, props); 
      streams.start(); 
     } 
} 

質問1: は、それがあれば代わりに低レベルプロセッサのAPIを高レベルストリームDSLを使用して簡単に同じ機能を実現することは可能ですか? (高水準ストリームDSLの他のオンライン例を理解するのは難しいと思っています)

質問2: 入力されたjsonトピックは、20K-25K EPSの高レートで入力されています。私のプロセッサアプリケーションは、この入力ストリームに追いつくことができないようです。私は各プロセスの複数のインスタンスを展開しようとしましたが、結果はどこにでもありたいです。理想的には、各プロセッサインスタンスは3-5K EPSを処理できる必要があります。

ハイレベルストリームDSLを使用してプロセッサロジックを改善したり、同じプロセッサロジックを書き込む方法はありますか?それは違いをもたらすだろうか?

答えて

3

filter()を使用して高レベルのDSLでこれを行うことができます(メッセージはuserID==1の場合のみ有効です)。 KStream#branch()を使用してこのフィルタパターンを一般化することができます(詳細はhttp://docs.confluent.io/current/streams/developer-guide.html#stateless-transformationsのドキュメントを参照してください)。パフォーマンスについてhttp://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/streams

KStreamBuilder builder = new KStreamBuilder(); 
builder.stream("INPUT_TOPIC") 
     .filter(new Predicate() { 
      @Overwrite 
      boolean test(String key, String value) { 
       // put you processor logic here 
       return message.get("UserID").equals("1") 
      } 
     }) 
     .to("OUTPUT_TOPIC"); 

:またJavadocを読んでください。 1つのインスタンスで10K +レコードを処理できる必要があります。問題が何であるか、さらに詳しい情報なしで伝えるのは難しいです。私はカフカユーザーリスト(http://kafka.apache.org/contactを参照)にお尋ねすることをお勧めします

+0

ありがとうマティアス!私が低EPSで抱えていた問題は、Kafka +環境の古いバージョンに関連していました。 – iron3rd

関連する問題