2016-12-30 21 views
0


グリーティングすべて、
ファイル転送の問題を解決するのに役立つ必要があります。私は1つのホスト(ノード0)から別のホスト(ノード1)に1つの10MBバイナリファイルを転送するためにNettyコードを実装しましたが、ファイルの8.5KBのみが転送され、なぜか分かりません。私はChunkWriteHandlerを使用して、ChunkedNioFile経由でファイルの1MBのチャンクを送信しています(下記のコードを参照してください)。さらに、100MB、500MB、1GBなどの1MBを超えるファイルを転送しようとしましたが、8.5KBのファイルしか転送されませんでした。 ChunkedNioFileで指定されたチャンクサイズを1MBから512KB以下に減らすと、前のファイル転送の2倍の17KBが転送されます。また、ちょうどChunkedFileを使ってみましたが、同じ転送結果を受け取りました。ファイル名、ファイルサイズ(長さ)、ファイルのオフセット(読み書きを開始する場所)など、ファイルヘッダーを正常に転送して受信できますが、実際のファイルは数KBにすぎません。 誰かが何が起こっているのか、どうすればこの問題を解決できるのか教えていただけますか? (以下はコードです)。ファイルを理解して解決するNettyを使用して問題を転送する

、ありがとう

コードセットアップ:

  • FileSenderInitializer.java
  • FileSenderHandler.java
  • FileSender.java
  • FileReceiverInitializer.java
  • FileReceiverHandler。 Java
  • FileReceiver.java

FileSenderInitializer.java - 送信 - チャネルハンドラを有するチャネル・パイプラインを

を初期化パブリッククラスFileSenderInitializerがChannelInitializer {

 @Override 
     public void initChannel(SocketChannel ch) throws Exception { 
     ch.pipeline().addLast(
     //new LengthFieldPrepender(8), 
     new ChunkedWriteHandler(), 
     new FileSenderHandler()); 
     } 
     } 

FileSenderHandler.javaを拡張ファイルヘッダ情報 - ファイル名、オフセット、長さ、実際のファイル

@Override 
public void channelActive(ChannelHandlerContext ctx) throws Exception { 
try { 
String fileRequest = "ftp Node0/root/10MB_File.dat Node1/tmp/10MB_File_Copy.dat"; 

//Source File to send/transfer to the Destination Node 
String theSrcFilePath = "/root/10MB_File.dat"; 

//File Name to write on the destination node, once the file is received 
String theDestFilePath = "/tmp/10MB_File_Copy.dat"; 

//Get the source file to send 
File theFile = new File(theSrcFilePath); 
FileChannel theFileChannel = new RandomAccessFile(theFile, "r").getChannel(); 

//Get the length of the file 
long fileLength = theFileChannel.size(); 
//Get the offset 
long offSet = 0; 

//Copy the offset to the ByteBuf 
ByteBuf offSetBuf = Unpooled.copyLong(offSet); 
//Copy the file length to the ByteBuf 
ByteBuf fileLengthBuf = Unpooled.copyLong(fileLength); 

//Get the Destination Filename (including the file path) in Bytes 
byte[] theDestFilePathInBytes = theDestFilePath.getBytes(); 
//Get the length of theFilePath 
int theDestSize = theDestFilePathInBytes.length; 
//Copy the Dest File Path length to the ByteBuf 
ByteBuf theDestSizeBuf = Unpooled.copyInt(theDestSize); 
//Copy the theDestFilePathInBytes to the Byte Buf 
ByteBuf theDestFileBuf = Unpooled.copiedBuffer(theDestFilePathInBytes); 

//Send the file Headers: FileName Length, the FileName, the Offset and the file length 
ctx.write(theDestSizeBuf); 
ctx.write(theDestFileBuf); 
ctx.write(offSetBuf); 
ctx.write(fileLengthBuf); 
ctx.flush(); 

//Send the 10MB File in 1MB chunks as specified by the following chunk size (1024*1024*1) 
ctx.write(new ChunkedNioFile(theFileChannel, offSet, fileLength, 1024 * 1024 * 1)); 
ctx.flush(); 

}catch(Exception e){ 
System.err.printf("FileSenderHandler: Channel Active: Error: "+e.getMessage()); 
e.printStackTrace(); 
} 
} //End channelActive 

FileSender.java - ブートストラップチャネルとは別のホスト

public static void main(String[] args) throws Exception { 
    // Configure the client/ File Sender 
    EventLoopGroup group = new NioEventLoopGroup(); 
    try { 
    Bootstrap b = new Bootstrap(); 
    b.group(group) 
    .channel(NioSocketChannel.class) 
    .option(ChannelOption.TCP_NODELAY, true) 
    .handler(new FileSenderInitializer()); 

    // Start the client. 
    ChannelFuture f = b.connect(HOST, PORT).sync(); 

    // Wait until the connection is closed. 
    //f.channel().closeFuture().sync(); 
    } finally { 
    // Shut down the event loop to terminate all threads. 
    group.shutdownGracefully(); 
    } 
    } 
} 

