0
私はsparkストリーミングのデータをscala mavenプロジェクトを使用してcassandraに保存したいと考えています。Spark Cassandra Streaming
scala -cp /home/darif/TestProject/testmavenapp/target/testmavenapp-1.0-SNAPSHOT.jar /home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala
これは、Cassandra のテーブルにデータを保存するコードです:
import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._
import kafka.serializer.StringDecoder
import org.apache.maventestsparkproject._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object SparkCassandra {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
.setAppName("KakfaStreamToCassandra").setMaster("local[*]")
.set("spark.cassandra.connection.host", "localhost")
.set("spark.cassandra.connection.port", "9042")
val topics = "fayssal1,fayssal2"
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.saveToCassandra(keysspace, table, SomeColumns("word", "count"))
ssc.awaitTermination()
ssc.start()
}
}
プロジェクトは正常にビルドできましたが、上記のコマンドを実行するとエラーになります。以下は
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.maventestsparkproject</groupId>
<artifactId>testmavenapp</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>testmavenapp</name>
<url>http://maven.apache.org</url>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>1.6.2</spark.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<!-- FIX: was spark-streaming_2.10 version 1.3.1 — mixing Scala 2.10 artifacts
     with the 2.11 artifacts above causes binary-incompatibility errors.
     All Spark artifacts must use the same Scala binary version and Spark version. -->
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- FIX: the following three dependencies conflict with the
     spark-cassandra-connector_2.11 declared below and must be removed:
     - spark-cassandra-connector_2.10 1.0.0-rc4 and
       spark-cassandra-connector-java_2.10 1.0.0-rc4 are Scala 2.10 artifacts
       for Spark 0.9/1.0, incompatible with Scala 2.11 / Spark 1.6.2;
     - cassandra-driver-core 2.1.5 is too old for Cassandra 3.9; the 1.6.x
       connector already pulls in a compatible driver transitively.
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.0.0-rc4</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.0.0-rc4</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>2.1.5</version>
</dependency>
-->
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
</project>
私の pom.xml ですが、次のようなエラーが表示されます。
home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:1: error: object apache is not a member of package org
import org.apache.maventestsparkproject._
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:2: error: object datastax is not a member of package com
import com.datastax.spark.connector.streaming._
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:3: error: object datastax is not a member of package com
import com.datastax.spark.connector.SomeColumns
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:5: error: object apache is not a member of package org
import org.apache.spark.SparkConf
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:6: error: object apache is not a member of package org
import org.apache.spark.streaming._
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:7: error: object apache is not a member of package org
import org.apache.spark.streaming.kafka._
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:12: error: not found: type SparkConf
val sparkConf = new SparkConf()
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:19: error: not found: type StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(5))
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:19: error: not found: value Seconds
val ssc = new StreamingContext(sparkConf, Seconds(5))
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:22: error: not found: value brokers
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:23: error: not found: value KafkaUtils
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:23: error: not found: type StringDecoder
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:23: error: not found: type StringDecoder
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
^
13 errors found
私が使用しています:
Scala 2.11.8
Spark 1.6.2
Kafka Client APIs 0.8.2.11
Cassandra 3.9
Datastax Spark-Cassandra Connector compatible with Spark 1.6.2