2017-07-19 13 views
0
val sc = new SparkContext(conf) 

val streamContext = new StreamingContext(sc, Seconds(1)) 

val log = Logger.getLogger("sqsLog") 
val sqs = streamContext.receiverStream(new SQSReceiver("queue") 
    .at(Regions.US_EAST_1) 
    .withTimeout(5)) 


val jsonRows = sqs.mapPartitions(partitions => { 
    val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY"))) 

    val txfm = new LogLine2Json 
    val log = Logger.getLogger("parseLog") 
    val sqlSession = SparkSession 
    .builder() 
    .getOrCreate() 

    val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/") 
    val parsedDate = parsedFormat.format(new java.util.Date()) 
    val outputPath = "/tmp/spark/presto" 

    partitions.map(messages => { 
    val sqsMsg = Json.parse(messages) 
    System.out.println(sqsMsg) 

    val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "") 
    val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "") 
    System.out.println(bucketName) 
    System.out.println(key) 
    val obj = s3Client.getObject(new GetObjectRequest(bucketName, key)) 
    val stream = obj.getObjectContent() 
    scala.io.Source.fromInputStream(stream).getLines().map(line => { 
     try{ 
      val str = txfm.parseLine(line) 
      val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str) 
      jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath) 
     } 
     catch { 
      case e: Throwable => {log.info(line); "";} 
     } 
     }).filter(line => line != "{}") 
    }) 
}) 

streamContext.start() 
streamContext.awaitTermination() 

私の仕事はSQSからS3キーを取ることはとても簡単です。ファイルの内容はnginxログであり、私たちはパーサーを使ってファイルを解析します。 LogLine2JsonログをJSON形式に変換してから、それをorc形式に書き出します。スパーク出力操作が登録されていないので実行する必要はありませんが、ファイルに書き込みしています

しかし、私はスパークがそうでなければ、それは動作しませんアクションを必要としていることを理解し、このエラーに

java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:224) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572) 
    at SparrowOrc$.main(sparrowOrc.scala:159) 
    at SparrowOrc.main(sparrowOrc.scala) 

を取得しています。しかし、私はorcファイルに書き込むこのコードを持っています。私は他の何かをしなければならないかどうか分からないのですか?

jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath) 
+0

最初のデータフレームに一連の呼び出しがあり、writeまたはwriteStream呼び出しにつながるはずです。あなたはデータフレームを書きますが、それが副作用であるため、最初のデータフレームの処理を開始する理由はありません。 Classic I/Oは、スパークが構築されている処理チェーンの外で動作するため、使用するのが難しいです。 Sparkのhadoop操作がS3と互換性があると私は信じている:https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html –

答えて

0

まずはmapはアクションではありません。それは変容です。 Sparkはこのコードを実行する理由はありません。

次に、変換で副作用を避ける必要があります。出力の正確さが必要な場合は、これらを使用しないでください。

最後に、標準ioの機能を分散システムで使用すると、通常意味がありません。

全体的にあなたがDStreamシンクのための既存のオプションを検討し、これらはあなたのシナリオに適しているなし場合は、アクション(foreachforeachPartition)を使用して独自のを記述する必要があります。

+0

なぜio関数を使うのが無意味なのかをもっと説明できますか? – toy