2017-09-25 18 views
1

は、私は私のプロジェクトで使用していますいくつかのカフカチャンネルの階層構造を持っている:Scalaの形質型の不一致

私の基本特性は次のとおりです。

trait SendChannel[A, B] extends CommunicationChannel { 
    def send(data:A): B 
} 

今、私は、共通のカフカを持っているが

trait CommonKafkaSendChannel[A, B, Return] extends SendChannel[A, Return] { 
val channelProps: KafkaSendChannelProperties 
val kafkaProducer: Producer[String, B] 
override def close(): Unit = kafkaProducer.close() 
} 
のようにチャンネルを送信します

CommanKafkaSendChannelには2つのバリエーションがあり、1つはコールバック、もう1つはFutureとなります。

trait KafkaSendChannelWithFuture[A, B] extends CommonKafkaSendChannel[A, B, Future[RecordMetadata]] { 
override def send(data: A): Future[RecordMetadata] = Future { 
    kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic)).get 
} 
} 

KafkaSendChannelWithCallback定義:

object KafkaSendChannelWithCallback { 

def apply[A, B](oChannelProps: KafkaSendChannelProperties, 
       oKafkaProducer: Producer[String, B], 
       oCallback: Callback): KafkaSendChannelWithCallback[A, B] = 
new KafkaSendChannelWithCallback[A,B] { 
    override val channelProps: KafkaSendChannelProperties = oChannelProps 
    override val kafkaProducer: Producer[String, B] = oKafkaProducer 
    override val callback: Callback = oCallback 
} 
} 

trait KafkaSendChannelWithCallback[A, B] extends CommonKafkaSendChannel[A, B, Unit] { 

    val callback: Callback 

override def send(data: A): Unit = 
kafkaProducer.send(new ProducerRecord[String, B](channelProps.topic), callback) 
} 

今、私は以下のように実行時にチャネルの適切なタイプを選択し、設定値に基づいて。

val sendChannel = kafkaChannel.channel(config, actorSystem).fold(
    error => { 
     logger.error("Exception while instantiating the KafkaSendChannel") 
     throw error 
    }, 
    success => success 
) 

actor = actorSystem.actorOf(IngestionActor.props(config, sendChannel), name = ACTOR_NAME) 

俳優の定義:

object IngestionRouterActor { 
    def props[V](config: Config, sendChannel: SendChannel[V, Unit]): Props = 
Props(classOf[IngestionActor[V]], config, sendChannel) 
} 

私はKafkaSendChannelWithFutureを使用するときに私のコードは、しかし、適切にコンパイルKafkaSendChannelWithCallback使用する場合に問題がある、私はカフカにデータを送信するチャネルの右のタイプの俳優を作成していますそれはactor =宣言にエラー下に私を与える:

[エラー] IngestionActor.scala:32:パターンの種類が予想される型と互換性がありません。 [エラー]実測値:KafkaSendChannelWithFuture [文字列、V] [エラー]必須:両方のチャネル定義はSendChannelから延長されているようSendChannel [V、単位]

、このコードは、エラーなしでコンパイルしているべきです。なぜそれがコンパイルされていないのか分かりません。ありがとうございます

答えて

1

IngestionActorPropsSendChannel[V, Unit]です。この引数にKafkaSendChannelWithCallbackを渡すことは、SendChannel[V, Unit]であるために機能します。

一方、KafkaSendChannelWithFutureSendChannel[V, Future[RecordMetadata]]です。 SendChannel[V, Future[RecordMetadata]]は、ではなく、aである。

一つの選択肢はAnyUnitFuture両方のスーパータイプであるため、SendChannel[V, Any]を取るためにPropsを再定義することです:

def props[V](config: Config, sendChannel: SendChannel[V, Any]): Props = ??? 

この時点でSendChannelは、ジェネリック型であることから、コンパイラはまだ不幸ですデフォルトでは不変です。つまり、SendChannel[V, Unit]でもSendChannel[V, Future[RecordMetadata]]SendChannel[V, Any]ではありません。

Bの前で+を追加することによって)第二種のパラメータにSendChannel共変を作り、それを変更するには:

trait SendChannel[A, +B] extends CommunicationChannel { 
    def send(data: A): B 
} 
+0

こんにちは@chunjefを、ご回答に感謝を。 'Any'はスーパータイプなのですが、SendChannel [V、Unit]もSendChannel [V、Future [RecordMetadata]もSendChannel [V、 Any] 'これは偽ですか? – Explorer

+0

私はあなたが提案した変更を加えました。まだ 'SendChannel [V、Unit] 'を持っている' props'に変更が必要ですか? – Explorer

+0

@Explorer:私の答えをもっと慎重に読んでください。私は答えの両方のあなたのコメントに対処します。 – chunjef