2017-02-17 20 views
8

Spring 5では、残りのAPIの無効プログラミングスタイルがwebfluxで導入されています。私はそれ自体かなり新しく、データベースへの同期呼び出しをFluxまたはMonoに包み込むことは意味がありませんか?はいの場合は、これを行う方法です:spring webfluxとデータベースからの読み込み

@RestController 
public class HomeController { 

    private MeasurementRepository repository; 

    public HomeController(MeasurementRepository repository){ 
     this.repository = repository; 
    } 

    @GetMapping(value = "/v1/measurements") 
    public Flux<Measurement> getMeasurements() { 
     return Flux.fromIterable(repository.findByFromDateGreaterThanEqual(new Date(1486980000L))); 
    } 

} 

非同期CrudRepositoryのようなものがありますか?私はそれを見つけることができませんでした。

+0

JDBCコードは、任意の反応JDBCドライバがそこに存在しない、本質的に同期している(と私が今までには存在しないだろう)。このようにデータベースにアクセスすることは実際には意味がありません。 –

+0

私はFluxに慣れていませんが、Spring Data JPAの戻り値の型としてJava 8 Streamを使用できることはわかっています。 'Stream 'を返すことができます。このコメントが役立つかどうかは分かりません:) – burcakulug

+0

これは良いスタートですが、非同期ではないため、webflux非ブロッキングパラダイムを破るJDBC操作が進行中のときに、呼び出し側がブロックされます。 – NeilS

答えて

8

1つのオプションは、完全に非ブロッキングの代替SQLクライアントを使用することです。いくつかの例には、 https://github.com/mauricio/postgresql-asyncまたはhttps://github.com/finagle/rocが含まれます。もちろん、これらのドライバはデータベースベンダーによって正式にサポートされていません。また、機能は、HibernateやjOOQのような成熟したJDBCベースの抽象化と比べるとはるかに魅力的ではありません。

別のアイディアがScalaの世界から私に届きました。このアイデアは、ブロッキングコールを独立したThreadPoolにディスパッチして、ブロッキングコールとノンブロッキングコールを混在させないようにすることです。これにより、スレッドの総数を制御することが可能になり、いくつかの潜在的な最適化を伴って、CPUがメイン実行コンテキストでノンブロッキング・タスクを処理できるようになります。 実際にブロックしているSpring Data JPAなどのJDBCベースの実装があると仮定すると、非同期で実行され、専用のスレッドプールでディスパッチされます。

@RestController 
public class HomeController { 

    private final MeasurementRepository repository; 
    private final Scheduler scheduler; 

    public HomeController(MeasurementRepository repository, @Qualifier("jdbcScheduler") Scheduler scheduler) { 
     this.repository = repository; 
     this.scheduler = scheduler; 
    } 

    @GetMapping(value = "/v1/measurements") 
    public Flux<Measurement> getMeasurements() { 
     return Mono.fromCallable(() -> repository.findByFromDateGreaterThanEqual(new Date(1486980000L))).publishOn(scheduler); 
    } 

} 

JDBCのスケジューラは、接続数と同じサイズの専用スレッドプールを使用して構成する必要があります。

@Configuration 
public class SchedulerConfiguration { 
    private final Integer connectionPoolSize; 

    public SchedulerConfiguration(@Value("${spring.datasource.maximum-pool-size}") Integer connectionPoolSize) { 
     this.connectionPoolSize = connectionPoolSize; 
    } 

    @Bean 
    public Scheduler jdbcScheduler() { 
     return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize)); 
    } 

} 

ただし、この方法では問題があります。主なものはトランザクション管理です。 JDBCでは、トランザクションは単一のjava.sql.Connection内でのみ可能です。 1つのトランザクションで複数の操作を行うには、接続を共有する必要があります。その間に何らかの計算をしたい場合は、接続を維持する必要があります。これはあまり効果的ではありません。その間に計算を実行している間、限られた数の接続をアイドル状態に保ちます。

この非同期JDBCラッパーの考え方は新しいものではなく、既にScalaライブラリSlick 3に実装されています。最後に、非ブロックJDBCがJavaロードマップに追加される可能性があります。 2016年9月にJavaOneで発表されたので、Java 10で見ることができます。

+0

CrudRepositoryを実装する推奨オプションまたはWIPプロジェクトはありますか?SQLプロジェクトでは、早ければJava 10までreactor/WebFluxを使用できないというのは残念です。 – NeilS

+0

複数の情報源からの@NeilSこの仕事は、少なくともPivotalによって計画されていないことがわかります。https://jira.spring.io/browse/DATAJPA-701 https://spring.io/blog/2017/02/23/spring-framework-5-0-m5-update#comment-3174616521 –

+0

コード例では、publishOn(スケジューラー)呼び出しが私には奇妙に見えます。私たちをあなたの代わりに呼び出すsubscribeOnは、このスケジューラ上でこのパブリッシャにrequest()を実行しますか? – etiennepeiniau

0

MongoとCassandraのリアクティブリポジトリインターフェイスをサポートしています。

Spring data MongoDb Reactive Interface

春データMongoDBは、プロジェクト炉・RxJava 1反応性のタイプと反応するリポジトリのサポートを提供します。反応性APIは、反応型間の反応型変換をサポートします。

public interface ReactivePersonRepository extends ReactiveCrudRepository<Person, String> { 

Flux<Person> findByLastname(String lastname); 

@Query("{ 'firstname': ?0, 'lastname': ?1}") 
Mono<Person> findByFirstnameAndLastname(String firstname, String lastname); 

// Accept parameter inside a reactive type for deferred execution 
Flux<Person> findByLastname(Mono<String> lastname); 

Mono<Person> findByFirstnameAndLastname(Mono<String> firstname, String lastname); 

@InfiniteStream // Use a tailable cursor 
Flux<Person> findWithTailableCursorBy(); 
} 

public interface RxJava1PersonRepository extends RxJava1CrudRepository<Person, String> { 

Observable<Person> findByLastname(String lastname); 

@Query("{ 'firstname': ?0, 'lastname': ?1}") 
Single<Person> findByFirstnameAndLastname(String firstname, String lastname); 

// Accept parameter inside a reactive type for deferred execution 
Observable<Person> findByLastname(Single<String> lastname); 

Single<Person> findByFirstnameAndLastname(Single<String> firstname, String lastname); 

@InfiniteStream // Use a tailable cursor 
Observable<Person> findWithTailableCursorBy(); 

}

関連する問題