2017-02-19 15 views
1

私が書いているアプリケーションの一部は、クライアントからサーバーまで任意に大きな(この質問では私は100-200 GBと仮定します)ファイルを転送する必要があります。重要なことは、受信者(サーバー)がこのファイルを格納していないことです。ストリームを読み取り/検査して次のポイントに送信するだけです。私はファイル全体を必要とすることは一切ありませんが、同時に複数の転送を期待しているので、私はRAMの使用を最小限に抑え、ディスクの使用量を削減したいと思います。私は1 MBの塊でファイルを処理したいと思います。反応性のあるストリームで大きなファイルを送信

現在、サーバーはSpring BootとAkkaを使用しています。

私の最初の試みは、クライアント側でバッファリングされたファイル入力ストリームを開き、1MBのチャンクで読み込み、別のスレッドでメッセージで送信することでした。しかし問題は、サーバがそれを格納するためのバッファを持っているかどうか心配することなくクライアントがメッセージを送信していることです(背圧がかからない)。ここで述べたように、

akka-streams with akka-cluster

しかし:このようなActorPublisherの使用と

How to use Reactive Streams for NIO binary processing?

私の第二のアイデアは、このようなアッカ・ストリームを使用していた

http://doc.akka.io/docs/akka/2.4.16/scala/stream/stream-integrations.html#Implementing_Reactive_Streams_Publisher_or_Subscriber

"警告 ActorPublisherとActorSubscriberは、今後のバージョンのAkkaでは廃止される予定です。

ActorPublisherを警告し、反応ストリームプロトコル(例えば要求)の信号が失われた場合、ストリームがデッドロックする可能性があるためActorSubscriberは、リモートの俳優と一緒に使用することはできません。」

それは良いアイデアのようには見えません。

データをオンザフライで分析したいので、ストレージプロバイダ(Dropbox、Googleドライブなど)に保存したくないです。私はSpring 5とAkkaを搭載していますが、 Rawソケットには背圧がかからず、急流ではシーケンシャル/オーダーの読み書きが保証されません(必要なもの)

主な質問は、クライアントからサーバーに大きなファイルを流す方法です。サーバーはディスク上やラム内にファイルを一度に格納できないと仮定していますか?

ボーナスの質問:どのようにそのような転送でチャンクの "正しい"サイズを計算するのですか?

私は数日間の回答を探していましたが、このような問題のある唯一の人ではないように見えますが、他の適切な代替ソリューションを指摘することなく「それをしないでください。

+1

あなたの問題が何であるかは本当に明確ではありません。Akka-streamsには、必要なツールがすべて用意されています。TCPソケットの周りにストリームラッパーがあります(バックプレッシャーはもちろんあります)。処理を実装するために使用できる 'GraphStage'(これはActorSubscriberとActorPublisherの代わりとなります)があります。デフォルトのコンビネータのどれもあなたに合っていない場合は、ロジック。あなたはそれらを組み合わせる必要があります。 –

+0

あなたは親切で、私にリモートストリームのいくつかの最小限の例を教えてくれませんか? – spam

+1

確かに、ここにあります:https://gist.github.com/netvl/1245564b106c02691dd0808fe98d07eb。それはかなり汚れています(特にサーバーのシャットダウン処理の周り)ですが、基本的な考え方を伝えるはずです。これは通信に未処理のTCPソケットを使用します。 akka-httpを使用することもできます。これは、TLSの設定も簡単になるためです(raw TCPストリームでも可能ですが)。 TCPストリームは、[here](http://doc.akka.io/docs/akka/2.4/scala/stream/stream-io.html#streaming-tcp)、akka-httpは[here](http ://doc.akka.io/docs/akka-http/current/scala.html)。 –

答えて

3

Akkaストリームは、このユースケース専用の機能を提供します。streaming File IO。ドキュメントから:

import akka.stream.scaladsl._ 
val file = Paths.get("example.csv") 

val foreach: Future[IOResult] = 
    FileIO.fromPath(file) 
     .to(Sink.ignore) 
     .run() 

チャンクの「正しいサイズ」のためのあなたのボーナスの質問については、これはハードウェアとソフトウェアの構成に大きく依存します。あなたの最善の策は、テストクライアントを作成し、あなたのサーバーに "スイートスポット"が見つかるまで、チャンクサイズを調整することです。

関連する問題