2017-05-27 1 views
0

私はスパークストリーミングに関する何かを学んでおり、トップ5の単語を見つけるように設計されたプログラムがあります。120秒以内に応答を受信できません。このタイムアウトはspark.rpc.askTimeoutによって制御されます

import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.Seconds 

object Top5{ 
    def main(args:Array[String]){ 
    val conf=new SparkConf() 
    conf.setAppName("AppName") 
    conf.setMaster("spark://SparkMaster:7077") 
    val ssc=new StreamingContext(conf,Seconds(10)) 
    val hottestStream=ssc.socketTextStream("SparkMaster:7077", 9999) 
    val searchPair=hottestStream.map(_.split("")(1)).map(item=>(item,1)) 
    val hottestDStream=searchPair.reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds(60),Seconds(20)) 
    hottestDStream.transform(hottestItemRDD=>{ 
    val top5=hottestItemRDD.map(pair=>(pair._2,pair._1)).sortByKey(false) 
       .map(pair=>(pair._2,pair._1)).take(3) 
    for(item<-top5){ 
     println(item) 
    } 
    hottestItemRDD} 
).print() 
    ssc.start() 
    ssc.awaitTermination() 
}} 

私はスパーククラスタ環境でそれを行っておりますように、エラーが

Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 

私はstackoverflowの中に私の質問を検索しましたと言います。そして、同様の質問がありますorg.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.lookupTimeout答えは私に増加を指示しますspark.timeout.network、そうですか?また、spark.timeout.networkはどこにありますか? spark.network.timeout 800秒を増加することをお勧めします重いワークロードの場合

答えて

関連する問題