2017-12-17 13 views
2

このストリームを消費するためのシンプルなakkaストリーム休止エンドポイントとクライアントを作成しようとしています。しかし、私はサーバーとクライアントを実行しようとすると、クライアントはストリームの一部だけを消費することができます。私は実行中に例外を見ることができません。ここでAkka http-clientはサーバーからのすべてのデータストリームを消費することはできません

は私のサーバーとクライアントです:

import akka.NotUsed 
import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport} 
import akka.http.scaladsl.server.Directives._ 
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport 
import akka.stream.{ActorAttributes, ActorMaterializer, Attributes, Supervision} 
import akka.stream.scaladsl.{Flow, Source} 
import akka.util.ByteString 
import spray.json.DefaultJsonProtocol 

import scala.io.StdIn 
import scala.util.Random 

object WebServer { 

    object Model { 
    case class Person(id: Int = Random.nextInt(), fName: String = Random.nextString(10), sName: String = Random.nextString(10)) 
    } 

    object JsonProtocol extends SprayJsonSupport with DefaultJsonProtocol { 
    implicit val personFormat = jsonFormat(Model.Person.apply, "id", "firstName", "secondaryName") 
    } 

    def main(args: Array[String]) { 

    implicit val system = ActorSystem("my-system") 
    implicit val materializer = ActorMaterializer() 

    implicit val executionContext = system.dispatcher 

    val start = ByteString.empty 
    val sep = ByteString("\n") 
    val end = ByteString.empty 

    import JsonProtocol._ 
    implicit val jsonStreamingSupport: JsonEntityStreamingSupport = EntityStreamingSupport.json() 
     .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end)) 
     .withParallelMarshalling(parallelism = 8, unordered = false) 

    val decider: Supervision.Decider = { 
     case ex: Throwable => { 
     println("Exception occurs") 
     ex.printStackTrace() 
     Supervision.Resume 
     } 
    } 

    val persons: Source[Model.Person, NotUsed] = Source.fromIterator(
    () => (0 to 1000000).map(id => Model.Person(id = id)).iterator 
    ) 
     .withAttributes(ActorAttributes.supervisionStrategy(decider)) 
     .map(p => { println(p); p }) 


    val route = 
     path("persons") { 
     get { 
      complete(persons) 
     } 
     } 

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() 

    bindingFuture 
     .flatMap(_.unbind()) 
     .onComplete(_ => { 
     println("Stopping http server ...") 
     system.terminate() 
     }) 
    } 
} 

とクライアント:

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.{HttpRequest, Uri} 
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision} 

import scala.util.{Failure, Success} 

object WebClient { 
    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem() 
    implicit val materializer = ActorMaterializer() 
    implicit val executionContext = system.dispatcher 


    val request = HttpRequest(uri = Uri("http://localhost:8080/persons")) 

    val response = Http().singleRequest(request) 

    val attributes = ActorAttributes.withSupervisionStrategy { 
     case ex: Throwable => { 
     println("Exception occurs") 
     ex.printStackTrace 
     Supervision.Resume 
     } 
    } 
    response.map(r => { 
     r.entity.dataBytes.withAttributes(attributes) 
    }).onComplete { 
     case Success(db) => db.map(bs => bs.utf8String).runForeach(println) 
     case Failure(ex) => ex.printStackTrace() 
    } 
    } 
} 

それは、100、1000年のために働く10 000名が、> 100 000' のために動作しません。 ストリームのためのいくつかの制限があるように見えますが、私はそれが

最終レコードは私のローカルマシン上のサーバによって印刷された見つけることができません(番号79101で)です: 人(79101、ⰷ瑐劲죗醂竜泲늎制䠸、䮳硝沢并⎗ᝨᫌꊭᐽ酡)

クライアント上

最後のレコードは、()番号79048である :それはなぜ起こるか

{"id":79048,"firstName":"췁頔䚐龫暀࡙頨捜昗㢵","secondaryName":"⏉ݾ袈庩컆◁ꄹ葪䑥Ϻ"} 

はたぶん誰かが知っていますか?

答えて

1

解決策が見つかりました。明示的にクライアントにr.entity.withoutSizeLimit()を追加する必要があります。その後、すべてが正常に動作します

関連する問題