2016-04-26 10 views
2

私は、AkkaストリームからSourceSinkを受け入れ、コンパイル時にソースとシンクに一致する型があることを検証する単純なビルダーを持っています。シェイプレスHLISTを使用した汎用ストリームビルダー

class EventProcessorTask(config: EventProcessorConfig =  
    EventProcessorConfig()) { 
    def source[In, MatIn](source: Source[In, MatIn]): SourcedTask[In, MatIn] = new SourcedTask[In, MatIn](source, config) 
} 

class SourcedTask[In, MatIn](source: Source[In, MatIn], config: EventProcessorConfig) { 
    def withPartitioning[Id](partitioningF: In => Id): SourcedTaskWithPartitioning[In, MatIn, Id] = 
    new SourcedTaskWithPartitioning[In, MatIn, Id](source, partitioningF, config) 
} 

class SourcedTaskWithPartitioning[In, MatIn, Id](source: Source[In, MatIn], partitioningF: In => Id, config: EventProcessorConfig) { 
    def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil] = 
    new WiredTask[In, MatIn, Out :: HNil, Id, Sink[Out, T] :: HNil](source, sink :: HNil, partitioningF, config) 
} 

class WiredTask[In, MatIn, L <: HList, Id, SinksTypes <: HList](
                  source: Source[In, MatIn], 
                  sinks: SinksTypes, 
                  partitioningF: In => Id, 
                  config: EventProcessorConfig 
                  ) { 
    def withSink[Out, T](sink: Sink[Out, T]): WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes] = 
new WiredTask[In, MatIn, Out :: L, Id, Sink[Out, T] :: SinksTypes](
    source, sink :: sinks, partitioningF, config 
) 

    def execute[N <: Nat, P <: Product, F, R <: HList, SinksRev <: HList] 
    (executionMethod: In => Future[P])(
    implicit generic: Generic.Aux[P, R], 
    rev: Reverse.Aux[L, R], 
    sinksRev: Reverse.Aux[SinksTypes, SinksRev], 
    executionContext: ExecutionContext, 
    l: Length.Aux[SinksRev, N] 
    ): Unit = { 
    val sinksReversed = sinksRev(sinks) 

// val sinksLength= sinksReversed.length.toInt 
} 
} 

コンパイル上記のコードが、私は、私もリスト(コメントアウトコード)のサイズを取得することはできませんSinksためBroadcastを構築してみてください。次のステップは、SinksRevSinksのすべてをPの対応するタイプに一致させることです。Pタイプに対応するSinksにタプルを返すexecutionMethodによって生成されたメッセージを送信できます。

I.e.

new EventProcessorTask() 
    .source(Source.single("str")) 
    .withPartitioning(r => 1) 
    .withSink(Sink.head[Long]) 
    .withSink(Sink.foreach((s: String) =>())) 
    .execute(
    in => Future.successful((null.asInstanceOf[Long], null.asInstanceOf[String])) 
) 

Longは、第1に最初SinkStringに行く必要があります。

ご協力いただければ幸いです。私はここで何か非常に間違っているかもしれないかもしれませんが、私がこれに取り組んでいた時点でコンセプトは素晴らしかったようです。いずれにせよ、ここで私が何が欠けているのかを理解したい。

要約すると、 1. Intの表現はSinksRevのサイズにできません。 2. SinkSinksRevからPに対応する要素に一致させる方法は GraphShape

答えて

1

私は最近、シェイプレスで、akkaストリームになっています。そのため、私はこの質問を試してみることにしました。あなたのケースは非常に複雑に思えるので、私が理解したものを取り上げ、簡略化して、あなたがおそらく望むものと似たようなことをするようなコードで少し出てきました。私はまだ長さを計算することはできませんが、それは建築家だから+= 1で十分です。

シンクの代わりにオプションを使用した場合の結果です。オプションのリストを取り、オプションの内容に関数を適用します。私が言及したように、私はそのケースを単純化した。

import shapeless._ 

object Shapes extends App { 

    import ops.function._ 
    import syntax.std.function._ 

    case class Thing[Types <: HList, Out] private(sources: List[Option[_]]) { 

    def withOption[T](o: Option[T]) = Thing[T :: Types, Out](o :: sources) 

    def withOutput[T] = Thing[Types, T](sources) 

    def apply[F](f: F)(implicit fp: FnToProduct.Aux[F, Types => Out]) = { 
     val a: Types = sources.foldLeft[HList](HNil)((m, v) ⇒ v.get :: m).asInstanceOf[Types] 
     f.toProduct(a) 
    } 
    } 

    object Thing { 
    def withOption[T](o: Option[T]) = Thing[T :: HNil, AnyVal](o :: Nil) 
    } 

    val r = Thing 
    .withOption(Some(1)) 
    .withOption(Some(2)) 
    .withOption(Some(3)) 
    .withOutput[Unit] 
    .apply { 
     (x: Int, y: Int, z: Int) ⇒ println(x + y + z) 
    } 

    println(r) 
} 
関連する問題