1
Scala 2.10.6
とSpark 1.6.2
を使用して、カフカトピックからのメッセージを消費したいと思います。カフカのために私はこの依存関係を使用しています:ScalaのKafkaコンシューマをコンパイルできません
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
をこのコードは罰金コンパイル、しかし私はauto.offset.reset
を定義したいとここで問題が発生:
val topicMap = topic.split(",").map((_, kafkaNumThreads.toInt)).toMap
val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
私はkafkaParams
を追加すると、それはもうコンパイルされません。
val kafkaParams = Map[String, String](
"zookeeper.connect" -> zkQuorum, "group.id" -> group,
"zookeeper.connection.timeout.ms" -> "10000",
"auto.offset.reset" -> "smallest")
val data = KafkaUtils.createStream(ssc, kafkaParams, topicMap,
StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
エラーメッセージ:
94: error: missing parameter type for expanded function ((x$3) => x$3._2)
[ERROR] StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
私はcreateStream
のパラメータの多くの異なる組み合わせを試しましたが、すべてが失敗します。誰か助けてもらえますか?
[OK]をクリックします。確かに問題は、私がリモートのKafkaキューからメッセージを消費したいということです。私はそれらを 'curl'コマンドとコンフルエントなAPIを使って端末から得ることができます。しかし、私がScalaコードを実行すると、私はそれらを取得しません。だから、私の前提はオフセットを指定する必要があるということです。 – Dinosaurius
@Dinosauriusあなたの質問のエラーは、オフセットとは関係ありません。これは単にコンパイラが正しい型を推論できないことです。 –
ええ、私は知っています。オフセットを設定する必要がある理由を説明したかっただけです。 – Dinosaurius