2

私はKafkaからデータを読み込み、Spark RDDを使用してCassandraテーブルに格納しようとしています。コードのコンパイル中値の分割は(String、String)のメンバーではありません

エラーを取得する:コードの下

/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String) 

[error]  val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
[error]            ^
[error] one error found 

[error] (compile:compileIncremental) Compilation failed 

:私はインタラクティブspark-shellから手動でコードを実行したときにそれがうまく動作しますが、​​エラー用のコードをコンパイルしながら、来ます。

// Create direct kafka stream with brokers and topics 
val topicsSet = Set[String] (kafka_topic) 
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

// Create the processing logic 
// Get the lines, split 
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long")) 
+0

@RameshMaharjan:コードなどの固有名詞をフォーマットしないでください。カフカとカサンドラは初期資本だけを必要とし、それだけです - 彼らはコードではありません。しかし、 'spark-shell'のようなものはOKです。なぜなら、コードのフォーマットはコンソールI/O(' spark-shell'は入力されたコマンドであると仮定します)に適しているからです。 – halfer

答えて

1

KafkaUtils.createDirectStream(カフカ内のメッセージは、必要に応じてキー止めされているので)キーと値のタプルを返します。あなたの場合、タイプは(String, String)です。あなたはを分割したい場合は、まずそれを取らなければならない:

val lines = 
    messages 
    .map(line => line._2.split(',')) 
    .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 

または一部の機能の構文を使用して:

val lines = 
    messages 
    .map { case (_, value) => value.split(',') } 
    .map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble)) 
2

をカフカのすべてのメッセージはキーが付いています。この場合のオリジナルのカフカストリームは、messagesで、タプル(key,value)のストリームです。

コンパイルエラーが指摘するように、タプルにはsplitメソッドがありません。我々はここで何をしたいのか

は次のとおりです。

messages.map{ case (key, value) => value.split(','))} ... 
関連する問題