1

でストリームを完全に正常に動作するJavaでの私のコードされて次のように私のScalaはコードであるカフカは私がScalaの下に でストリームカフカを使用しようとしていますスカラ座

KStreamBuilder builder = new KStreamBuilder(); 
    KStream<String, String> textLines = builder.stream("TextLinesTopic"); 
    textLines.foreach((key,values) -> { 
     System.out.println(values); 
    }); 

    KafkaStreams streams = new KafkaStreams(builder, config); 
    streams.start(); 

val builder = new KStreamBuilder 
    val textLines:KStream[String, String] = builder.stream("TextLinesTopic") 
    textLines.foreach((key,value)-> { 
    println(key) 
    }) 

    val streams = new KafkaStreams(builder, config) 
    streams.start() 

スカラコードがコンパイルエラーをスローします。タイプ予想の不一致:ForEachAction [>文字列、>文字列]、実際の((いずれかの、いずれかの)、ユニット) が見つかりません:値キー 見つかりません:値値

誰もが内のストリームAPIを使用する方法を知っていますscala

答えて

4

あなたの構文は間違っています:)。 ->は、ペアを作成するためだけの作業なので、表現

(key,value)-> { 
    println(key) 
} 

は、コンパイラがどのようなタイプの情報を推測することはできません(とkeyvalueが不足している)ので、あなたがしている場合((どれ、どれ)、ユニット)

型を持ちます=>->を交換Scalaの2.12を使用して問題を解決する必要がありますが、あなたはスカラ座の古いバージョンを使用している場合は、明示的にJavaのbifunctionを実装する必要があります:

textLines.foreach(new BiFunction[T1, T2] { ... }) 
1

印刷方法を使用してkafkastreamを直接印刷することができます。

textlines.print

それはカフカのストリームを出力します。 print関数に引数を渡すことによって、キーまたは値のいずれかを印刷することもできます。

関連する問題