FileReceiverInitializer.javaこのクライアント/ホストを接続する - チャネルハンドラ有するチャネル・パイプラインを初期化

public class FileReceiverInitializer extends ChannelInitializer<SocketChannel> { 

public FileReceiverInitializer(){ 

} 

@Override 
public void initChannel(SocketChannel ch) throws Exception { 
ch.pipeline().addLast( 
    //Read in 1MB data at a time (which is the max frame length), length field offset starts at 0, length of the length field is 8 bits, length adjustment is 0, strip the 8 bits representing the length field from the frame 
//new LengthFieldBasedFrameDecoder(1024*1024*1, 0, 8, 0, 8), 
new FileReceiverHandler()); 
} 
} 

FileReceiverHandler.javaは、 - ファイル名、オフセット、長さと実際のファイル

public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
while (msg.readableBytes() >= 1){ 
    //Read in the size of the File Name and it's directory path 
    if (!fileNameStringSizeSet) { 
    fileNameStringSizeBuf.writeBytes(msg, ((fileNameStringSizeBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringSizeBuf.writableBytes())); //INT_SIZE = 4 & LONG_SIZE = 8 (the byte size of an int and long) 
    if (fileNameStringSizeBuf.readableBytes() >= INT_SIZE) { 
     fileNameStringSize = fileNameStringSizeBuf.getInt(fileNameStringSizeBuf.readerIndex());//Get Size at index = 0; 
     fileNameStringSizeSet = true; 
    //Allocate a byteBuf to read in the actual file name and it's directory path 
     fileNameStringBuf = ctx.alloc().buffer(fileNameStringSize); 
    } 
    } else if (!readInFileNameString) { 
    //Read in the actual file name and it's corresponding directory path 
    fileNameStringBuf.writeBytes(msg, ((fileNameStringBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringBuf.writableBytes())); 
    if (fileNameStringBuf.readableBytes() >= fileNameStringSize) { 
     readInFileNameString = true; 
     //convert the data in the fileNameStringBuf to an ascii string 
     thefileName = fileNameStringBuf.toString(Charset.forName("US-ASCII")); 

     //Create file 
     emptyFile = new File(thefileName); //file Name includes the directory path 
     f = new RandomAccessFile(emptyFile, "rw"); 
     fc = f.getChannel(); 
    } 
}else if (!readInOffset) { 
    offSetBuf.writeBytes(msg, ((offSetBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : offSetBuf.writableBytes())); 
    if (offSetBuf.readableBytes() >= LONG_SIZE) { 
    currentOffset = offSetBuf.getLong(offSetBuf.readerIndex());//Get Size at index = 0; 
    readInOffset = true; 
    } 

} else if (!readInFileLength) { 
    fileLengthBuf.writeBytes(msg, ((fileLengthBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileLengthBuf.writableBytes())); 
    //LONG_SIZE = 8 
    if (fileLengthBuf.readableBytes() >= LONG_SIZE) { 
    fileLength = fileLengthBuf.getLong(fileLengthBuf.readerIndex());//Get Size at index = 0; 
    remainingFileLength = fileLength; 
    readInFragmentLength = true; 
    } 
} else { 
    if (!readInCompleteFile) { 
    if (msg.readableBytes() < remainingFileLength) { 
     if (msg.readableBytes() > 0) { 
     currentFileBytesWrote = 0 
     while (msg.readableBytes >= 1){ 
      int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset); 
      currentOffset += fileBytesWrote; 
      remainingFileLength -= fileBytesWrote; 
      msg.readerIndex(msg.readerIndex + fileBytesWrote); 
     } 
     } 
    } else { 
     int remainingFileLengthInt = (int) remainingFileLength; 
     while (remainingFileLength >= 1){ 
     int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), remainingFileLengthInt), currentOffset); 

     currentOffset += fileBytesWrote; 
     remainingFileLength -= fileBytesWrote; 
     remainingFileLengthInt-= fileBytesWrote; 
     msg.readerIndex(msg.readerIndex + fileBytesWrote); 
     } 

     //Set readInCompleteFile to true 
     readInCompleteFile = true; 

    } 
    }//End else if file chunk 
    }//End Else 
}//End While 
}//End Read Method 

FileReceiver - ファイルのヘッダ情報を受信します。javaの - ブートストラップサーバと受け付ける接続

public static void main(String[] args) throws Exception { 
// Configure the server 
EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
EventLoopGroup workerGroup = new NioEventLoopGroup(); 
try { 
ServerBootstrap b = new ServerBootstrap(); 
b.group(bossGroup, workerGroup) 
.channel(NioServerSocketChannel.class) 
.handler(new LoggingHandler(LogLevel.INFO)) 
.childHandler(new FileReceiverInitializer()) 
.childOption(ChannelOption.AUTO_READ, true) 
.bind(LOCAL_PORT).sync().channel().closeFuture().sync(); 
} finally { 
bossGroup.shutdownGracefully(); 
workerGroup.shutdownGracefully(); 
} 
} 

