2016-11-26 1 views
1

私は、Pythonで、Spark Streaming + Kafka Integration Guide (Kafka broker version 0.8.2.1 or higher)に記載されている例に従ってApache Sparkを使用してKafkaメッセージをストリーミングするためにSparkアプリケーションを作成しましたが、メッセージを送信する前にシャットダウンしています。Spark Streamingアプリケーションがすぐにシャットダウンされ、カフカレコードを処理しないのはなぜですか?

ここで、シャットダウンセクションが出力から開始されます。

16/11/26 17:11:06 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 1********6, 58045) 
16/11/26 17:11:06 INFO VerifiableProperties: Verifying properties 
16/11/26 17:11:06 INFO VerifiableProperties: Property group.id is overridden to 
16/11/26 17:11:06 INFO VerifiableProperties: Property zookeeper.connect is overridden to 
16/11/26 17:11:07 INFO SparkContext: Invoking stop() from shutdown hook 
16/11/26 17:11:07 INFO SparkUI: Stopped Spark web UI at http://192.168.1.16:4040 
16/11/26 17:11:07 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
16/11/26 17:11:07 INFO MemoryStore: MemoryStore cleared 
16/11/26 17:11:07 INFO BlockManager: BlockManager stopped 
16/11/26 17:11:07 INFO BlockManagerMaster: BlockManagerMaster stopped 
16/11/26 17:11:07 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
16/11/26 17:11:07 INFO SparkContext: Successfully stopped SparkContext 
16/11/26 17:11:07 INFO ShutdownHookManager: Shutdown hook called 
16/11/26 17:11:07 INFO ShutdownHookManager: Deleting directory /private/var/folders/yn/t3pvrk7s231_11ff2lqr4jhr0000gn/T/spark-1876feee-9b71-413e-a505-99c414aafabf/pyspark-1d97c3dd-0889-42ed-b559-d0fd473faa22 
16/11/26 17:11:07 INFO ShutdownHookManager: Deleting directory /private/var/folders/yn/t3pvrk7s231_11ff2lqr4jhr0000gn/T/spark-1876feee-9b71-413e-a505-99c414aafabf 

私はそれを待つように言わなければならない方法があるのでしょうか?

全コード:

from pyspark.streaming.kafka import KafkaUtils 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 

sc = SparkContext("local[2]", "TwitterWordCount") 
ssc = StreamingContext(sc, 1) 

directKafkaStream = KafkaUtils.createDirectStream(ssc, ["next"], {"metadata.broker.list": "localhost:9092"}) 

offsetRanges = [] 

def storeOffsetRanges(rdd): 
    global offsetRanges 
    offsetRanges = rdd.offsetRanges() 
    return rdd 

def printOffsetRanges(rdd): 
    for o in offsetRanges: 
     print("Printing! %s %s %s %s" % o.topic, o.partition, o.fromOffset, o.untilOffset) 

directKafkaStream\ 
    .transform(storeOffsetRanges)\ 
    .foreachRDD(printOffsetRanges) 

そして、ここでは役に立ちます場合には、それを実行するコマンドです。

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 producer.py 

答えて

関連する問題