私はスパークストリーミングに関する何かを学んでおり、トップ5の単語を見つけるように設計されたプログラムがあります。120秒以内に応答を受信できません。このタイムアウトはspark.rpc.askTimeoutによって制御されます
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
object Top5{
def main(args:Array[String]){
val conf=new SparkConf()
conf.setAppName("AppName")
conf.setMaster("spark://SparkMaster:7077")
val ssc=new StreamingContext(conf,Seconds(10))
val hottestStream=ssc.socketTextStream("SparkMaster:7077", 9999)
val searchPair=hottestStream.map(_.split("")(1)).map(item=>(item,1))
val hottestDStream=searchPair.reduceByKeyAndWindow((v1:Int,v2:Int)=>v1+v2,Seconds(60),Seconds(20))
hottestDStream.transform(hottestItemRDD=>{
val top5=hottestItemRDD.map(pair=>(pair._2,pair._1)).sortByKey(false)
.map(pair=>(pair._2,pair._1)).take(3)
for(item<-top5){
println(item)
}
hottestItemRDD}
).print()
ssc.start()
ssc.awaitTermination()
}}
私はスパーククラスタ環境でそれを行っておりますように、エラーが
Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
私はstackoverflowの中に私の質問を検索しましたと言います。そして、同様の質問がありますorg.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.lookupTimeout答えは私に増加を指示しますspark.timeout.network
、そうですか?また、spark.timeout.network
はどこにありますか? spark.network.timeout
800秒を増加することをお勧めします重いワークロードの場合