flume-ngのカスタムシンクを作成しようとしています。私は既存のシンクとドキュメンテーションを見て、それをコーディングしました。しかし、イベントを受け取るはずの 'process()'メソッドは常にnullで終了します。 私はしているイベントイベント= channel.take();イベントはnullです。このメソッドは、イベントがまだチャンネルにあるときに繰り返し呼び出されます。Flume-ngヌルイベントのカスタムシンク
誰かが正しい方向に向かうことができますか?
flume-ngのカスタムシンクを作成しようとしています。私は既存のシンクとドキュメンテーションを見て、それをコーディングしました。しかし、イベントを受け取るはずの 'process()'メソッドは常にnullで終了します。 私はしているイベントイベント= channel.take();イベントはnullです。このメソッドは、イベントがまだチャンネルにあるときに繰り返し呼び出されます。Flume-ngヌルイベントのカスタムシンク
誰かが正しい方向に向かうことができますか?
これは、処理機能のスケルトンです...あなたがBACKOFFにステータスを変更し、をロールバックイベントを取得して失敗した場合。あなたでない場合をコミットし、ステータスをREADYに設定します。何があっても、あなたは常に取引を終了します。
Status status = null;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
Event event = channel.take();
if (event != null && validEvent(event.getBody()) >= 0) {
# make some printing
}
transaction.commit();
status = Status.READY;
} catch (Throwable ex) {
transaction.rollback();
status = Status.BACKOFF;
logger.error("Failed to deliver event. Exception follows.", ex);
throw new EventDeliveryException("Failed to deliver event: " + ex);
} finally {
transaction.close();
}
return status;
私はこれがうまくいくと確信しています:)。
これは仕様です。シンクランナーはシンクをnull
イベントでポーリングするので、シンクが生きていて将来のイベントを受け入れる準備ができていることを確認できます。 null
イベントを受け取ったら、Status.BACKOFF
を返すことを確認してから、シンクプロセッサーがもう一度お試しください。
[documentation](http://flume.apache.org/FlumeDeveloperGuide.html#sink)には何も言われていません。 – Dmitry
私は同意します。 Flumeのドキュメントは非常に最小限であり、少し詳しく説明する必要があります。 – logicalgeek
バックオフ期間はどのくらいですか?そしてそれはどのように制御されていますか? AbstractSinkクラスは、ソースのようなメソッドを実装していません。 。。 公共長いgetBackOffSleepIncrement() 公共長いgetMaxBackOffSleepInterval( – bearrito
素晴らしいおかげでそれはまだ2016年に私を助けます。.. – logicalgeek
はちょっと私はここで同様の問題があります。https://stackoverflow.com/questions/46479157/streaming-kafka- messages-to-mysql-database あなたはこれについて考えていますか? –