2017-05-13 7 views
1

私はここで奇妙な問題に直面しています。私はkafkaからAvroのレコードを読み込み、デシリアライズしてファイルに保存しようとしています。 logger.info(rdd.count())すべてが正常に動作し、私は正確なレコード数を参照してください声明ティル私はカフカからレコードを取得することができるが、私はRDDレコードで機能を使用しようとすると、いくつかは、どのようにそれが何スパークストリームkafkaストリーム - 返されたrddはforeachRDDで何もしません

import java.util.UUID 
import io.confluent.kafka.serializers.KafkaAvroDecoder 
import com.my.project.avro.AvroDeserializer 
import com.my.project.util.SparkJobLogging 
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient 
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient 
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext, Time} 
import org.apache.spark.streaming.kafka._ 
import kafka.serializer.{DefaultDecoder, StringDecoder} 
import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.streaming.dstream.{DStream} 

object KafkaConsumer extends SparkJobLogging { 
    var schemaRegistry: SchemaRegistryClient = null 
    val url="url:8181" 
    schemaRegistry= new CachedSchemaRegistryClient(url, 1000) 

    def createKafkaStream(ssc: StreamingContext): DStream[(String,Array[Byte])] = { 
    val kafkaParams = Map[String, String](
     "zookeeper.connect" -> "zk.server:2181", 
     "group.id" -> s"${UUID.randomUUID().toString}", 
     "auto.offset.reset" -> "smallest", 
     "bootstrap.servers" -> "bootstrap.server:9092", 
     "zookeeper.connection.timeout.ms" -> "6000", 
     "schema.registry.url" ->"registry.url:8181" 
    ) 

    val topic = "my.topic" 
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Set(topic)) 
    } 

    def processRecord( avroStream: Array[Byte])={ 
    println(AvroDeserializer.toRecord(avroStream, schemaRegistry)) 
    } 

    def main(args: Array[String]) = { 
    val sparkConf = new SparkConf().setAppName("AvroDeserilizer") 
    val sc = new SparkContext(sparkConf) 
    val ssc = new StreamingContext(sc, Seconds(5)) 
    val topicStream = createKafkaStream(ssc)map(_._2) 
    topicStream.foreachRDD(
     rdd => if (!rdd.isEmpty()){ 
     logger.info(rdd.count()) 
     rdd.foreach(avroRecords=> processRecord(avroRecords)) 
     } 
    ) 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

object AvroDeserializer extends SparkJobLogging{ 
    def toRecord(buffer: Array[Byte], registry: SchemaRegistryClient): GenericRecord = { 
    val bb = ByteBuffer.wrap(buffer) 
    bb.get() // consume MAGIC_BYTE 
    val schemaId = bb.getInt // consume schemaId 
    val schema = registry.getByID(schemaId) // consult the Schema Registry 
    val reader = new GenericDatumReader[GenericRecord](schema) 
    val decoder = DecoderFactory.get().binaryDecoder(buffer, bb.position(), bb.remaining(), null) 
    reader.read(null, decoder) //null -> as we are not providing any datum 
    } 
} 

を行うことを拒否しますログ。しかしその後は何も働かない。私は

val record= rdd.first() 
processRecord(record) 

疲れて、それが働いた が、rdd.foreach(avroRecords=> processRecord(avroRecords))rdd.map(avroRecords=> processRecord(avroRecords))は著作ません。

17/05/14 01:01:24 INFO scheduler.DAGScheduler: Job 2 finished: foreach at KafkaConsumer.scala:56, took 42.684999 s 
17/05/14 01:01:24 INFO scheduler.JobScheduler: Finished job streaming job 1494738000000 ms.0 from job set of time 1494738000000 ms 
17/05/14 01:01:24 INFO scheduler.JobScheduler: Total delay: 84.888 s for time 1494738000000 ms (execution: 84.719 s) 
17/05/14 01:01:24 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer() 
17/05/14 01:01:24 INFO scheduler.InputInfoTracker: remove old batch metadata: 
17/05/14 01:01:26 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers 
17/05/14 01:01:26 WARN yarn.YarnAllocator: Expected to find pending requests, but found none. 
17/05/14 01:01:29 INFO yarn.YarnAllocator: Canceling requests for 0 executor containers 
17/05/14 01:01:29 WARN yarn.YarnAllocator: Expected to find pending requests, but found none. 

次のストリーミングコンテキストコールまでログの最後の2行を印刷するだけです。

+2

あなたのロガーの設定はどうですか?それはコンソールにのみ印刷されますか? 'rdd.foreach'の中のコードはExecutorsで実行され、その出力はドライバには表示されません。 – Harald

+0

私は 'trait SparkJobLogging { @transient protected lazy val logger:Logger = LoggerFactory.getLogger(getClass.getName) }'を使用しており、ログをclouderaクラスタに表示しています。 – Explorer

+0

ログを確認するにはどうすればいいですか?どのツールを使用しますか?あなたは '糸ログ'またはWeb UIを使用していますか? –

答えて

0

上記の方法は私のためには機能しませんでしたが、コンフルエントなドキュメントでは別の方法がありました。 KafkaAvroDecoderは、スキーマレジストリと通信し、スキーマを取得し、データを逆シリアル化します。したがって、カスタムデシリアライザの必要性が排除されます。

import io.confluent.kafka.serializers.KafkaAvroDecoder 

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, 
    "schema.registry.url" -> schemaRegistry, 
    "key.converter.schema.registry.url" -> schemaRegistry, 
    "value.converter.schema.registry.url" -> schemaRegistry, 
    "auto.offset.reset" -> "smallest") 
