2017-12-08 6 views
2

シンクのinvokeメソッドは、非同期ioを作る方法のようですか?例えば返信Future?例えば、Redisのコネクターが同期Redisのコマンドを実行するためにjedis libにを使用していますフリンクシンクはバイオのみをサポートしますか?

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

そして、それは、コマンドごとにRedisのサーバからネットワークの応答を待っているFLINKのタスクスレッドをブロックします!シンクを持つ同じスレッドで実行されている他の演算子は可能ですか?もしそうなら、それもそれをブロックするだろうか?

私はflinkがasyncio APIを持っていることを知っていますが、シンクimplで使われていないようですね? @Dexterが述べたように

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

+0

Jedisはあなたが@Dexterは、あなたが簡単な例を与えることができる「RichAsyncFunction」 – Dexter

+1

何の非同期インターフェイスを持っていませんか? – Dexter

+0

を使用する独自のコネクタのimplを書くことができます – kingluo

答えて

1

、あなたはRichAsyncFunctionを使用することができます。ここではサンプルコードは(それを動作させるために更なるアップデートが必要な場合があります。)である

AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() { 
     transient private RedisClient client; 
     transient private RedisAsyncCommands<String, String> commands; 
     transient private ExecutorService executor; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 

      client = RedisClient.create("redis://localhost"); 
      commands = client.connect().async(); 
      executor = Executors.newFixedThreadPool(10); 
     } 

     @Override 
     public void close() throws Exception { 
      // shut down the connection and thread pool. 
      client.shutdown(); 
      executor.shutdown(); 

      super.close(); 
     } 

     public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception { 
      // eg.g get something from redis in async 
      final RedisFuture<String> future = commands.get("key"); 
      future.thenAccept(new Consumer<String>() { 
       @Override 
       public void accept(String value) { 
        collector.collect(Collections.singletonList(future.get())); 
       } 
      }); 
     } 
    }, 1000, TimeUnit.MILLISECONDS); 
+0

'thenAccept'と' addListener'の違いは何ですか? – kingluo

+0

ここで 'addListener'はGUAVAの' ListenableFuture'からのもので、どこから 'thenAccept'が来るのかわかりませんが、' ListenableFuture'の 'addListener'と似た機能を持つ' CompletableFuture'を参照してください。 – David

+0

redisクライアントとしてlettuceを使用していますか?返された未来には、完了コールバックをバインドするための 'thenAccept'メソッドがあります。https://github.com/lettuce-io/lettuce-core/wiki/Asynchronous-API#consuming-futures – kingluo

関連する問題