2017-03-27 5 views
1

私はxmlを処理しています。最後のレコードを受け取ったときにレコードごとにメッセージを送信する必要があります。私はカフカプロデューサーを閉じます。カフカのプロデューサーは非同期なので、プロデューサーを閉じると時々java.lang.IllegalStateException: Cannot send after the producer is closed.が出て、私はプロデューサーを開いたままにすることができる場所を読みました。私の質問は¿は何を意味していますか?あるいは、このカフカプロデューサーを閉じないとどうなるのですか?

---編集のためのよりよい解決策---

<list> 
    <element attr1="" att2="" attr3=""/> 
    <element attr1="" att2="" attr3=""/> 
    <element attr1="" att2="" attr3=""/> 
    <element attr1="" att2="" attr3=""/> 
    <element attr1="" att2="" attr3=""/> 
    <element attr1="" att2="" attr3=""/> 
    <element attr1="" att2="" attr3=""/> 
    <element attr1="" att2="" attr3=""/> 
... 
</list> 

があれば次のシナリオを想像:私たちは、タグを読んで、私たちはカフカのプロデューサーを作成

  • 各要素ごとに属性を読み取り、jsonオブジェクトを生成し、sendメソッドを使用してkafkaに送信します。我々は、プロデューサ

問題でcloseメソッドを呼び出し要素を読み取る -When要素の数は、我々は切断メソッドを呼び出すときに、時にはそれが非同期な方法でメッセージを送信し続け、従って80Kであることができます。だから我々は最初のflushメソッドを呼び出す必要がありますが、それに影響パフォーマンス

あなたはProducer.close()を呼び出す前Producer.flush()を呼び出す必要がありますあなたの

答えて

0

ありがとうございます。これはブロッキング呼び出しであり、すべてのレコードが送信される前ではありません。

close()を呼び出さないと、実装/言語によってはリソース/メモリリークが発生する可能性があります。

+0

私はこれをやろうとしましたが、パフォーマンスはひどいです。カフカの最後のバージョンでは、このバージョンでは何らかの方法でsend(List )メソッドを実行しましたか? –

+0

あなたが「カフカプロデューサーを閉じる最後のレコードを受け取ったとき」と言うと、.flush()と.close()の呼び出しを1回だけ呼び出す必要があります。これがあなたの執筆成績にどのように影響しますか? –

+0

私はxmlを処理しています、私はサックスを使用しています。私は特別なタグを検出すると、フラッシュとクローズメソッドを呼び出しています。しかし、もし私がそれをしなければ、パフォーマンスははるかに良くなりますが、一定の期間が経過した後、私はoutofmemoryerrorを受け取ります。 kafkaの以前のバージョンでは、メッセージのリストを送信する方法がありましたが、現在は利用できません。だから私はより良い方法でそれを行う方法を考えています –

関連する問題