私はKafkaからデータを読み込み、Spark RDDを使用してCassandraテーブルに格納しようとしています。コードのコンパイル中値の分割は(String、String)のメンバーではありません
エラーを取得する:コードの下
/root/cassandra-count/src/main/scala/KafkaSparkCassandra.scala:69: value split is not a member of (String, String)
[error] val lines = messages.flatMap(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
:私はインタラクティブspark-shell
から手動でコードを実行したときにそれがうまく動作しますが、エラー用のコードをコンパイルしながら、来ます。
// Create direct kafka stream with brokers and topics
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" -> kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
// Create the processing logic
// Get the lines, split
val lines = messages.map(line => line.split(',')).map(s => (s(0).toString, s(1).toDouble,s(2).toDouble,s(3).toDouble))
lines.saveToCassandra("stream_poc", "US_city", SomeColumns("city_name", "jan_temp", "lat", "long"))
@RameshMaharjan:コードなどの固有名詞をフォーマットしないでください。カフカとカサンドラは初期資本だけを必要とし、それだけです - 彼らはコードではありません。しかし、 'spark-shell'のようなものはOKです。なぜなら、コードのフォーマットはコンソールI/O(' spark-shell'は入力されたコマンドであると仮定します)に適しているからです。 – halfer