それ以外の点の例では、実際にCSVを生成する強力な方法ではないという事実(適切なエスケープを提供していない)から、あなたはそれにヘッダを追加するには少し手直しする必要があります。私はどうなるのかここにあります:
Flow
は、例えば、CSV行のソースにSource[Tweet]
を変換するために作ります case class Tweet(uid: String, txt: String)
def getTweets: Source[Tweet, NotUsed] = ???
val tweetToRow: Flow[Tweet, List[String], NotUsed] =
Flow[Tweet].map { t =>
List(
t.uid,
t.txt.replaceAll(",", "."))
}
// provide a marshaller from a row (List[String]) to a ByteString
implicit val tweetAsCsv = Marshaller.strict[List[String], ByteString] { row =>
Marshalling.WithFixedContentType(ContentTypes.`text/csv(UTF-8)`,() =>
ByteString(row.mkString(","))
)
}
// enable csv streaming
implicit val csvStreaming = EntityStreamingSupport.csv()
val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val tweets: Source[List[String], NotUsed] = getTweets.via(tweetToRow)
complete(headers.concat(tweets))
}
:Source[List[String]]
- はここにいくつかのコード例があります行ではなく、ツイート
のソースをレンダリングするマーシャラーを適応させる単一List[String]
として、あなたのヘッダーを含むソースに連結します更新:あなたのgetTweets
メソッドがFuture
を返す場合は、そのソース値をマップして、そのようにヘッダーを追加することができます。例えば、 val route = path("tweets") {
val headers = Source.single(List("uid", "text"))
val rows: Future[Source[List[String], NotUsed]] = getTweets
.map(tweets => headers.concat(tweets.via(tweetToRow)))
complete(rows)
}
ありがとうございます。最後の質問、私は実際にmongodbからこの方法でソースとしてデータを返すアクターを持っています:(MyView?GetStuff())。mapTo [Source [MyObject、NotUsed]] - これをtweetToRowメソッドにどのようにリダイレクトしますか?それは未来を返すということです。ありがとう –
@HasseneBenAmaraは答えを – Mikesname
あなたの助けをもう一度ありがとう。実際にvia()関数は正しい方法ではないように見えます...これはプロジェクトをビルドするときに得られるものです...エラー:(155、87)型の不一致。 が見つかりました:akka.stream.scaladsl.Flow [MyObject、List [String]、akka.NotUsed] 必須:akka.stream.Graph [akka.stream.FlowShape [java.io.Serializable ,?] ,?を持つ製品] ビア署名を二重チェックしましたが、コードをリファクタリングして動作させる方法がわかりません。私はまだakkaストリームとスカラの新しいですので、本当にあなたの助けをありがとう! –