-- 

答えて

0

おそらく、私が間違っているが、以下は、私には奇妙です:

 int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset); 
     currentOffset += fileBytesWrote; 
     remainingFileLength -= fileBytesWrote; 
     msg.readerIndex(msg.readerIndex + fileBytesWrote); 
     // msg.readerIndex (or msg.readerIndex() ?) changed already 

あなたはこの割り当てを行う前にバックアップreaderIndex()の値にもできます。

数KBのいずれかに関連すると思われる持つ: - (?のみ最初の1) あなたがなりたかったとして、あなたはすべてのパケットを消費しない - あなたがいない正しくそれを読んで(私はコードの容疑者として、いくつかのバイトをスキップ

各読み取り操作(サーバー側)をトレースできますか?それは、(あなたが受け取ったバイト数、あなたが書いた数、例えば、readerIndex/readableBytes/offsetが何であるかを知るのに役立ちます)。

0

もう1つの理由が考えられます。クライアント側では、一度接続したグループをすぐにシャットダウンします。クライアントが転送を「中断」する可能性があるため、サーバーが完全転送を行えないため、それが理由である可能性があります。

+0

ありがとうございますフレデリック –

0

問題は、メインのFileSender.javaアプリケーションがコードの実行を終了したときに終了するため、FileSenderHandlerが終了するという問題でした。しかし、主なFileSender.javaアプリケーションの終了をブロックするために、私は次のステートメントを使用しました:f.channel()。closeFuture()。sync();. fは、呼び出しによってサーバーに接続することからレンダリングされるChannelFutureです。b.connect(HOST、PORT).sync();これにより、FileSenderが保持され、fileSenderHandlerが早期に終了することなくすべての情報を送信できるようになります。

しかし、私の新しい質問は次のとおりです。アプリケーションがチャネルを閉じて、メインアプリケーションがすべてのデータを送信し、確認応答を受け取った後でブロック解除する方法を教えてください。現在、f.channel()。closeFuture()。sync();の呼び出しがブロックされています。すべてのデータを送って確認を受け取ったら、どうすればメインアプリケーションをブロックすることができますか?私はチャネルを閉じると、closeFutureがtrueとして返され、メインアプリケーションのブロックを解除すると考えました。また、ctx.channel()。close()を使用してFileSenderHandlerとFileReceiverHandler内からチャンネルを閉じることを試みましたが、チャンネルはメインアプリケーションを閉じたりブロック解除したりしませんでした。

私はアプリケーションのブロックを解除する必要があるので、すべてのデータが送信され、確認応答された後に、コンソールにスループットを印刷できます。複数のデータチャネルがあり、プログラムがブロックされている場合は、最初のデータチャネルハンドラスループットのみが出力されます。 FileSender.javaは以下のようになります。しかし、私が1つのデータチャンネルを持っていて、FileSenderHandlerのチャンネルを閉じようとしても、メインアプリケーション(FileSender.java)はまだブロックし、ChannelFuture.channel()でハングします。終了するには、コントロールCを端末に入力する必要があります。 すべてのデータを一度受信して受信した後、メインアプリケーションのロックを解除する方法についての考え方はありますか?

FileSender.java - ブートストラップチャネルと別のホストにこのクライアント/ホスト

public static void main(String[] args) throws Exception { 
// Configure the client/ File Sender 
EventLoopGroup group = new NioEventLoopGroup(); 
try { 
for (int i =0; i<numOfDataChannels; i++) { 
Bootstrap b = new Bootstrap(); 
b.group(group) 
.channel(NioSocketChannel.class) 
.option(ChannelOption.TCP_NODELAY, true) 
.handler(new FileSenderInitializer()); 

// Start the client. 
ChannelFuture f = b.connect(HOST, PORT).sync(); 

    addChannelFutureToList(f); 
} 

// Wait until the connection is closed for each data channel, but also who can actually close the channel 
for (ChannelFuture f: channelFutureList){ 
    f.channel().closeFuture().sync(); 
} 

//When Channel is closed PRINT THROUGHPUT OF ALL THE DATA CHANNELS 
printThroughput(); 
} finally { 
// Shut down the event loop to terminate all threads. 
group.shutdownGracefully(); 
} 
} 
} 

FileSenderHandler.javaを接続している - などの読み取り/書き込み

としてI/Oチャンネルのイベントを処理します
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
try { 
. 
. 
//After received msg Ack, close the channel, this should unblock the main application (FileSender.java) since after closing the channel closeFuture will be fulfilled 
ctx.channel().close(); 

}catch(Exception e){ 
    System.err.printf("ChannelRead Error Msg: " + e.getMessage()); 
    e.printStackTrace(); 

} 
関連する問題