2017-09-27 3 views
0

スパークストリーミングでカフカストリームから数百万のメッセージを取得しています。 15種類のメッセージがあります。メッセージは1つのトピックから来ています。私はその内容によってのみメッセージを区別することができます。だから私はrdd.containsメソッドを使用して、異なるタイプのrddを取得しています。はspark-scalaのrdd.containsファンクションです

サンプルメッセージ

{ "A": "FOO"、 "B": "バー"、 "タイプ": "第一" .......}
{ "A":」 "b": "bar1"、 "type": "second" .......}
{"a": "foo2"、 "b": "bar2"、 "type": "第3 ".......}
{" a ":" foo "、" b ":" bar "、" type ":" first ".......}
.... ..........
...............
.........

にそう

コード

DStream.foreachRDD { rdd => 
    if (!rdd.isEmpty()) { 
    val rdd_first = rdd.filter { 
     ele => ele.contains("First") 
    } 
    if (!rdd_first.isEmpty()) { 
     insertIntoTableFirst(hivecontext.read.json(rdd_first)) 
    } 
    val rdd_second = rdd.filter { 
     ele => ele.contains("Second") 
    } 
    if (!rdd_second.isEmpty()) { 
    insertIntoTableSecond(hivecontext.read.json(rdd_second)) 
    } 
     ............. 
     ...... 
    same way for 15 different rdd 

カフカトピックメッセージから異なるRDDを取得する方法はありますか?

答えて

1

rdd.containsはありません。ここで使用されている関数containsは、RDDStringに適用されます。

ここのように:文字列内の他のコンテンツにエラーが生じ、比較を満たす可能性があるため、この方法は堅牢ではありません

val rdd_first = rdd.filter { 
    element => element.contains("First") // each `element` is a String 
} 

これに対処するための

{"a":"foo", "b":"bar","type":"second", "c": "first", .......} 

一つの方法は、最初に適切なレコードにJSONデータを変換して、それらのレコードにグループ化やフィルタリングロジックを適用することであろう。そのためには、まずデータのスキーマ定義が必要です。スキーマでは、我々はJSONにレコードを解析することができますし、その上に任意の処理を適用します。

case class Record(a:String, b:String, `type`:String) 

import org.apache.spark.sql.types._ 
val schema = StructType(
       Array(
       StructField("a", StringType, true), 
       StructField("b", StringType, true), 
       StructField("type", String, true) 
       ) 
      ) 

val processPerType: Map[String, Dataset[Record] => Unit ] = Map(...) 

stream.foreachRDD { rdd => 
    val records = rdd.toDF("value").select(from_json($"value", schema)).as[Record] 
    processPerType.foreach{case (tpe, process) => 
     val target = records.filter(entry => entry.`type` == tpe) 
     process(target) 
    } 
} 

質問は、レコードの種類ごとに適用する必要のあるロジックの種類を指定しません。ここに提示されているのは、任意のカスタムロジックを関数Dataset[Record] => Unitとして表すことができる問題に近づく一般的な方法です。

ロジックを集約として表現できる場合は、おそらくDataset集計関数が適しています。

+0

私はハイブにデータを保存する必要があります。ハイブには15種類のテーブルが作成されています。更新された質問。実際、1つのタイプのJSONには50以上の列があります。だから私は15のケースクラスを作成しなければならない。ケースクラスを作成するのではなく、他のものがありますか? –

+0

@KishoreKumarSutharデータが最初の 'case class'(Sparkのlingoによる)で '構造化'された後、特定のテーブルに一致するようにデータを投影することができます。' val tableProjection1 = records select($ "column"、$ "column"、...)where($ "type" === ...) ' – maasg

関連する問題