私はここで奇妙な問題に直面しています。私はkafkaからAvro
のレコードを読み込み、デシリアライズしてファイルに保存しようとしています。 logger.info(rdd.count())
すべてが正常に動作し、私は正確なレコード数を参照してください声明ティル私はカフカからレコードを取得することができるが、私はRDDレコードで機能を使用しようとすると、いくつかは、どのようにそれが何スパークストリームkafkaストリーム - 返されたrddはforeachRDDで何もしません
import java.util.UUID
import io.confluent.kafka.serializers.KafkaAvroDecoder
import com.my.project.avro.AvroDeserializer
import com.my.project.util.SparkJobLogging
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka._
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.{DStream}
object KafkaConsumer extends SparkJobLogging {
var schemaRegistry: SchemaRegistryClient = null
val url="url:8181"
schemaRegistry= new CachedSchemaRegistryClient(url, 1000)
def createKafkaStream(ssc: StreamingContext): DStream[(String,Array[Byte])] = {
val kafkaParams = Map[String, String](
"zookeeper.connect" -> "zk.server:2181",
"group.id" -> s"${UUID.randomUUID().toString}",
"auto.offset.reset" -> "smallest",
"bootstrap.servers" -> "bootstrap.server:9092",
"zookeeper.connection.timeout.ms" -> "6000",
"schema.registry.url" ->"registry.url:8181"
)
val topic = "my.topic"
KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Set(topic))
}
def processRecord( avroStream: Array[Byte])={
println(AvroDeserializer.toRecord(avroStream, schemaRegistry))
}
def main(args: Array[String]) = {
val sparkConf = new SparkConf().setAppName("AvroDeserilizer")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(5))
val topicStream = createKafkaStream(ssc)map(_._2)
topicStream.foreachRDD(
rdd => if (!rdd.isEmpty()){
logger.info(rdd.count())
rdd.foreach(avroRecords=> processRecord(avroRecords))
}
)
ssc.start()
ssc.awaitTermination()
}
}
object AvroDeserializer extends SparkJobLogging{
def toRecord(buffer: Array[Byte], registry: SchemaRegistryClient): GenericRecord = {
val bb = ByteBuffer.wrap(buffer)
bb.get() // consume MAGIC_BYTE
val schemaId = bb.getInt // consume schemaId
val schema = registry.getByID(schemaId) // consult the Schema Registry
val reader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get().binaryDecoder(buffer, bb.position(), bb.remaining(), null)
reader.read(null, decoder) //null -> as we are not providing any datum
}
}
を行うことを拒否しますログ。しかしその後は何も働かない。私は
val record= rdd.first()
processRecord(record)
疲れて、それが働いた が、rdd.foreach(avroRecords=> processRecord(avroRecords))
とrdd.map(avroRecords=> processRecord(avroRecords))
は著作ません。
17/05/14 01:01:24 INFO scheduler.DAGScheduler: Job 2 finished: foreach at KafkaConsumer.scala:56, took 42.684999 s
17/05/14 01:01:24 INFO scheduler.JobScheduler: Finished job streaming job 1494738000000 ms.0 from job set of time 1494738000000 ms
17/05/14 01:01:24 INFO scheduler.JobScheduler: Total delay: 84.888 s for time 1494738000000 ms (execution: 84.719 s)
17/05/14 01:01:24 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
17/05/14 01:01:24 INFO scheduler.InputInfoTracker: remove old batch metadata:
17/05/14 01:01:26 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
17/05/14 01:01:26 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
17/05/14 01:01:29 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers
17/05/14 01:01:29 WARN yarn.YarnAllocator: Expected to find pending requests, but found none.
次のストリーミングコンテキストコールまでログの最後の2行を印刷するだけです。
あなたのロガーの設定はどうですか?それはコンソールにのみ印刷されますか? 'rdd.foreach'の中のコードはExecutorsで実行され、その出力はドライバには表示されません。 – Harald
私は 'trait SparkJobLogging { @transient protected lazy val logger:Logger = LoggerFactory.getLogger(getClass.getName) }'を使用しており、ログをclouderaクラスタに表示しています。 – Explorer
ログを確認するにはどうすればいいですか?どのツールを使用しますか?あなたは '糸ログ'またはWeb UIを使用していますか? –