は、私は私のプロジェクトで使用していますいくつかのカフカチャンネルの階層構造を持っている:Scalaの形質型の不一致
私の基本特性は次のとおりです。
trait SendChannel[A, B] extends CommunicationChannel {
def send(data:A): B
}
今、私は、共通のカフカを持っているが
trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] {
val channelProps: KafkaSendChannelProperties
val kafkaProducer: Producer[String, B]
override def close(): Unit = kafkaProducer.close()
}
のようにチャンネルを送信します
CommanKafkaSendChannelには2つのバリエーションがあり、1つはコールバック、もう1つはFutureとなります。
trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] {
override def send(data: A): Future[RecordMetadata] = Future {
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get
}
}
KafkaSendChannelWithCallback
定義:
object KafkaSendChannelWithCallback {
def apply[A, B](oChannelProps: KafkaSendChannelProperties,
oKafkaProducer: Producer[String, B],
oCallback: Callback): KafkaSendChannelWithCallback[A, B] =
new KafkaSendChannelWithCallback[A,B] {
override val channelProps: KafkaSendChannelProperties = oChannelProps
override val kafkaProducer: Producer[String, B] = oKafkaProducer
override val callback: Callback = oCallback
}
}
trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] {
val callback: Callback
override def send(data: A): Unit =
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback)
}
今、私は以下のように実行時にチャネルの適切なタイプを選択し、設定値に基づいて。
val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
error => {
logger.error("Exception while instantiating the KafkaSendChannel")
throw error
},
success => success
)
actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME)
俳優の定義:
object IngestionRouterActor {
def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props =
Props(classOf[IngestionActor[V]], config, sendChannel)
}
私はKafkaSendChannelWithFuture
を使用するときに私のコードは、しかし、適切にコンパイルKafkaSendChannelWithCallback
使用する場合に問題がある、私はカフカにデータを送信するチャネルの右のタイプの俳優を作成していますそれはactor =
宣言にエラー下に私を与える:
[エラー] IngestionActor.scala:32:パターンの種類が予想される型と互換性がありません。 [エラー]実測値:KafkaSendChannelWithFuture [文字列、V] [エラー]必須:両方のチャネル定義は
SendChannel
から延長されているようSendChannel [V、単位]
、このコードは、エラーなしでコンパイルしているべきです。なぜそれがコンパイルされていないのか分かりません。ありがとうございます
こんにちは@chunjefを、ご回答に感謝を。 'Any'はスーパータイプなのですが、SendChannel [V、Unit]もSendChannel [V、Future [RecordMetadata]もSendChannel [V、 Any] 'これは偽ですか? – Explorer
私はあなたが提案した変更を加えました。まだ 'SendChannel [V、Unit] 'を持っている' props'に変更が必要ですか? – Explorer
@Explorer:私の答えをもっと慎重に読んでください。私は答えの両方のあなたのコメントに対処します。 – chunjef