2017-11-29 7 views
0

ScalaでflatMapValuesをKafkaライブラリで使用するとエラーが発生します。ここに私のコードは次のとおりです。KafkaでflatMapValuesを使用する方法

val builder: KStreamBuilder = new KStreamBuilder() 
val textLines: KStream[String, String] = builder.stream("streams-plaintext-input") 
import collection.JavaConverters.asJavaIterableConverter 
val wordCounts: KTable[String, JLong] = textLines 
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) 
    .groupBy((_, word) => word) 
    .count("word-counts") 

と私はflatMapValuestextLineのエラーmissing parameter typeを取得しています。 flatMapValues((textLine: String) => textLine.toLowerCase.split("\\W+").toIterable.asJava)に置き換えても、それでも動作しません。

誰か知っていますか?私はこのような解決のScala 2.12.4での作業 おかげで、フェリペ

+0

は、手動で出力タイプを指定する必要があります。 Scalaはしばしばジェネリックを反転できません:https://docs.confluent.io/current/streams/faq.html#scala-compile-error-no-type-parameter-java-defined-trait-is-invariant-in-type- t –

+2

'flatMapValues {case textLine:String => ...}' –

+1

を試してみてください。カフカがJavaで構築されているため、これが起こっているようです(https://docs.confluent.io/current/streams/faq.html#scala-コンパイルエラーなし型パラメータのjava-defined-trait-is-invariant-in-type-t) –

答えて

0

:私は推測

val props = new Properties 
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount") 
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") 
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) 
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) 

    val stringSerde: Serde[String] = Serdes.String() 
    val longSerde: Serde[Long] = Serdes.Long() 

    val builder = new StreamsBuilder() 
    val textLines: KStream[String, String] = builder.stream("streams-plaintext-input") 

    val topology: Topology = builder.build() 

    println(topology.describe()) 

    val wordCounts: KTable[String, Long] = textLines 
    .flatMapValues { textLine => 
     println(textLine) 
     println(topology.describe()) 
     textLine.toLowerCase.split("\\W+").toIterable.asJava 
    } 
    .groupBy((_, word) => word) 
    // this is a stateful computation config to the topology 
    .count("word-counts") 

    wordCounts.to(stringSerde, longSerde, "streams-wordcount-output") 

    val streams = new KafkaStreams(topology, props) 
    streams.start() 
関連する問題