私はJMSソースからデータを読み込み、KAFKAトピックにプッシュすることを試みていますが、数時間後にKAFKAトピックへのプッシュの頻度がほぼゼロであり、いくつかの初期分析の後、私はFLUMEログで例外を見つけました。org.apache.kafka.common.errors.RecordTooLargeException Flume Kafka Sink
28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
私の水路は、1048576としてmax.request.sizeため(ログの)現在の設定値を示し、明らか1399305よりも非常に少なくなる、このmax.request.sizeを増やすことは、これらの例外を排除するかもしれませんが、することができませんその値を更新するための正しい場所を見つけます。
マイflume.config、
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = file
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 100000000
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data
a1.sources.r1.type = jms
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = some context urls
a1.sources.r1.connectionFactory = some_queue
a1.sources.r1.providerURL = some_url
#a1.sources.r1.providerURL = some_url
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.destinationName = some_queue_name
a1.sources.r1.userName = some_user
a1.sources.r1.passwordFile= passwd
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1
すべてのヘルプは本当に理解されるであろう!
私は、KAFKAのプロデューサlibを使用してトピックでメッセージをプッシュするFLUMEを使用していますが、設定可能なFLUMEとして表示できません。ハードコードされた値をプロデューサクラスに変更する必要がありますか? –
@RiteshSharmaあなたはKafkaをサーバーにインストールしていないと言っていますか? – franklinsijo
実際には、この「max.request.size」の問題は、カフカ・ブローカーのデータをプッシュするためにkafkaシンクを使用しているFLUMEにとっては問題になるため、基本的にはカフカ・プロデューサー・ライブラリー(kafka sink)を使用してカフカ・ブローカー上にデータをプッシュします。 FLUMEは、 "producer.properties"として専用の設定ファイルを提供していないため、FLUME設定でカフカプロデューサプロパティを更新する必要があります。 –