2
私はcats効果シフト機能を使用してコードを非同期で実行しようとしています。猫効果からシフト機能を使用する
機能の実装:
def asyncSendMsg(producer: KkProducer)(record: KkRecord) : IO[Either[String, RecordMetadata]] =
for {
res <- trySendMsg(producer)(record).shift(BlockingFileIO).shift(Main)
} yield(res)
def trySendMsg(producer: KkProducer)(record: KkRecord): IO[Either[String, RecordMetadata]] =
IO {
try {
Right(producer.send(record).get())
} catch {
case e: Exception => Left(e.getMessage())
}
}
をコンパイルしようとすると、私が受け取る:
私はシフト機能を使用するためにインポートする必要がありますライブラリ[error] /home/developer/Desktop/scala/PureProducer/src/main/scala/TheProducer.scala:51:43: value shift is not a member of cats.effect.IO[Either[String,org.apache.kafka.clients.producer.RecordMetadata]]
[error] res <- trySendMsg(producer)(record).shift(BlockingFileIO).shift(Main)
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Nov 13, 2017 2:18:06 PM
?
val nonBlockingExecContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val res = for {
_ <- IO { println(Thread.currentThread().getName) }
_ <- IO.shift(nonBlockingExecContext)
_ <- IO { println(Thread.currentThread().getName) }
} yield()
res.unsafeRunSync()
収量:
メインスレッドに戻るにはどうすればよいですか? –
@zero_coding 'unsafeRunSync()'が返ってきたら、 'main'に戻ります。 –
そして、別のスレッドの計算値をメインスレッドの変数に代入しますか? –