2017-05-04 11 views
0

Sparkで処理したい情報が含まれているログファイルがあります。唯一の問題は、ファイル全体が非常に適切にフォーマットされていないことです。 私はそれをきちんと整形しようとしていて、必要なデータだけを取得しようとしています。Spark/Scalaのフォーマットログ

私はすでに、有用な情報の大部分に "INFO"タグが含まれていることに気づきました。だから私は、使用していることによってフィルタリングすることを決めた:

ヴァルtestje = realdata.filter(ライン=> line.contains(「INFO」))

をしかし、今、私にSQLContextに結果のデータを処理したいですデータを(ゼップリンで)視覚化することができます。

  • 結果のRDDにはまだ多くのジャンクがありますが、おそらく必要ありません。
  • 私は大文字小文字のクラスでこれをフォーマットしようとすると、私は常にArrayOutofBoundsエラーを受け取ります。たぶん、私はクラスで定義したよりも、迷惑情報が であるためです。ここで

データが今どのように見えるかの(非常に少ない)例:私は本当に必要なのは、日付、時間、タイルIDとブール値です

2016-03-08 14:55:29,637 INFO [ajp-nio-8009-exec-1] n.t.f.s.FloorService [FloorService.java:281] Snoozing. Wait 569 more milliseconds. Time passed : 4431 
2016-03-08 14:55:29,964 INFO [ajp-nio-8009-exec-3] n.t.f.c.FloorUpdateController [FloorUpdateController.java:67] Floor test received update from tile: 1, data = [false, false, false, false, false, false, false, false] 
2016-03-08 14:55:30,582 INFO [ajp-nio-8009-exec-2] n.t.f.c.FloorUpdateController [FloorUpdateController.java:67] Floor test received update from tile: 1, data = [false, false, false, false, false, false, true, false] 
2016-03-08 14:55:30,592 INFO [ajp-nio-8009-exec-2] n.t.f.s.FloorService [FloorService.java:284] delta time : 5387 
2016-03-08 14:55:30,595 INFO [ajp-nio-8009-exec-2] n.t.f.s.ActivityService [ActivityService.java:31] Activity added for floor with id: test 
2016-03-08 14:55:30,854 INFO [ajp-nio-8009-exec-4] n.t.f.c.FloorUpdateController [FloorUpdateController.java:67] Floor test received update from tile: 1, data = [false, false, false, false, false, false, false, false] 

すべての迷惑メールデータを考慮する必要なく、正しくフォーマットする方法はありますか?ここで

は私が今しようとしているものです(免責事項、私はこの時にかなり新しいだと私は種類のそれを巻取よ^^ '):

import org.apache.commons.io.IOUtils 
import java.net.URL 
import java.nio.charset.Charset 

val realdata = sc.textFile("/media/application.txt") 

case class testClass(date: String, time: String, level: String, unknown1: String, unknownConsumer: String, unknownConsumer2: String, vloer: String, tegel: String, msg: String, bool1: String, bool2: String, bool3: String, bool4: String, bool5: String, bool6: String, bool7: String, bool8: String, batchsize: String, troepje1: String, troepje2: String) 

//val testje = realdata.filter(line => line.contains("INFO")) 
val mapData = realdata.map(s => s.split(" ")).filter(line => line.contains("INFO")).map(
    s => testClass(s(0), 
     s(1), 
     s(2), 
     s(3), 
     s(4), 
     s(5), 
     s(6), 
     s(7), 
     s(8), 
     s(9), 
     s(10), 
     s(11), 
     s(12), 
     s(13), 
     s(14), 
     s(15), 
     s(16), 
     s(17), 
     s(18), 
     s(19) 
     ) 
    ).toDF() 
    mapData.registerTempTable("test") 

答えて

1

分割してデータフレームに変換する行にdataが含まれているため、dataではなくINFOでフィルタリングすることをおすすめします。
私はあなたのcase classに合うようにあなたのコードを少し変更したし、あなたの必要性

val mapData = realdata 
.filter(line => line.contains("data")) 
.map(s => s.split(" ").toList) 
.map(
    s => testClass(s(0), 
    s(1).split(",")(0), 
    s(1).split(",")(1), 
    s(3), 
    s(4), 
    s(5), 
    s(6), 
    s(7), 
    s(8), 
    s(15), 
    s(16), 
    s(17), 
    s(18), 
    s(19), 
    s(20), 
    s(21), 
    s(22), 
    "", 
    "", 
    "" 
) 
) 
.toDF() 
mapData.show(false) 

に応じてより編集することができますが、それは

+0

これは本当に役に立ちます。私はまだデータのいくつかがクラスに収まらないといういくつかの問題を抱えていますが、それはクラスが一致するまでクラスを調整するだけの問題だと思います。 – Jdeboer

+0

これはうまくいきました、ありがとう!しかし、Zeppelinを介してDFでクエリを実行するのは非常に遅いです。 (それはまるで永久にロードされます) – Jdeboer

1

私はこのような何かをしようとします:

val regex = """^(\d{4}-\d{2}-\d{2}) (\d{2}:\d{2}:\d{2}),(\d{0,3}) INFO .+Floor test received update from tile: (\d+), data = (\[((false|true)(,){0,1})+\])$""".r 
final case class LogLine(date: Instant, tileId: String, data: Seq[Boolean]) 
realdata.flatMap({ 
    case regex(date, time, millis, tileId, data, _*) => 
    val mapper = new ObjectMapper() with ScalaObjectMapper 
    mapper.registerModule(DefaultScalaModule) 

    Seq(LogLine(
     Instant.parse(s"${date}T$time.${millis}Z"), 
     tileId, 
     mapper.readValue[Seq[Boolean]](data) 
    )) 
    case _ => Nil 
}) 

ケースクラスは多次元ですが、これはおそらくこの場合に必要なものです。それが本当に必要な場合は、後でいつでも平坦化することができます。

パフォーマンスを向上させたい場合は、flatMapではなくmapPartitionsを使用し、ObjectMapperを再利用できます。

+0

おかげニルス、と返事が遅れて申し訳ありませんがお役に立てば幸いです。私はかなり病気でした。 :11:エラー:見つからない:タイプインスタント これは私が依存関係をインポートする必要があるためだと思いますが、どちらが見つからないのでしょうか? 。 – Jdeboer

+1

Java 8を使用していることを確認してください。その場合、java.time.Instantです。 java.util.Dateまたは単純にStringを使用したい場合は、代わりにそうすることができます。 文字列を日付として使用すると、 "$ {date} T $ time。$ {millis} Z"または任意の他の形式で挿入されます。 – Nils

関連する問題