2017-11-13 7 views
2

私はcats効果シフト機能を使用してコードを非同期で実行しようとしています。猫効果からシフト機能を使用する

機能の実装:

def asyncSendMsg(producer: KkProducer)(record: KkRecord) : IO[Either[String, RecordMetadata]] = 
    for { 
     res <- trySendMsg(producer)(record).shift(BlockingFileIO).shift(Main) 
    } yield(res) 

    def trySendMsg(producer: KkProducer)(record: KkRecord): IO[Either[String, RecordMetadata]] = 
    IO { 
     try { 
     Right(producer.send(record).get()) 
     } catch { 
     case e: Exception => Left(e.getMessage()) 
     } 
    } 

をコンパイルしようとすると、私が受け取る:

私はシフト機能を使用するためにインポートする必要がありますライブラリ
[error] /home/developer/Desktop/scala/PureProducer/src/main/scala/TheProducer.scala:51:43: value shift is not a member of cats.effect.IO[Either[String,org.apache.kafka.clients.producer.RecordMetadata]] 
[error]  res <- trySendMsg(producer)(record).shift(BlockingFileIO).shift(Main) 
[error]           ^
[error] one error found 
[error] (compile:compileIncremental) Compilation failed 
[error] Total time: 2 s, completed Nov 13, 2017 2:18:06 PM 

val nonBlockingExecContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10)) 

val res = for { 
    _ <- IO { println(Thread.currentThread().getName) } 
    _ <- IO.shift(nonBlockingExecContext) 
    _ <- IO { println(Thread.currentThread().getName) } 
} yield() 

res.unsafeRunSync() 

収量:

答えて

1

IO.shift前あなたのIOコールを呼び出すにIOのコンパニオンオブジェクト(APIが猫のバージョン間で変更されている)上で定義され、あなたは理解のためにそれを使用することができます

main 
pool-1-thread-1 
+0

メインスレッドに戻るにはどうすればよいですか? –

+0

@zero_coding 'unsafeRunSync()'が返ってきたら、 'main'に戻ります。 –

+0

そして、別のスレッドの計算値をメインスレッドの変数に代入しますか? –

関連する問題