1

Project reactorと1つの場所で私が少し苦労しているところは、Monoから来たものとFluxをどのように組み合わせるかです。ここに私のユースケースがあります:Fluxからの結果とMonoの結果を合わせる

public interface GroupRepository { 
     Mono<GroupModel> getGroup(Long groupId); 
} 

public interface UserRepository { 
     Flux<User> getUsers(Set<Long> userIds); 
} 

Mono<GroupModel> groupMono = getGroup(groupId); 
Flux<User> userFlux = getUsers(Set<Long> users); 
//run above instrtuction in parallel and associate user to group. 

今、私が達成したいものです。

どのように私はUserFluxからの応答を結合し、group.addUsers(userfromFlux)のようなもので、そのグループにそれらのユーザーを関連付けることができます。

誰かがuserFluxとgroupMonoからの結果をどのように組み合わせるかについて助けてもらえますか?私はZipのようなものを使うと思っていますが、ソースから1対1のマッピングをします。私の場合、1対Nマッピングを行う必要があります。ここで私は1つのグループを持っていますが、そのグループに追加する必要がある複数のユーザーです。 Mono<List<Users>を返してから、モノラルでzip演算子を使用して、ここで説明したようにコンビネータを提供することをお勧めします
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)

答えて

3

私はMonoが1つの要素しか放出しないので、静的方法があなたに役立つと思う:その要素は常にFluxからの各入力値と組み合わされたものになります。

Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]), 
        groupMono, userFlux); 
+0

私の場合、グループモデル内の基になるデータ構造は(グループの一部であるユーザーを保持する)HashSetであり、スレッドセーフではないため、Flux.zip(groupMono、userMono、BiFunction)を使用して終了しました。グループ内のユーザー・フローから流れるユーザーを反応的な方法で追加します。だからむしろ、BiFunctionを使用して、別の方法でグループ内のユーザーを設定しています。助けてくれてありがとう、本当にあなたの助けに感謝! – Coder

0

他の人の回答を追加するには、Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers)を使用しました。 @Simonによって提案されるのではなく、このアプローチを使用しました。これは、ユーザーを保持する基盤グループがHashSetであり、反応的な方法でユーザーを追加するとスレッドセーフではないためです。しかし、あなたがスレッドセーフなデータ構造を持っているなら、私は@Simonの提案を使用します。

関連する問題