単一のパーティションを使用する方法はありませんが、最も洗練された方法はトピックです。しかし、特定のメッセージが特定のパーティションから発信されていると指定する方法があります。 createDirectStream
のオーバーロードを使用する場合は、Function1[MessageAndMetadata, R]
が必要です。
たとえば、タイプがString
のキーとメッセージがあり、現在は1つのトピックからのみ消費しているとします。
val topicAndPartition: Map[TopicAndPartition, Long] = ???
val kafkaProperties: Map[String, String] = ???
KafkaUtils.createDirectStream[String,
String,
StringDecoder,
StringDecoder,
(String, String)](
streamingContext,
kafkaConfig.properties,
topicAndPartition,
(mam: MessageAndMetadata[String, String]) =>
(mam.partition, mam.message())
このようにして、私はパーティション(1)とその下にあるメッセージ(2)のタプルを出力しています。その後、私は特定のパーティションからのメッセージのみを含むように、このDStream[(String, String)]
をフィルタリングすることができます。
val filteredStream = kafkaDStream.filter { case (partition, _) => partition == 4 }
我々は複数のトピックから消費している場合は、私たちがフィルタリングするために、出力に話題とパーティションの両方のタプルをする必要があります適切なトピックを持つパーティション幸運にも、TopicAndPartition
という便利なクラスがあります。我々は持っていると思います:
(mam: MessageAndMetadata[String, String]) =>
(TopicAndPartition(mam.topic(), mam.partition()), mam.message())
そして:
val filteredStream = kafkaDStream.filter {
case (tap, _) => tap.topic == "mytopic" && tap.partition == 4
}
を