2016-09-12 7 views
1

5秒以下のタイムスタンプが対応するデータとともに1つの5秒バケットに収まるように、データをsparkに保存したいとします。同様に、次の5秒間バケットは残りのログと一緒になります。 (私はバケット内のデータを集めることができるように)。私のログ:apache sparkでバケットを作成する

1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt 
1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg 
1472120397.633 HTTP GEO er.abff.kagsf.weyfh.ajfg 
1472120397.261 HTTP GEO er.laffg.ayhrff.agyfr.yawr 
1472120394.328 HTTP GEO er.qfryf.aqwruf.oiuqwr.agsf 
1472120393.737 HTTP GEO er.aysf.aouf.ujaf.casf 
. 
. 
. 

私はまだ火花の中でそれを行う方法を見つけることができません。

タイムスタンプが付いたログ1472120400.107,1472120399.999,1472120397.633,1472120397.261などは、1つのバケットに分類され、次のバケットに設定されます。

出力:

タイムスタンプ1472120400.107,1472120399.999,1472120397.633,1472120397.261を持つすべてのloglinesは、さらなる処理が全体のバケットのカウントを見つけるようなものに行われますので、(1つのバケット)のメモリに保持されます。同様に、次のバケット。

+0

予想される出力は? – mtoto

+0

「バケツ」はどういう意味ですか? – mtoto

+0

これは、ログをパーティション分割すること以外は何もありません。パーティション化されたロググループがバケットを形成します。 – kaks

答えて

0

タイムスタンプを作成する細かさで分けるだけです。ビン番号をPairRDDのキーとして保持します。ここで、データは入力で、次にreduceByKeyです。

私はScalaでコード例を書いていますが、基本的にはPythonに変更することは簡単ですが、私はその点を作りたいと思います。

val l5 = List("1472120400.107 HTTP GEO er.aujf.csdh.jkhydf.eyrgt", "1472120399.999 HTTP GEO er.asdhff.cdn.qyirg.sdgsg") 
val l5RDD = sc.parallelize(l5) //input as RDD 
val l5tmp = l5RDD.map(item => item.split(" ")) //Split the sentence 
val l5tmp2 = l5tmp.map(item => ((item(0).toDouble/3600000).toInt, List(item))) //Map the data to a bin (in the key) according to the wanted granularity 
val collected = l5tmp2.reduceByKey(_ ++ _) //Collect the lists to create the bins of data 
collected.collect().foreach(println) //Prints (408,List([Ljava.lang.String;@2c6aed22, [Ljava.lang.String;@e322ec9)) - means that both entries collected to a bin named 408 
+0

私はコードのこの部分を理解しませんでした: 'val collected = l5tmp2.reduceByKey(_ ++ _)'。 ++は何ですか? – kaks

+0

@kaksあなたはアイテムを一緒に収集する方法を定義する必要があります。この場合はアイテムをリストの中に保持します。リストを一緒に追加するには++を使用します。各リストは、ビンに入るはずのすべてのアイテムを保持します –

+0

これをPythonでどのように書くことができるか教えてください。私はそれに苦労しています。 – kaks

関連する問題