私は、AkkaストリームからSource
とSink
を受け入れ、コンパイル時にソースとシンクに一致する型があることを検証する単純なビルダーを持っています。シェイプレスHLISTを使用した汎用ストリームビルダー
class EventProcessorTask(config: EventProcessorConfig =
EventProcessorConfig()) {
def source[In, MatIn](source: Source[In, MatIn]): SourcedTask[In, MatIn] = new SourcedTask[In, MatIn](source, config)
}
class SourcedTask[In, MatIn](source: Source[In, MatIn], config: EventProcessorConfig) {
def withPartitioning[Id](partitioningF: In => Id): SourcedTaskWithPartitioning[In, MatIn, Id] =
new SourcedTaskWithPartitioning[In, MatIn, Id](source, partitioningF, config)
}
class SourcedTaskWithPartitioning[In, MatIn, Id](source: Source[In, MatIn], partitioningF: In => Id, config: EventProcessorConfig) {
def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil] =
new WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil](source, sink :: HNil, partitioningF, config)
}
class WiredTask[In, MatIn, L <: HList, Id, SinksTypes <: HList](
source: Source[In, MatIn],
sinks: SinksTypes,
partitioningF: In => Id,
config: EventProcessorConfig
) {
def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes] =
new WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes](
source, sink :: sinks, partitioningF, config
)
def execute[N <: Nat, P <: Product, F, R <: HList, SinksRev <: HList]
(executionMethod: In => Future[P])(
implicit generic: Generic.Aux[P, R],
rev: Reverse.Aux[L, R],
sinksRev: Reverse.Aux[SinksTypes, SinksRev],
executionContext: ExecutionContext,
l: Length.Aux[SinksRev, N]
): Unit = {
val sinksReversed = sinksRev(sinks)
// val sinksLength= sinksReversed.length.toInt
}
}
コンパイル上記のコードが、私は、私もリスト(コメントアウトコード)のサイズを取得することはできませんSinks
ためBroadcast
を構築してみてください。次のステップは、SinksRev
のSinks
のすべてをP
の対応するタイプに一致させることです。P
タイプに対応するSinks
にタプルを返すexecutionMethod
によって生成されたメッセージを送信できます。
I.e.
new EventProcessorTask()
.source(Source.single("str"))
.withPartitioning(r => 1)
.withSink(Sink.head[Long])
.withSink(Sink.foreach((s: String) =>()))
.execute(
in => Future.successful((null.asInstanceOf[Long], null.asInstanceOf[String]))
)
Long
は、第1に最初Sink
とString
に行く必要があります。
ご協力いただければ幸いです。私はここで何か非常に間違っているかもしれないかもしれませんが、私がこれに取り組んでいた時点でコンセプトは素晴らしかったようです。いずれにせよ、ここで私が何が欠けているのかを理解したい。
要約すると、 1. Int
の表現はSinksRev
のサイズにできません。 2. Sink
をSinksRev
からP
に対応する要素に一致させる方法は GraphShape
?