2017-05-18 3 views
1

CSVファイルを取得し、toXml関数を持つ既存のオブジェクトを使用してXMLに変更し、これをエンドポイントに通知します。それはakka.stream.scaladsl.Flow[(akka.http.scaladsl.model.HttpRequest, com.cogpp.exp.Thing),(scala.util.Try[akka.http.scaladsl.model.HttpResponse],が見つかりましたが、ビアのこのバージョンはakka.stream.Graph[akka.stream.FlowShape[akka.http.scaladsl.model.HttpRequest,?],?]を期待するとして、それは.via(poolClientFlow)への呼び出しとしてコンパイルされません。しかしデータを投稿するためのAkkaストリームは、フローを経由しないグラフを期待しています

val poolClientFlow = 
    Http().cachedHostConnectionPool[Thing]("localhost",5000) 

val file = new File("./example.csv") 

val uploadPipeline = 
    FileIO.fromFile(file) 
    .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024)) 
    .map(_.utf8String) 
    .map(_.split(",")) 
    .map(t => Thing(t(0),t(1).toInt,t(2).toInt)) 
    .map(_.toXml) 
    .map(_.toString) 
    .map(ByteString(_)) 
    .map(d =>HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d)) 
    .via(poolClientFlow) 
    .runForeach(x => System.out.println(x.toString())) 

- :私が作成したコードは次のようになります。

poolClientFlowを正しく構築していないと思いますが、私が行ったことと他のサンプルコードで見たこととの違いはわかりません。誰も助けることができますか?

答えて

1

からのフローcachedHostConnectionPool[T]は、(HttpRequest,T)のタプルを取ります。これにより、最終的に結果が(Try[HttpResponse], T)になるときに、要求のコンテキストを維持できます。必要がない場合は、ユニット()を渡してください。

リクエストを受け取るだけのAPIがあるかどうかわかりません。

することができますコンパイルあなたの例を取得するには...

.map(d => (HttpRequest(method=HttpMethods.POST,uri=s"/import",entity = d), d)) 
.via(Http().cachedHostConnectionPool[ByteString]("localhost",5000)) 

また、私、あなた、私はそのコードにはそれほど多くの.map年代を持っているwouldntの場合。あなたのserialising isntブロックと本当にそれらのステップのすべての間に背圧が必要です。私は純粋な関数を持っていただろうdef write(t: Thing): HttpRequest。しかし、それは大きな問題はありません...

+0

ありがとう!それはそれを修正しました!私はあなたが非常に多くの地図を持っていることは間違いないと思う、おそらくオーバーヘッドを追加するだけです。 – cogpp

関連する問題