2017-02-28 5 views
1

私はJMSソースからデータを読み込み、KAFKAトピックにプッシュすることを試みていますが、数時間後にKAFKAトピックへのプッシュの頻度がほぼゼロであり、いくつかの初期分析の後、私はFLUMEログで例外を見つけました。org.apache.kafka.common.errors.RecordTooLargeException Flume Kafka Sink

28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: Failed to publish events 
     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252) 
     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) 
     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 
     at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686) 
     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449) 
     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212) 
     ... 3 more 
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 

私の水路は、1048576としてmax.request.sizeため(ログの)現在の設定値を示し、明らか1399305よりも非常に少なくなる、このmax.request.sizeを増やすことは、これらの例外を排除するかもしれませんが、することができませんその値を更新するための正しい場所を見つけます。

マイflume.config、

a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 

a1.channels.c1.type = file 
a1.channels.c1.transactionCapacity = 1000 
a1.channels.c1.capacity = 100000000 
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint 
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data 

a1.sources.r1.type = jms 

a1.sources.r1.interceptors.i1.type = timestamp 
a1.sources.r1.interceptors.i1.preserveExisting = true 

a1.sources.r1.channels = c1 
a1.sources.r1.initialContextFactory = some context urls 
a1.sources.r1.connectionFactory = some_queue 
a1.sources.r1.providerURL = some_url 
#a1.sources.r1.providerURL = some_url 
a1.sources.r1.destinationType = QUEUE 
a1.sources.r1.destinationName = some_queue_name 
a1.sources.r1.userName = some_user 
a1.sources.r1.passwordFile= passwd 

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.kafka.topic = some_kafka_topic 
a1.sinks.k1.kafka.bootstrap.servers = some_URL 
a1.sinks.k1.kafka.producer.acks = 1 
a1.sinks.k1.flumeBatchSize = 1 
a1.sinks.k1.channel = c1 

すべてのヘルプは本当に理解されるであろう!

答えて

0

この変更はKafkaで行う必要があります。私は私の問題を解決しているように思え 更新

max.request.size=10000000 
+0

私は、KAFKAのプロデューサlibを使用してトピックでメッセージをプッシュするFLUMEを使用していますが、設定可能なFLUMEとして表示できません。ハードコードされた値をプロデューサクラスに変更する必要がありますか? –

+0

@RiteshSharmaあなたはKafkaをサーバーにインストールしていないと言っていますか? – franklinsijo

+0

実際には、この「max.request.size」の問題は、カフカ・ブローカーのデータをプッシュするためにkafkaシンクを使用しているFLUMEにとっては問題になるため、基本的にはカフカ・プロデューサー・ライブラリー(kafka sink)を使用してカフカ・ブローカー上にデータをプッシュします。 FLUMEは、 "producer.properties"として専用の設定ファイルを提供していないため、FLUME設定でカフカプロデューサプロパティを更新する必要があります。 –

1

のような大きな値とカフカプロデューサー設定ファイルproducer.propertiesmax.request.sizeがこのようなカフカシンク(プロデューサ)プロパティを更新するための例外を排除したと考えられるので、FLUMEは定数プレフィックスとして"kafka.producer"を提供します。であり、この定数接頭辞にkafkaプロパティを追加できます。

ですので、a1.sinks.k1.kafka.producer.max.request.size = 5271988です。

+0

うわー。これが可能であることを決して知らなかった! – franklinsijo

関連する問題