2017-01-04 10 views
1

私のアプリケーションでは、複数の状態を把握したいと思っています。このように、私は次のようにクラスStateManager内全体の状態管理ロジックをカプセル化してみました:状態管理がシリアル化できない

@SerialVersionUID(xxxxxxxL) 
class StateManager(
    inputStream: DStream[(String, String)], 
    initialState: RDD[(String, String)] 
) extends Serializable { 
    lazy val state = inputStream.mapWithState(stateSpec).map(_.get) 
    lazy val stateSpec = StateSpec 
    .function(trackStateFunc _) 
    .initialState(initialState) 
    .timeout(Seconds(30)) 
    def trackStateFunc(key: String, value: Option[String], state: State[String]): Option[(String, String)] = {} 
} 

object StateManager { def apply(dstream: DStream[(String, String)], initialstate: RDD[(String, String)]) = new StateManager(_dStream, _initialState) } 

@SerialVersionUID(xxxxxxxL) ... extends Serializableは私の問題を解決しようとする試みです。

しかし、次のように私のメインクラスからStateManagerを呼び出すとき:

val lStreamingContext = StreamingEnvironment(streamingWindow, checkpointDirectory) 
val statemanager= StateManager(lStreamingEnvironment.sparkContext, 1, None) 
val state= statemanager.state(lKafkaStream) 

state.foreachRDD(_.foreach(println)) 

StreamingEnvironmentについては以下を参照)、私が取得:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
[...] 
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects. 

エラーがクリアされているが、それでも私は「ドンそれがどの点に到達するかはわかりません。

どこからトリガーされますか? これを解決して再利用可能なクラスを持つには、どうすればよいですか?


かもしれない被便利StreamingEnvironmentクラス:

class StreamingEnvironment(mySparkConf: SparkConf, myKafkaConf: KafkaConf, myStreamingWindow: Duration, myCheckPointDirectory: String) { 
    val sparkContext = spark.SparkContext.getOrCreate(mySparkConf) 
    lazy val streamingContext = new StreamingContext(sparkContext , mMicrobatchPeriod) 

    streamingContext.checkpoint(mCheckPointDirectory) 
    streamingContext.remember(Minutes(1)) 

    def stream() = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, myKafkaConf.mBrokers, myKafkaConf.mTopics) 
} 

object StreamingEnvironment { 
    def apply(streamingWindow: Duration, checkpointDirectory: String) = { 
    //setup sparkConf and kafkaConf 

    new StreamingEnvironment(sparkConf , kafkaConf, streamingWindow, checkpointDirectory) 
    } 
} 
+0

私たちにあなたのスパークグラフを見せてください。 –

+0

'StreamingContext'を使って作成したグラフ。 –

+0

ユーティリティクラスで編集が完了しました。 – wipman

答えて

0

我々は関数にメソッドを持ち上げるときは、親クラスへouter参照はここのように、その関数のリファレンスの一部になります。 function(trackStateFunc _) trackStateFuncを直接関数として宣言すると(つまり、val)、おそらく問題を処理します。

また、クラスSerializableをマークしても魔法のようにはなりません。 DStreamはシリアライズ可能ではないため、@transientと注釈を付ける必要があります。これによりおそらく問題が解決されます。

関連する問題