akka-stream

    0

    1答えて

    から同じ要素をフィルタリングするためにどのように二つのソースがあります。 val s1 = Source(List(2, 4)) val s2 = Source(List(1, 2, 3, 4, 5)) s1に存在している s2からの要素をフィルタリングする方法。上記の例の場合、それは次のようになります。 val s2Filtered = Source(List(1, 3, 5)) 別の

    1

    1答えて

    Future[Int]をこのようにして保存する方法を教えてください。 val test: Sink[Int, NotUsed] = MergeHub.source[Int].grouped(100).to(Sink.fold(0L) { case (count, items) => count + items.sum }).run() Sink[Int, Future[Int]]を取

    2

    1答えて

    Iチェーン連続する各ステージは、前のSuccess場合を処理し、終了時にSinkを総称全てFailureを処理フォームa -> Try[b]のFlow年代の一連たいです。 これなどは簡潔にエンコードできますか?それは実際には線形の流れですが、どのステージでも放送と合併がいかに短いかわかりません。

    1

    1答えて

    Akkaストリーム背圧モデルまたは永続アクタを使用したakkaシャーディングを使用するためのものはありますか? 私には問題があるため、永続的なアクターを持つAkkShardingクラスタがあります。 (ジャーナルプラグインとしてcassandraを使用)。 多くの場合、複数のアクターを同時に作成する必要があります。 (たとえば、すべてのユーザにブロードキャストメッセージを送信する場合) と数千の永

    2

    3答えて

    私はsbtアセンブリで私のプロジェクトの太った瓶を作っています。ここに私のbuild.sbtです: name := "projName" version := "1.0" scalaVersion := "2.12.1" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream" % "2.5.3",

    0

    1答えて

    私は、アッカのソースコードを経由して、誰かがここで何が起こっているかを私に説明できます私は、次の type Repr[+O] <: FlowOps[O, Mat] { type Repr[+OO] = FlowOps.this.Repr[OO] type Closed = FlowOps.this.Closed } type Closed に出くわしました?

    0

    1答えて

    私はS3アップロードを処理するためにAlpakkaを使い、Akka Steamsでダウンロードしたいと思っていました。しかし、私は、S3ClientがAkka Httpルートで作成したSourceを使用することに固執しました。私が取得エラーメッセージは次のとおりです。 [error] found : akka.stream.scaladsl.Source[akka.util.ByteString,

    0

    1答えて

    Alpakkaを使用してS3にファイルをアップロードし、同時にTikaと解析してMimeTypeを取得したいとします。 が、私は、現時点では、グラフの3つの部分を持っている: val fileSource: Source[ByteString, Any] // comes from Akka-HTTP val fileUpload: Sink[ByteString, Future[Multipa

    1

    1答えて

    私はAkka Streams 2.4.8をKafkaに対して使用し、At Once Once Delivery Semanticsでグラフを作成しようとしています。私のグラフのいくつかは、以下のようなものから始め:mapOneRecordToNOtherRecordsは、単一のメッセージを受け取り、で動作するように、別の種類のメッセージの任意の数「N」に変換し committableSource