2016-10-13 8 views
3

Spark Streamingを使用して、カフカのトピックの特定のパーティションを消費しようとしています。スパークストリーミング - カフカトピックの特定のパーティションを消費することは可能ですか?

KafkaUtilsクラスでこのユースケースのメソッドが表示されません。

createRDDと呼ばれる方法があります。基本的にはoffsetsが必要ですが、ストリーミング以外のアプリケーションでのみ有効です。スパークストリーミングを使用してカフカのトピックの特定のパーティションを消費する方法はありますか?

答えて

2

単一のパーティションを使用する方法はありませんが、最も洗練された方法はトピックです。しかし、特定のメッセージが特定のパーティションから発信されていると指定する方法があります。 createDirectStreamのオーバーロードを使用する場合は、Function1[MessageAndMetadata, R]が必要です。

たとえば、タイプがStringのキーとメッセージがあり、現在は1つのトピックからのみ消費しているとします。

val topicAndPartition: Map[TopicAndPartition, Long] = ??? 
val kafkaProperties: Map[String, String] = ??? 

KafkaUtils.createDirectStream[String, 
           String, 
           StringDecoder, 
           StringDecoder, 
           (String, String)](
     streamingContext, 
     kafkaConfig.properties, 
     topicAndPartition, 
     (mam: MessageAndMetadata[String, String]) => 
      (mam.partition, mam.message()) 

このようにして、私はパーティション(1)とその下にあるメッセージ(2)のタプルを出力しています。その後、私は特定のパーティションからのメッセージのみを含むように、このDStream[(String, String)]をフィルタリングすることができます。

val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 } 

我々は複数のトピックから消費している場合は、私たちがフィルタリングするために、出力に話題とパーティションの両方のタプルをする必要があります適切なトピックを持つパーティション幸運にも、TopicAndPartitionという便利なクラスがあります。我々は持っていると思います:

(mam: MessageAndMetadata[String, String]) => 
    (TopicAndPartition(mam.topic(), mam.partition()), mam.message()) 

そして:

val filteredStream = kafkaDStream.filter { 
    case (tap, _) => tap.topic == "mytopic" && tap.partition == 4 
} 
関連する問題