2016-11-18 9 views
3

私はIoTアプリケーションからのJSONデータを持つKafkaブローカーを持っています。いくつかの処理を行うために、Spark Streamingアプリケーションからこのサーバに接続します。Spark Streamingアプリケーションでキャッシュデータにアクセスする方法は?

cache()persist()オペレータを使用して達成できると思われる私のjsonデータのいくつかの特定のフィールドをメモリ(RAM)に保存したいとします。

次回は、Spark Streamingアプリケーションで新しいJSONデータを受け取ると、検索できる共通のフィールドがあるかどうかをメモリ(RAM)にチェックインします。そしてもしそうなら、私は単純な計算を行い、私は最終的に私がメモリ(RAM)に保存したフィールドの値を更新します。

このように、私が前に説明したものが可能かどうかを知りたいと思います。はいの場合は、cache()またはpersist()を使用する必要がありますか?そして、私は自分のフィールドをメモリから取り出すことができますか?

答えて

2

Sparkアプリケーションのデータにメモリまたはディスクを使用するcache/persist(これは一般的にはcaching in SparkのSpark Streamingアプリケーション専用ではありません)で可能です。

しかし、Spark Streamingでは、ステートフルな計算と呼ばれるそのようなユースケースを特別にサポートしています。可能なものについては、Spark Streaming Programming Guideを参照してください。

mapWithStateの演算子は、あなたの後ろのものとまったく同じです。

0

スパークはそのようには機能しません。分散して考えてください。

RAMに保存する最初の部分です。 cache()またはpersist()を使うことができます。デフォルトでは、データは作業者のメモリに保存されます。

Apache Spark Codeから確認できます。

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) 

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def cache(): this.type = persist() 

あなたのユースケースを理解する限り、2番目のユースケースを実装するにはUpdateStateByKey操作が必要です。

ウィンドウ処理の詳細については、hereを参照してください。

関連する問題