2016-05-19 15 views
1

次のコードを使用して、Spark-Streamingの出力をElasticSearchに格納しています。私はスパークストリーミングの出力を適切な名前のi.e (Key, OsName, PlatFormName, Mobile, BrowserName, Count)にマッピングしたいと思います。しかし、現在わかるように、_1や_2などのようにESにマッピングされています。 さらに、いくつかのフィルタ、つまり(if PlatFormName = "ubuntu" then index the data)をESにインデックス付けする前に入れたいと思います。だから、どうすればいいの? ElasticSearchでSpark-StreamingからElastic Searchへの出力のフィールド名のマッピング

val realTimeAgg = lines.map{ x => ((x.key, x.os, x.platform, x.mobile, x.browser), 1)}.reduceByKey(_+_) 

      val pageCounts = realTimeAgg.map  
      pageCounts.foreachRDD{ x => 
        if (x.toLocalIterator.nonEmpty) {  
         EsSpark.saveToEs(x, "spark/ElasticSearch") 
        } 
       } 

      ssc.start() 
      ssc.awaitTermination() 

出力:あなたが持つPairRDD(Tuple6、ロング)のデータ型を格納しているので、弾性検索文書の

{ 
      "_index": "spark", 
      "_type": "ElasticSearch", 
      "_id": "AVTH0JPgzgtrAOUg77qq", 
      "_score": 1, 
      "_source": { 
       "_1": { 
        "_3": "Amiga", 
        "_2": "AmigaOS 1.3", 
        "_6": "SeaMonkey", 
        "_1": "Usedcar", 
        "_4": 0, 
        "_5": 0 
       }, 
       "_2": 1013 
      } 
     } 

答えて

1

キー等_1、_2、です。

キーを保持するには、ケースクラスをキーとして使用する必要があります。

val realTimeAgg = lines.map{ x => (x, 1)}.reduceByKey(_+_) 

私は、オブジェクトxのクラスはケースクラスであり、あなたは(つまり2ケースクラスのインスタンスが等しいかどうかをチェックするための)削減を行うため、そのクラスのすべてのフィールドを使用することを想定しています。そのクラスのすべてのフィールドが平等のために使用するクラスの自然キーされていない場合、次の2つのオプションがある -

  1. をオーバーライドは、あなたのケースクラス
  2. のために等しいとhashCodeだけ持っている別のケースクラスを作成します。 (x.key、x.os、x.platform、x.mobile、x.browser)のタ​​プルで使用したフィールド)を取得し、最初の行のタプルではなくそのケースクラスにマッピングします。 map {x => ...}。

ElasticSearchに書き込む前に必要なフィルタを追加できます。

pageCounts.foreachRDD { x => 
         if (x.toLocalIterator.nonEmpty) { 
          val y = x.filter(z => z._1.platform == "ubuntu")  
          EsSpark.saveToEs(y, "spark/ElasticSearch") 
        } 
       } 

PS:あなたは私がlines.map(X =>(X、1))を示唆しているようにキーとして(ケースクラス、ロング)ケースクラスと対RDDをテストしている場合reduceByKey(_ +。 _)。 Spark Shellに関連したバグがあります。このクラスでは、操作クラスをキークラスとして正しく機能しません。jira issue

+0

ありがとうございます。私はあなたの2番目の提案を実装しました。あなたは、私がそれを得ていない例を使って、あなたの最初の提案に何を意味するのかをもっと詳しく教えてください。 Morover、このバグはあなたがspark clsuterにあなたの仕事を提出したときに起こるようですね。 – Naresh

+0

@Naresh、最初のオプションでは、(このスレッドの示唆しているように)既存のクラス(必要ならば)でequalsとhashCodeメソッドをオーバーライドすることを指していました[http://stackoverflow.com/questions/7681183/how-can- i-a-a-custom-equalality-operation-that-be-be-immutable-set]を使用して、そして、はい、バグはスパークシェルでのみ、クラスタを実行するときではありません。 –

+0

これで私を助けてください。私はここに立ち往生しています。 'http:// stackoverflow.com/question/39363586/スパークストリーミングからキャッサナードラへのデータ保存中の問題 – Naresh

関連する問題