スパークストリーミングでカフカストリームから数百万のメッセージを取得しています。 15種類のメッセージがあります。メッセージは1つのトピックから来ています。私はその内容によってのみメッセージを区別することができます。だから私はrdd.containsメソッドを使用して、異なるタイプのrddを取得しています。はspark-scalaのrdd.containsファンクションです
サンプルメッセージ
{ "A": "FOO"、 "B": "バー"、 "タイプ": "第一" .......}
{ "A":」 "b": "bar1"、 "type": "second" .......}
{"a": "foo2"、 "b": "bar2"、 "type": "第3 ".......}
{" a ":" foo "、" b ":" bar "、" type ":" first ".......}
.... ..........
...............
.........
コード
DStream.foreachRDD { rdd =>
if (!rdd.isEmpty()) {
val rdd_first = rdd.filter {
ele => ele.contains("First")
}
if (!rdd_first.isEmpty()) {
insertIntoTableFirst(hivecontext.read.json(rdd_first))
}
val rdd_second = rdd.filter {
ele => ele.contains("Second")
}
if (!rdd_second.isEmpty()) {
insertIntoTableSecond(hivecontext.read.json(rdd_second))
}
.............
......
same way for 15 different rdd
カフカトピックメッセージから異なるRDDを取得する方法はありますか?
私はハイブにデータを保存する必要があります。ハイブには15種類のテーブルが作成されています。更新された質問。実際、1つのタイプのJSONには50以上の列があります。だから私は15のケースクラスを作成しなければならない。ケースクラスを作成するのではなく、他のものがありますか? –
@KishoreKumarSutharデータが最初の 'case class'(Sparkのlingoによる)で '構造化'された後、特定のテーブルに一致するようにデータを投影することができます。' val tableProjection1 = records select($ "column"、$ "column"、...)where($ "type" === ...) ' – maasg