2017-03-02 12 views
1

私はMongodbからreactivemongo-akkastream 0.12.1を使用してデータをストリーミングしようとしています(Akka-httpを使用して)その結果をCSVストリームに返します。AKKA-HTTPを使用したCSVソースのストリーミング

http://doc.akka.io/docs/akka-http/10.0.0/scala/http/routing-dsl/source-streaming-support.html#simple-csv-streaming-example

、それが正常に動作になります。 私はここexempleを、以下のことを実施しました。

私が今直面している唯一の問題は、出力CSVファイルにヘッダーを追加する方法です。何か案は?

おかげ

答えて

2

それ以外の点の例では、実際にCSVを生成する強力な方法ではないという事実(適切なエスケープを提供していない)から、あなたはそれにヘッダを追加するには少し手直しする必要があります。私はどうなるのかここにあります:

  1. Flowは、例えば、CSV行のソースにSource[Tweet]を変換するために作ります

    case class Tweet(uid: String, txt: String) 
    
    def getTweets: Source[Tweet, NotUsed] = ??? 
    
    val tweetToRow: Flow[Tweet, List[String], NotUsed] = 
        Flow[Tweet].map { t => 
        List(
         t.uid, 
         t.txt.replaceAll(",", ".")) 
        } 
    
    // provide a marshaller from a row (List[String]) to a ByteString 
    implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row => 
        Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`,() => 
        ByteString(row.mkString(",")) 
    ) 
    } 
    
    // enable csv streaming 
    implicit val csvStreaming = EntityStreamingSupport.csv() 
    
    val route = path("tweets") { 
        val headers = Source.single(List("uid", "text")) 
        val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow) 
        complete(headers.concat(tweets)) 
    } 
    

    Source[List[String]]

  2. はここにいくつかのコード例があります行ではなく、ツイート

のソースをレンダリングするマーシャラーを適応させる単一List[String]

  • として、あなたのヘッダーを含むソースに連結します更新:あなたのgetTweetsメソッドがFutureを返す場合は、そのソース値をマップして、そのようにヘッダーを追加することができます。例えば、

    val route = path("tweets") { 
        val headers = Source.single(List("uid", "text")) 
        val rows: Future[Source[List[String], NotUsed]] = getTweets 
         .map(tweets => headers.concat(tweets.via(tweetToRow))) 
        complete(rows) 
    } 
    
  • +0

    ありがとうございます。最後の質問、私は実際にmongodbからこの方法でソースとしてデータを返すアクターを持っています:(MyView?GetStuff())。mapTo [Source [MyObject、NotUsed]] - これをtweetToRowメソッドにどのようにリダイレクトしますか?それは未来を返すということです。ありがとう –

    +0

    @HasseneBenAmaraは答えを – Mikesname

    +0

    あなたの助けをもう一度ありがとう。実際にvia()関数は正しい方法ではないように見えます...これはプロジェクトをビルドするときに得られるものです...エラー:(155、87)型の不一致。 が見つかりました:akka.stream.scaladsl.Flow [MyObject、List [String]、akka.NotUsed] 必須:akka.stream.Graph [akka.stream.FlowShape [java.io.Serializable ,?] ,?を持つ製品] ビア署名を二重チェックしましたが、コードをリファクタリングして動作させる方法がわかりません。私はまだakkaストリームとスカラの新しいですので、本当にあなたの助けをありがとう! –

    関連する問題