val topicSet = Set(topics) 
val messages = KafkaUtils.createDirectStream[Object, Object, KafkaAvroDecoder, KafkaAvroDecoder](ssc, kafkaParams, topicSet).map(_._2) 
messages.foreachRDD { 
rdd => if (!rdd.isEmpty()){ 
    logger.info(rdd.count()) 
    rdd.saveAsTextFile("/data/") 
    } 
) 
ssc.start() 
ssc.awaitTermination() 
} 
} 

依存関係jar:kafka-avro-serializer-3.1.1.jar。これは今私にとって完璧に働いており、これが将来誰かに役立つことを願っています。

1

あなたのprintlnステートメントは、現在のプロセスにない分散ワーカーで実行されているため、表示されません。あなたはprintlnlog.infoに置き換えてみることができます。

DStream[Array[Byte]]DStream[GenericRecord]に変更してファイルに書き込むのが理想的です。.saveAsTextFilesなどを使用してください。ストリームが無限になる可能性があるため、そこにstream.take()が必要な場合があります。

http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

+0

あなたのコメントをありがとう、本当に有益です。私は使用しているユースケースのために別の方法を見つけましたが、私はまだ時間を取るときにあなたが言及した解決策を試みます。 – Explorer

1
val topicStream = createKafkaStream(ssc)map(_._2) 
topicStream.foreachRDD(
    rdd => if (!rdd.isEmpty()){ 
    logger.info(rdd.count()) 
    rdd.foreach(avroRecords=> processRecord(avroRecords)) 

dstream.foreachRDDは、データを外部システムに送信することを可能にするという強力なプリミティブです。ただし、このプリミティブを正しく正しく効率的に使用する方法を理解することが重要です。回避すべき一般的な間違いのいくつかは次のとおりです。

DStreamは、RDDアクションによって遅延実行されるのと同じように、出力操作によって遅延実行されます。特に、DStream出力操作内のRDDアクションは、受信したデータの処理を強制します。したがって、アプリケーションに出力操作がない場合、またはdstream.foreachRDD()などの出力操作がRDDアクションなしで実行されている場合は、何も実行されません。システムは単にデータを受信して​​破棄します。

+0

あなたの答えにIrshadをおねがいします。あなたは本当に価値のある情報を提供しました。私はループ内で 'rdd.saveAsTestfile'も試してみましたが、RDDの動作だと思って動作しませんでした。私は自分のコードで本当にばかげたことをしているかもしれません。私はより良い解決策を見つけ、そのために今働いています。私は以前のアプローチでも問題をチェックします。 – Explorer

関連する問題