2016-11-25 10 views
0

私はApache Spark Streamingの新機能です。 Kinesis Streamから価値を読み取るためにSparkを構築しようとしています。これはスパークストリーミング - Kinesisからの読み込み時のエラー

import settings 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream 
spark_context = SparkContext(master="local[2]", appName=settings.KINESIS_APP_NAME) 

streaming_context = StreamingContext(sparkContext=spark_context, batchDuration=settings.BATCH_DURATION) 

kinesis_good_stream = KinesisUtils.createStream(
ssc=streaming_context, kinesisAppName=settings.KINESIS_APP_NAME, 
streamName=settings.KINESIS_GOOD_STREAM, endpointUrl=settings.KINESIS_ENDPOINT, 
awsAccessKeyId=settings.AWS_ACCESS_KEY, awsSecretKey=settings.AWS_SECRET_KEY, 
checkpointInterval=settings.KINESIS_CHECKPOINT_INTERVAL, regionName=settings.KINESIS_REGION, 
initialPositionInStream=InitialPositionInStream.LATEST) 

counts = kinesis_good_stream.flatMap(lambda line: line.split(" ")) \ 
    .map(lambda word: (word, 1)) \ 
    .reduceByKey(lambda a, b: a+b) 
counts.pprint() 

streaming_context.start() 
streaming_context.awaitTermination() 

私のPythonスクリプトでの設定は、私は私のコレクタから私のDjangoプロジェクト

INFO:snowplow_tracker.emitters:GET request finished with status code: 200 
INFO:snowplow_tracker.emitters:POST request finished with status code: 200 

から

spark-submit --jars spark-streaming-kinesis-asl-assembly.jar kinesis.py 

このコマンドでスクリプトを実行

# Kinesis Configuration 
KINESIS_REGION = 'ap-southeast-1' 
KINESIS_ENDPOINT = 'kinesis.ap-southeast-1.amazonaws.com' 
KINESIS_GOOD_STREAM = 'GoodStream' 
KINESIS_BAD_STREAM = 'BadStream' 
KINESIS_CHECKPOINT_INTERVAL = 2000 
KINESIS_APP_NAME = 'test-spark' 

# Spark context 
BATCH_DURATION = 2 

# AWS Credential 
AWS_ACCESS_KEY = '' 
AWS_SECRET_KEY = '' 

を提出します、Kに書き込むことに気づいた私のキネシスストリーム用

------------------------------------------- 
Time: 2016-11-25 07:59:25 
------------------------------------------- 

16/11/25 07:59:30 ERROR Executor: Exception in task 0.0 in stage 345.0 (TID 173) 
java.lang.NoSuchMethodError: org.apache.spark.storage.BlockManager.get(Lorg/apache/spark/storage/BlockId;)Lscala/Option; 
at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.getBlockFromBlockManager$1(KinesisBackedBlockRDD.scala:104) 
at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.compute(KinesisBackedBlockRDD.scala:117) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:390) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) 
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) 
at org.apache.spark.scheduler.Task.run(Task.scala:86) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 

をストリーミング私のスパークについては

08:00:19.720 [pool-1-thread-9] INFO c.s.s.c.s.sinks.KinesisSink - Successfully wrote 2 out of 2 records 

成功するinesis、私は1つのシャードを使用し、2個のコア、エラーを解決するために管理

+0

あなたのバージョンを確認できるようにsbt/mavenビルドファイルを投稿できますか?特にawsライブラリとsparkのバージョン – ImDarrenG

+0

申し訳ありません、あなたは、私の悪いpysparkを使用していることに気づいた – ImDarrenG

+0

私はスパーク2.0.2からpysparkを使用しています –

答えて

0

でスパークコンテキストを設定しています。私はSpark-2.0.2で動作していますが、java.lang.NoSuchMethodErrorを引き起こすstreaming-kinesis-asl-assembly.2.10-2.0.0.jarを使用しています。

関連する問題