val sc = new SparkContext(conf)
val streamContext = new StreamingContext(sc, Seconds(1))
val log = Logger.getLogger("sqsLog")
val sqs = streamContext.receiverStream(new SQSReceiver("queue")
.at(Regions.US_EAST_1)
.withTimeout(5))
val jsonRows = sqs.mapPartitions(partitions => {
val s3Client = new AmazonS3Client(new BasicCredentialsProvider(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY")))
val txfm = new LogLine2Json
val log = Logger.getLogger("parseLog")
val sqlSession = SparkSession
.builder()
.getOrCreate()
val parsedFormat = new SimpleDateFormat("yyyy-MM-dd/")
val parsedDate = parsedFormat.format(new java.util.Date())
val outputPath = "/tmp/spark/presto"
partitions.map(messages => {
val sqsMsg = Json.parse(messages)
System.out.println(sqsMsg)
val bucketName = Json.stringify(sqsMsg("Records")(0)("s3")("bucket")("name")).replace("\"", "")
val key = Json.stringify(sqsMsg("Records")(0)("s3")("object")("key")).replace("\"", "")
System.out.println(bucketName)
System.out.println(key)
val obj = s3Client.getObject(new GetObjectRequest(bucketName, key))
val stream = obj.getObjectContent()
scala.io.Source.fromInputStream(stream).getLines().map(line => {
try{
val str = txfm.parseLine(line)
val jsonDf = sqlSession.read.schema(sparrowSchema.schema).json(str)
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
}
catch {
case e: Throwable => {log.info(line); "";}
}
}).filter(line => line != "{}")
})
})
streamContext.start()
streamContext.awaitTermination()
私の仕事はSQSからS3キーを取ることはとても簡単です。ファイルの内容はnginxログであり、私たちはパーサーを使ってファイルを解析します。 LogLine2Json
ログをJSON形式に変換してから、それをorc
形式に書き出します。スパーク出力操作が登録されていないので実行する必要はありませんが、ファイルに書き込みしています
しかし、私はスパークがそうでなければ、それは動作しませんアクションを必要としていることを理解し、このエラーに
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at SparrowOrc$.main(sparrowOrc.scala:159)
at SparrowOrc.main(sparrowOrc.scala)
を取得しています。しかし、私はorcファイルに書き込むこのコードを持っています。私は他の何かをしなければならないかどうか分からないのですか?
jsonDf.write.mode("append").format("orc").option("compression","zlib").save(outputPath)
最初のデータフレームに一連の呼び出しがあり、writeまたはwriteStream呼び出しにつながるはずです。あなたはデータフレームを書きますが、それが副作用であるため、最初のデータフレームの処理を開始する理由はありません。 Classic I/Oは、スパークが構築されている処理チェーンの外で動作するため、使用するのが難しいです。 Sparkのhadoop操作がS3と互換性があると私は信じている:https://www.cloudera.com/documentation/enterprise/5-5-x/topics/spark_s3.html –