次のコードを使用して、Spark-Streaming
の出力をElasticSearch
に格納しています。私はスパークストリーミングの出力を適切な名前のi.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)
にマッピングしたいと思います。しかし、現在わかるように、_1や_2などのようにESにマッピングされています。 さらに、いくつかのフィルタ、つまり(if PlatFormName = "ubuntu" then index the data)
をESにインデックス付けする前に入れたいと思います。だから、どうすればいいの? ElasticSearchでSpark-StreamingからElastic Searchへの出力のフィールド名のマッピング
val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_)
val pageCounts = realTimeAgg.map
pageCounts.foreachRDD{ x =>
if (x.toLocalIterator.nonEmpty) {
EsSpark.saveToEs(x, "spark/ElasticSearch")
}
}
ssc.start()
ssc.awaitTermination()
出力:あなたが持つPairRDD(Tuple6、ロング)のデータ型を格納しているので、弾性検索文書の
{
"_index": "spark",
"_type": "ElasticSearch",
"_id": "AVTH0JPgzgtrAOUg77qq",
"_score": 1,
"_source": {
"_1": {
"_3": "Amiga",
"_2": "AmigaOS 1.3",
"_6": "SeaMonkey",
"_1": "Usedcar",
"_4": 0,
"_5": 0
},
"_2": 1013
}
}
ありがとうございます。私はあなたの2番目の提案を実装しました。あなたは、私がそれを得ていない例を使って、あなたの最初の提案に何を意味するのかをもっと詳しく教えてください。 Morover、このバグはあなたがspark clsuterにあなたの仕事を提出したときに起こるようですね。 – Naresh
@Naresh、最初のオプションでは、(このスレッドの示唆しているように)既存のクラス(必要ならば)でequalsとhashCodeメソッドをオーバーライドすることを指していました[http://stackoverflow.com/questions/7681183/how-can- i-a-a-custom-equalality-operation-that-be-be-immutable-set]を使用して、そして、はい、バグはスパークシェルでのみ、クラスタを実行するときではありません。 –
これで私を助けてください。私はここに立ち往生しています。 'http:// stackoverflow.com/question/39363586/スパークストリーミングからキャッサナードラへのデータ保存中の問題 – Naresh