akka-stream

    0

    1答えて

    VisualVMを使用して、akka-streams-kafkaを使用するアプリケーションをプロファイルします。 それはカフカコーディネーターの多くは、これらのコーディネーターのスレッド を遮断示して? 私もまた、スレッド をブロックしている3人のカフカの消費者を持って、私は彼らのために別の実行コンテキストを作成する必要がありますか?

    0

    1答えて

    私は、ユーザーデータの負荷が大きいです。私はそれが新しいユーザーかどうかをIDで判断したいと思います。 dbへの呼び出しを減らすために、私は以前のユーザーの状態を維持します。 val users = mutable.set[String]() //init the state from db user = db.getAllUsersIds() val source: Source[User

    2

    1答えて

    私はカフカサーバを設定し、プロデューサを使用してメッセージを送信することで自分のコードをローカルでテストしようとしましたが、これに対してユニットテストを書く方法があるかどうかは疑問ですコードの一部(消費者が受け取ったメッセージが正しいかどうかをテストする)。 val consumerSettings = ConsumerSettings(system, new ByteArrayDese

    0

    1答えて

    FlowOps#concat状態のドキュメント その...ソースは、このフローと一緒にマテリアライズド、ちょうどその時が来るまで背圧をアサートすることによって生成エレメントから保たれています。言い換えれば 、私は sourceA.concat(sourceB) を持っているならば、sourceBはsourceAと同時にマテリアライズされます。 sourceAが完了するまでsourceBを具体化

    0

    1答えて

    Iveはこのような数千の行を含むファイルを取得しました。 Mr|David|Smith|[email protected] Mrs|Teri|Smith|[email protected] ... 私は、各行をダウンストリームに放出するが、抑制された方法でファイルを読みたい。 1 /秒。 私は、流れの中で調整がどのように働くかを理解できません。 flow1(下記)は、1秒後に最初の行を出力

    0

    1答えて

    は、Excelシートをアップロードし、ローカルに保存するため、サーバ側で def uploadFile(fileData: Multipart.FormData) = { println(" uploadFile ") // path("user"/"upload"/"file") { /* (post & entity(as[Multipart.FormData])) { fileData

    0

    1答えて

    私はAkka Streamsを使用して、同時にリクエストをサーバーに送信し、各リクエストをオリジナルのコンテキスト(この例ではInt)に関連付けることを試みています。これは私が特に val createRequestFlow: Flow[(String, String), (HttpRequest, Int), _] = Flow.fromFunction[(String, String), (H

    0

    1答えて

    mapAsyncのようにストリームの一部を並列化したいが、未来はない。 現在のところ、以下の解決策がありますが、これにはプレーンなflatMapConcatがないマテリアライザーが必要です。 def flatMapConcatParallel[In, Out](parallelism: Int)(f: In => Source[Out, _])(implicit mat: Materializer

    4

    1答えて

    私はブロードキャストとジップの内側にフローグラフを持っています。何か(それが何であるかにかかわらず)がこの流れの中で失敗するなら、問題のある要素をそれに渡して再開したいと思います。私は印刷にそれを期待する val flow = Flow.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits.

    0

    2答えて

    akkaストリームで私の最初のステップを得る。私はhereからコピーされ、これと同様のグラフを持っている: val topHeadSink = Sink.head[Int] val bottomHeadSink = Sink.head[Int] val sharedDoubler = Flow[Int].map(_ * 2) val g = RunnableGraph.fromGraph(