オペレータからタプル数/秒を取得し、ファイルに記録します。タプルレートを自分で設定するために「スロットルオペレーター」を使うことはできません。また、もう一度追加するには、コンソールから情報をキャプチャするのではなく、SPLアプリケーションを使用します。IBM Streams(Streamsコンソールではない)のオペレータを介してタプル/秒をキャプチャする方法はありますか?
答えて
「この演算子のスループットを教えてください」というメトリックはありません。時間の経過とともにnTuplesProcessed
メトリックにアクセスし、そのスループットを計算するプリミティブ演算子を実装できます。 (list of available metrics。)しかし、私は実際にそれがはるかに簡単に、次の複合演算子を使用して見つける:
public composite PeriodicThroughputSink(input In) {
param expression<float64> $period;
expression<rstring> $file;
graph
stream<boolean b> Period = Beacon() {
param period: $period;
}
stream<float64 throughput> Throughput = Custom(In; Period) {
logic state: {
mutable uint64 _count = 0;
float64 _period = $period;
}
onTuple In: {
++_count;
}
onTuple Period: {
if (_count > 0ul) {
submit({throughput=((float64)_count/_period)}, Throughput);
_count = 0ul;
}
}
config threadedPort: queue(Period, Sys.Wait); // ensures that the throughput calculation and file
// writing is on a different thread from the rest
// of the application
}
() as Sink = FileSink(Throughput) {
param file: $file;
format: txt;
flush: 1u;
}
}
あなたは、それはその何でもオペレータからのストリームを消費し、「スループットタップ」、のような複合演算子を使用することができます録音するスループットを調整します。たとえば、あなたはとてもようにそれを使用することがあります。もちろん、あなたはまだあなたのアプリケーションの別の場所でResult
ストリームを使用することができます
stream<Data> Result = OperatorYouCareAbout(In) {}
() as ResultThroughput = PeriodicThroughputSink(Result) {
param period: 5.0;
file: "ResultThroughput.txt";
}
。このメソッドは、アプリケーションのパフォーマンスにある程度の影響を与える可能性があることに注意してください。データパスにタップを入れています。しかし、特に、あなたがタップしているオペレータと同じPEに融合されていることを確認した場合は、影響は大きくありません。また、期間が短いほど、アプリケーションのパフォーマンスに影響を与える可能性が高くなります。
また、nTuplesProcessed
メトリックにアクセスすることで、C++またはJavaのプリミティブ演算子で同様のことを行うことができますが、上記の方法はずっと簡単です。また、アプリケーションの外部からシステムメトリックを取得することもできます。たとえば、streamtool capturestate
またはREST APIを定期的に使用するスクリプトを作成し、出力を解析し、関心のある演算子のメトリックを見つけて、スループットを計算することができます。しかし、私はこのコンポジット演算子のテクニックがはるかに容易であることがわかります
- 1. イメージをIBM Streamsにインポートする方法は?
- 2. Streamsのこのシーケンスを作成する方法はありますか?
- 3. ES8 async/streams with a streamsを使用するには?
- 4. Java Streamsで余分な値を処理する方法は?
- 5. DC/OS、ElasticSearch、Kafka Connect、Kafka StreamsのCloudFormationテンプレートはありますか?
- 6. Oracle Streamsドライバはストリーミングをサポートしていますか?
- 7. Kafka Streams:データ型の変換方法は?
- 8. Akka Streamsを使用して簡単なTCPプロトコルを実装する方法は?
- 9. Java 8のLambdaとStreams ...「方法」ではなく「なぜ」?
- 10. Kafka Streams:レコードのタイムスタンプ(0.11.0)を変更する方法は?
- 11. EnumMap&streams
- 12. AWS Kinesis FirehoseとStreamsの処理時間に違いはありますか?
- 13. Akka Streams - FlowShapeをジャンプする
- 14. Kafka Streams DSLプロセッサにカスタムStateStoreを追加する方法は?
- 15. マップ内のすべてのList値が空であるか空でない場合は、Streamsを使用して
- 16. Akka StreamsとHTTPを使用してHTTPリソースをファイルにダウンロードする方法は?
- 17. StreamsでorElse(null)を使用する必要がありますか?
- 18. Akka StreamsをJMSに接続します。
- 19. Kafka Streams - グローバルメトリック集計の方法
- 20. Akka Streams、別のソースとしてアイテムをソースしますか?
- 21. .NET Streamsはメモリを節約しますか?
- 22. Akka Streams入門
- 23. Kafka Streams Cassandra Connector
- 24. Akka Streams Websocket Wiring
- 25. Solr Streams Mechanics
- 26. akka-streamsマップで非同期ドライバを使用する方法mapAsync
- 27. PlayFrameworkでAkka Streams SourceQueueを使用する方法
- 28. Akka StreamsでchopByを実装する
- 29. なぜAkka Streamsアプリケーションは正常に終了しないのですか?
- 30. Reactive Streams Processorをイベントバスとして使用するのは一般に問題ありませんか?
Scottさん、最初にそれをキャプチャするような直接的な方法はないことを明らかにしました。私はあなたが与えた解決策のようなものを持っていましたが、直接的な方法があれば複雑すぎないようにしたいと思っていました。素晴らしい助け! –