akka-stream

    3

    1答えて

    に一致する私が思い付くことができPartitionとMergeを使用して、最も簡単なグラフですが、実行すると、それは次のエラー得られます。 requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0,

    0

    1答えて

    私はあなたのステージの1つで、InputStreamを返す呼び出しを行う必要がある状況を処理する方法を理解しようとしています。ここで、そのストリームをステージのソースとして扱いますそれはさらに下に来る。 Source.map(e => Calls that return an InputStream) .via(processingFlow).runwith(sink.ignore) 私は要

    0

    1答えて

    私は、レコードの長さを指定するフィールドに関する以下の仕様を持つMarc21バイナリデータレコードをデコードしようとしています。 全体の長さに等しい、コンピュータ生成の5文字の数字。それ自体とレコードターミネータを含みます。番号 は右詰めであり、未使用位置にはゼロが含まれています。 私は、しかし、私はちょうどそのフィールドのサイズを指定する方法がわからない、 アッカストリームFraming.len

    0

    1答えて

    MergeHubの使い方は混乱しています。 は、私は与えられた関数が別のフローグラフを作成し、Sink.ignore()でそれを実行して、CompletionStageFlow.mapAsync()の値としてはを待つことを返すFlow.mapAsync()を、使用するフローグラフを設計しています。入れ子になったフローは、MergeHubのマテリアライズから返されたSinkを介して要素を返します。

    0

    1答えて

    私は基本的なスカラのakka http CRUDアプリケーションを持っています。関連するクラスについては以下を参照してください。 エンティティが作成/更新されるたびに、エンティティIDといくつかのデータ(jsonとして)をKafkaトピックに書き込むだけです。 私はhttp://doc.akka.io/docs/akka-stream-kafka/current/producer.htmlを見てい

    0

    1答えて

    akka-http websocketアプリでは、指定されたメッセージを返信するルートがあり、アプリケーションで状態を維持する必要があります。そこで、以下では正常に動作します: override protected def routes: Route = pathSuffix("echo") { handleWebSocketMessages(echoMessageFlow)

    1

    1答えて

    私はAkka Streamsアプリケーションとそれを正常に動作させました。 私がしたいことは、JMXコンソールをAkka Streamsアプリケーションを実行しているJVM intanceに接続して、シンクとフローに入ってくるメッセージの量を調べることです。 これは可能ですか?私はグーグルではありますが、具体的な方法は見つけられませんでした。 私のアプリケーションの最終段階は、カサンドラデータベー

    1

    1答えて

    私はPlay Framework 2.6(Scala)とAlpakka AWS S3 Connectorを使用して、ファイルをS3バケットに非同期でアップロードします。私のコードは次のようになります: def richUpload(extension: String, checkFunction: (String, Option[String]) => Boolean, cannedAcl: Ca

    0

    1答えて

    私は、アッカのストリームを使ってSQSを聞くしようとしていると私はそれが、このコードスニペットを使用して、Q だからメッセージを取得: もちろん、このコードスニペットは、メッセージの1対1(それをACK)を取得:を implicit val system = ActorSystem() implicit val mat = ActorMaterializer() implicit val ec