2017-07-14 11 views
0

私は文字列のRDDを持っています。各行はさまざまなログに対応しています。グローバルスパークrddに書き込むマップ関数

私は適応された正規表現を適用するためにRDDの行と一致するか/大文字の1つの単一の関数で複数の正規表現を持っています。

この固有の機能を自分のRDDにマップしたいので、すべての行をすばやく処理し、処理された各行を他のグローバルrddに格納することができます。

問題は、このタスクを並列化したいので、すべての処理済み行を追加するには、グローバルRDDに同時にアクセスできる必要があります。

これを行うには別の方法があるかどうか疑問に思っていました。私は自分のスパークスキルを向上させるために探しています。たとえば、これは私がしたい何

されています:

私のようなTXT持っている:

ERROR:Hahhaha PARAM_ERROR = 8 param_err2 = httpsの

WARNING:HUHUHUHUH param_warn = tchu param_warn2を= wifi

私のregex関数は、 "ERROR"を含む行と配列が一致する例:

、別の正規表現関数は終わりに例Array("Warning","tchu","wifi")

の配列で「WARNING」を含む行と一致しますが、私は処理ライン毎RDD[Array[String]]を取得したいです。

私はそれをSparkと並列に保つにはどうしますか?

+0

を「私は適応正規表現を適用するためにRDDの行を/ケースに一致する1つの機能で複数の正規表現を持っている」 - ことができますこの機能の_署名を含めるように投稿を編集しますか? –

答えて

2

まず、「グローバルRDD」のようなものはスパークではないこと、またそのようなものが必要な理由はないことを理解することが重要です。 Sparkを使用する場合はの形でを別のRDDに変換し、 RDDを更新することはできません(これは不可能です - RDDは不変です)。このようなの変換は、Sparkによって分散(並列)されて実行されます。

  • 最初の項目が"ERROR"あるArray[String]、または:
  • 私が正しくあなたの条件を理解していれば、あなたは次のいずれかの結果にmapに、各レコードをしたいと思います。この場合

    最初の項目は"WARNING"ある、又はArray[String]:NOパターンレコードと一致しない場合

  • 除去それ
あなたが RDDmap(f)collect(f)メソッドを使用することができ、それを行うために

// Sample data: 
val rdd = sc.parallelize(Seq(
    "ERROR : Hahhaha param_error=8 param_err2=https", 
    "WARNING : HUHUHUHUH param_warn=tchu param_warn2=wifi", 
    "Garbage - not matching anything" 
)) 

// First we can split in " : " to easily identify ERROR vs. WARNING 
val splitPrefix = rdd.map(line => line.split(" : ")) 

// Implement these parsing functions as you see fit; 
// The input would be the part following the " : ", 
// and the output should be a list of the values (not including the ERROR/WARNING) 
def parseError(v: String): List[String] = ??? // example input: "Hahhaha param_error=8 param_err2=https" 
def parseWarning(v: String): List[String] = ??? // example input: "HUHUHUHUH param_warn=tchu param_warn2=wifi" 

// Now we can use these functions in a pattern-matching function passed to RDD.collect, 
// which will transform each value that matches one of the cases, and will filter out 
// values that don't match anything 
val result: RDD[List[String]] = splitPrefix.collect { 
    case Array(l @ "ERROR", v) => l :: parseError(v) 
    case Array(l @ "WARNING", v) => l :: parseWarning(v) 
    // NOT adding a default case, so records that didn't match will be removed 
}  

// If you really want Array[String] and not List[String]:  
val arraysRdd: RDD[Array[String]] = result.map(_.toArray) 
+0

ああええ、まさに私が欲しかった!ありがとう、私はこのような収集機能を使用することができるか分からなかった。毎日火花に進んで、感謝の男! :p – tricky

関連する問題