2017-05-25 18 views
1

各メッセージを構造化ストリーミングパイプライン(spark 2.1.1でソースはkafka 0.10.2.1である)を通って処理するための「推奨される」方法は何でしょうか?ストラクチャードストリーミング - 各メッセージを消費する

これまでのところ、dataframe.mapPartitions(私はhbaseに接続する必要があります。そのクライアント接続クラスはシリアル化できないため、mapPartitions)です。

アイデア? https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreachは、クライアントが直列化可能ではないですが、あなたはForeachWriterコンストラクタでそれを開く必要はありません。

答えて

1

あなたはforeach出力シンクを使用することができるはずです。 None/nullのままにして、メソッド(シリアライズ後)で初期化しますが、タスクごとに1回だけ行います。ソートの擬似コードで

class HBaseForeachWriter extends ForeachWriter[MyType] { 
    var client: Option[HBaseClient] = None 
    def open(partitionId: Long, version: Long): Boolean = { 
    client = Some(... open a client ...) 
    } 
    def process(record: MyType) = { 
    client match { 
     case None => throw Exception("shouldn't happen") 
     case Some(cl) => { 
     ... use cl to write record ... 
     } 
    } 
    } 
    def close(errorOrNull: Throwable): Unit = { 
    client.foreach(cl => cl.close()) 
    } 
} 
関連する問題