2017-07-09 20 views
0

私はsparkストリーミングのデータをscala mavenプロジェクトを使用してcassandraに保存したいと考えています。Spark Cassandra Streaming

scala -cp /home/darif/TestProject/testmavenapp/target/testmavenapp-1.0-SNAPSHOT.jar /home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala 

:これは、カサンドラ・テーブルにデータを保存するコード

import org.apache.maventestsparkproject._ 
import com.datastax.spark.connector.streaming._ 
import com.datastax.spark.connector.SomeColumns 

import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka._ 

object SparkCassandra { 
    def main(args: Array[String]) { 

    val sparkConf = new SparkConf() 
     .setAppName("KakfaStreamToCassandra").setMaster("local[*]") 
     .set("spark.cassandra.connection.host", "localhost") 
     .set("spark.cassandra.connection.port", "9042") 

    val topics = "fayssal1,fayssal2" 

    val ssc = new StreamingContext(sparkConf, Seconds(5)) 

    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 

    val lines = messages.map(_._2) 

    val words = lines.flatMap(_.split(" ")) 

    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) 

    wordCounts.saveToCassandra(keysspace, table, SomeColumns("word", "count")) 

    ssc.awaitTermination() 

    ssc.start() 
} 
} 

プロジェクトがsuccessfly builtingされ、私はこのcommandeを実行すると、これは

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>org.apache.maventestsparkproject</groupId> 
    <artifactId>testmavenapp</artifactId> 
    <version>1.0-SNAPSHOT</version> 
    <packaging>jar</packaging> 
    <name>testmavenapp</name> 
    <url>http://maven.apache.org</url> 

    <properties> 
     <scala.version>2.11.8</scala.version> 
     <spark.version>1.6.2</spark.version> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    </properties> 




    <dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>3.8.1</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.scala-lang</groupId> 
     <artifactId>scala-library</artifactId> 
     <version>${scala.version}</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.11</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.3.1</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.11</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.10</artifactId> 
     <version>1.0.0-rc4</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector-java_2.10</artifactId> 
     <version>1.0.0-rc4</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.cassandra</groupId> 
     <artifactId>cassandra-driver-core</artifactId> 
     <version>2.1.5</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.11</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 
    </dependencies> 
</project> 

私のpom.xmlですが、あります次のようなエラーが表示されます。

home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:1: error: object apache is not a member of package org 
import org.apache.maventestsparkproject._ 
     ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:2: error: object datastax is not a member of package com 
import com.datastax.spark.connector.streaming._ 
     ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:3: error: object datastax is not a member of package com 
import com.datastax.spark.connector.SomeColumns 
     ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:5: error: object apache is not a member of package org 
import org.apache.spark.SparkConf 
     ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:6: error: object apache is not a member of package org 
import org.apache.spark.streaming._ 
     ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:7: error: object apache is not a member of package org 
import org.apache.spark.streaming.kafka._ 
     ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:12: error: not found: type SparkConf 
    val sparkConf = new SparkConf() 
          ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:19: error: not found: type StreamingContext 
    val ssc = new StreamingContext(sparkConf, Seconds(5)) 
        ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:19: error: not found: value Seconds 
    val ssc = new StreamingContext(sparkConf, Seconds(5)) 
               ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:22: error: not found: value brokers 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
                     ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:23: error: not found: value KafkaUtils 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
         ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:23: error: not found: type StringDecoder 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
                    ^
/home/darif/TestProject/testmavenapp/src/main/java/org/apache/maventestsparkproject/SparkCassandra.scala:23: error: not found: type StringDecoder 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
                        ^
13 errors found 

私が使用しています:

Scala 2.11.8 
Spark 1.6.2 
Kafka Client APIs 0.8.2.11 
Cassandra 3.9 
Datastax Spark-Cassandra Connector compatible with Spark 1.6.2 

答えて

0

をアプリケーションのクラスパスが正しく設定されていません。​​をランチャーとして使用することは、さまざまな場所でクラスパスの大半をセットアップすることをお勧めします。サードパーティの依存関係は--packagesを使って設定されます。あなたはスパークコンファレンスで様々なものを設定するだけでなく、手動でスパークのすべてを含むようにクラスパスを設定するカスタムで同じ結果を得ることができると言いました

、DSE、カフカ図書館などがあります。

関連する問題