2017-11-10 8 views
1

Apacheのスパークへのデータのストリームを読み込むためのdocumentationから標準的な方法は次のとおりです。データセット全体をロードする前に、各イベントにカスタムデータの書式設定/マップを適用する方法はありますか?

events = spark.readStream \ 
    .format("json") \   # or parquet, kafka, orc... 
    .option() \     # format specific options 
    .schema(my_schema) \  # required 
    .load("path/to/data") 

しかし、私は、スキーマを適用する前に、フィールドのデータを再配置し、いくつかのいくつかをクリーンアップする必要があり、私は

events = spark.readStream \ 
    .format("json") \   # or parquet, kafka, orc... 
    .option() \     # format specific options 
    .schema(my_schema) \  # required 
    **.map(custom_function)** # apply a custom function to the json object 
    .load("path/to/data") 

ストラクチャードストリーミングを使用してApache Sparkでこれを行う効率的な方法はありますか?

答えて

1

tl; dr簡単な答えですが、データセットをロードする前にこれを行うことはできません。

データセットを一連の文字列としてロードし、一連のwithColumnまたはselectの変換でクリーンアップすることが効果的です。.map(custom_function)です。

1

Jacekの答えに同意します。具体的には、次の2つのオプションがあります。

  1. 入力データの「スーパースキーマ」を適用し、必要なスキーマに操作します。これは、(a)すべてのデータが有効なJSONであり、(b)「スーパースキーマ」がやや安定している、たとえば動的なフィールド名が存在しない場合の最適なアプローチです。

  2. json4s(または別の選択したライブラリ)を使用して解析し、必要に応じて操作します。これは、(a)入力行が有効なJSONでないか、(b)安定した "スーパースキーマ"がない場合に最適です。

関連する問題