私のアプリケーションでは、複数の状態を把握したいと思っています。このように、私は次のようにクラスStateManager
内全体の状態管理ロジックをカプセル化してみました:状態管理がシリアル化できない
@SerialVersionUID(xxxxxxxL)
class StateManager(
inputStream: DStream[(String, String)],
initialState: RDD[(String, String)]
) extends Serializable {
lazy val state = inputStream.mapWithState(stateSpec).map(_.get)
lazy val stateSpec = StateSpec
.function(trackStateFunc _)
.initialState(initialState)
.timeout(Seconds(30))
def trackStateFunc(key: String, value: Option[String], state: State[String]): Option[(String, String)] = {}
}
object StateManager { def apply(dstream: DStream[(String, String)], initialstate: RDD[(String, String)]) = new StateManager(_dStream, _initialState) }
@SerialVersionUID(xxxxxxxL) ... extends Serializable
は私の問題を解決しようとする試みです。
しかし、次のように私のメインクラスからStateManager
を呼び出すとき:
val lStreamingContext = StreamingEnvironment(streamingWindow, checkpointDirectory)
val statemanager= StateManager(lStreamingEnvironment.sparkContext, 1, None)
val state= statemanager.state(lKafkaStream)
state.foreachRDD(_.foreach(println))
(StreamingEnvironment
については以下を参照)、私が取得:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
[...]
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
エラーがクリアされているが、それでも私は「ドンそれがどの点に到達するかはわかりません。
どこからトリガーされますか? これを解決して再利用可能なクラスを持つには、どうすればよいですか?
かもしれない被便利StreamingEnvironment
クラス:
class StreamingEnvironment(mySparkConf: SparkConf, myKafkaConf: KafkaConf, myStreamingWindow: Duration, myCheckPointDirectory: String) {
val sparkContext = spark.SparkContext.getOrCreate(mySparkConf)
lazy val streamingContext = new StreamingContext(sparkContext , mMicrobatchPeriod)
streamingContext.checkpoint(mCheckPointDirectory)
streamingContext.remember(Minutes(1))
def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, myKafkaConf.mBrokers, myKafkaConf.mTopics)
}
object StreamingEnvironment {
def apply(streamingWindow: Duration, checkpointDirectory: String) = {
//setup sparkConf and kafkaConf
new StreamingEnvironment(sparkConf , kafkaConf, streamingWindow, checkpointDirectory)
}
}
私たちにあなたのスパークグラフを見せてください。 –
'StreamingContext'を使って作成したグラフ。 –
ユーティリティクラスで編集が完了しました。 – wipman