2012-02-15 7 views
2

Rrom C#、SOCKETからJAVAへの読み取り/書き込み、並行性/ソケットの問題があります。SOCKETからJAVAへの読み込み/書き込みといくつかの並行性/ソケットの問題

サーバーがJavaで、クライアントがC#のサーバークライアントアプリケーションを実装しようとしています。また、TCP/IPを介して通信し、それらの間でバイナリデータを交換します。

特に、私はJavaとC#の両方で定義されたPacketクラスを持っています。ヘッダー、キー、値があります。 JavaとC#の両方は、まったく同じ方法でPacket to Socketを読み書きします。このようにして、C#から要求パケットを送信し、Javaサーバーで処理し、応答をパケットとして送り返すことができます。

オリジナルの問題はもっと複雑ですが、私はこの「シンプル」バージョンに煮詰めることができました。

私は以下のようにサーバーとクライアントの両方を実装しました。コードは下部にもあります。サーバー側で

私はあなたが読むことを続けなければならない問題を述べるために:)

サーバ(Java)の側

私は非常にダミーServerSocketの使用量を持っています。それは着信パケットを読み取り、応答としてほぼ同じパケットを返送する。

クライアント(C#)側 クライアントは少し複雑です。クライアントはN(設定可能)スレッド数を開始します(私はそれらをユーザースレッドと呼んでいます)。 1つのスレッドと1つのスレッド。すべてのユーザスレッドは、ダミー要求パケットと一意のIDを持つCallオブジェクトを作成します。次に、呼び出しをローカルのBlockingCollectionに追加します。

アウトスレッドが継続的にローカルBlockingCollectionを読み取り、サーバー

にすべての要求パケットを送信してスレッドも継続的にサーバからの応答パケットを読み取り、呼び出しオブジェクト(独自のコールIDを覚えている)に、それらを一致します。

5秒間隔で特定のCallオブジェクトに対する応答がない場合、ユーザースレッドはConsoleに印刷することでそれに不平を言うでしょう。

また、1秒間に実行されたトランザクションの数を表示する10秒間隔のタイマーもあります。

これまでに届いた場合は、ありがとうございます。

は今問題:以下

コード、私はMac上のMonoで正常に動作し、上述のものの実装です。 Windowsでは、ユーザースレッド数が少ない(<)とすぐには失敗しません。突然スレッドの数を増やすと、何らかの形でクライアントが受け取る応答パケットが破損しています。このアプリケーションに関しては、リクエストに対する応答が受信されなかったため、すべてのユーザースレッドが停止します。 質問が壊れている理由は?あなたが見ての通り、ソケットに入っているスレッドはインスレッドとアウトスレッドです。しかし、どうにかしてユーザースレッドの量がクライアントに影響を与え、それを制動します。

これはいくつかの並行性やソケットの問題のようですが、私はそれを見つけることができました。

私はServer(Java)とClient(C#)のコードを入れました。コンパイルと実行の依存関係がなく、両方のMainメソッド(最初のサーバー)が問題を示しています。

これまで読んでいただければ幸いです。

Serverコード

import java.io.*; 
import java.net.*; 
import java.nio.ByteBuffer; 

public class DummyServer { 

public static void main(String[] args) throws IOException { 
    ServerSocket server = new ServerSocket(9900); 
    System.out.println("Server started"); 
    for(;;){ 
     final Socket socket = server.accept(); 
     System.out.println("Accepting a connection"); 
     new Thread(new Runnable(){ 
      public void run() { 
       try { 
        System.out.println("Thread started to handle the connection"); 
        DataInputStream dis = new DataInputStream(socket.getInputStream()); 
        DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); 
        for(int i=0; ; i++){ 
         Packet packet = new Packet(); 
         packet.readFrom(dis); 
         packet.key = null; 
         packet.value = new byte[1000]; 
         packet.writeTo(dos); 
        } 
       } catch (IOException e) { 
        e.printStackTrace(); 
       } 
      } 
     }).start(); 
    } 
} 
public static class Packet { 
    byte[] key; 
    byte[] value; 
    long callId = -1; 
    private int valueHash = -1; 

    public void writeTo(DataOutputStream outputStream) throws IOException { 
     final ByteBuffer writeHeaderBuffer = ByteBuffer.allocate(1 << 10); // 1k 
     writeHeaderBuffer.clear(); 
     writeHeaderBuffer.position(12); 
     writeHeaderBuffer.putLong(callId); 
     writeHeaderBuffer.putInt(valueHash); 
     int size = writeHeaderBuffer.position(); 
     int headerSize = size - 12; 
     writeHeaderBuffer.position(0); 
     writeHeaderBuffer.putInt(headerSize); 
     writeHeaderBuffer.putInt((key == null) ? 0 : key.length); 
     writeHeaderBuffer.putInt((value == null) ? 0 : value.length); 
     outputStream.write(writeHeaderBuffer.array(), 0, size); 
     if (key != null)outputStream.write(key); 
     if (value != null)outputStream.write(value); 
    } 

    public void readFrom(DataInputStream dis) throws IOException { 
     final ByteBuffer readHeaderBuffer = ByteBuffer.allocate(1 << 10); 
     final int headerSize = dis.readInt(); 
     int keySize = dis.readInt(); 
     int valueSize = dis.readInt(); 
     readHeaderBuffer.clear(); 
     readHeaderBuffer.limit(headerSize); 
     dis.readFully(readHeaderBuffer.array(), 0, headerSize); 
     this.callId = readHeaderBuffer.getLong(); 
     valueHash = readHeaderBuffer.getInt(); 
     key = new byte[keySize]; 
     dis.readFully(key); 
     value = new byte[valueSize]; 
     dis.readFully(value); 
    } 
} 

}

C#クライアントコード:

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Net.Sockets; 
using System.Net; 
using System.IO; 
using System.Collections.Concurrent; 
using System.Threading; 

namespace Client 
{ 
public class Program 
{ 
    readonly ConcurrentDictionary<long, Call> calls = new ConcurrentDictionary<long, Call>(); 
    readonly BlockingCollection<Call> outThreadQueue = new BlockingCollection<Call>(1000); 
    readonly TcpClient tcpClient = new TcpClient("localhost", 9900); 
    readonly private int THREAD_COUNT; 
    static int ops; 

    public static void Main(string[] args) { 
     new Program(args.Length > 0 ? int.Parse(args[0]) : 100).Start(); 
    } 
    public Program(int threadCount) { 
     this.THREAD_COUNT = threadCount; 
     new Thread(new ThreadStart(this.InThreadRun)).Start();//start the InThread 
     new Thread(new ThreadStart(this.OutThreadRun)).Start();//start the OutThread 
    } 
    public void Start(){ 
     for (int i = 0; i < THREAD_COUNT; i++) 
      new Thread(new ThreadStart(this.Call)).Start(); 
     Console.WriteLine(THREAD_COUNT + " User Threads started to perform server call"); 
     System.Timers.Timer aTimer = new System.Timers.Timer(10000); 
     aTimer.Elapsed += new System.Timers.ElapsedEventHandler(this.Stats); 
     aTimer.Enabled = true; 
    } 
    public void Stats(object source, System.Timers.ElapsedEventArgs e){ 
     Console.WriteLine("Ops per second: " + Interlocked.Exchange(ref ops, 0)/10); 
    } 
    public void Call() { 
     for (; ;){ 
      Call call = new Call(new Packet()); 
      call.request.key = new byte[10]; 
      call.request.value = new byte[1000]; 
      outThreadQueue.Add(call); 
      Packet result = null; 
      for (int i = 1;result==null ; i++){ 
       result = call.getResult(5000); 
       if(result==null) Console.WriteLine("Call" + call.id + " didn't get answer within "+ 5000*i/1000 + " seconds"); 
      } 
      Interlocked.Increment(ref ops); 
     } 
    } 
    public void InThreadRun(){ 
     for (; ;){ 
      Packet packet = new Packet(); 
      packet.Read(tcpClient.GetStream()); 
      Call call; 
      if (calls.TryGetValue(packet.callId, out call)) 
       call.inbQ.Add(packet); 
      else 
       Console.WriteLine("Unkown call result: " + packet.callId); 
     } 
    } 
    public void OutThreadRun() { 
     for (; ;){ 
      Call call = outThreadQueue.Take(); 
      calls.TryAdd(call.id, call); 
      Packet packet = call.request; 
      if (packet != null) packet.write(tcpClient.GetStream()); 
     } 
    } 
} 
public class Call 
{ 
    readonly public long id; 
    readonly public Packet request; 
    static long callIdGen = 0; 
    readonly public BlockingCollection<Packet> inbQ = new BlockingCollection<Packet>(1); 
    public Call(Packet request) 
    { 
     this.id = incrementCallId(); 
     this.request = request; 
     this.request.callId = id; 
    } 
    public Packet getResult(int timeout) 
    { 
     Packet response = null; 
     inbQ.TryTake(out response, timeout); 
     return response; 
    } 
    private static long incrementCallId() 
    { 
     long initialValue, computedValue; 
     do 
     { 
      initialValue = callIdGen; 
      computedValue = initialValue + 1; 
     } while (initialValue != Interlocked.CompareExchange(ref callIdGen, computedValue, initialValue)); 
     return computedValue; 
    } 
} 

public class Packet 
{ 
    public byte[] key; 
    public byte[] value; 
    public long callId = 0; 
    public void write(Stream stream) 
    { 
     MemoryStream header = new MemoryStream(); 
     using (BinaryWriter writer = new BinaryWriter(header)) 
     { 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder((long)callId)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder((int)-1)); 
     } 
     byte[] headerInBytes = header.ToArray(); 
     MemoryStream body = new MemoryStream(); 
     using (BinaryWriter writer = new BinaryWriter(body)) 
     { 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(headerInBytes.Length)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(key == null ? 0 : key.Length)); 
      writer.Write(System.Net.IPAddress.HostToNetworkOrder(value == null ? 0 : value.Length)); 
      writer.Write(headerInBytes); 
      if (key != null) writer.Write(key); 
      if (value != null) writer.Write(value); 
      byte[] packetInBytes = body.ToArray(); 
      stream.Write(packetInBytes, 0, packetInBytes.Length); 
     } 
    } 
    public void Read(Stream stream) 
    { 
     BinaryReader reader = new BinaryReader(stream); 
     int headerSize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     int keySize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     int valueSize = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     this.callId = IPAddress.NetworkToHostOrder(reader.ReadInt64()); 
     int valuePartitionHash = IPAddress.NetworkToHostOrder(reader.ReadInt32()); 
     this.key = new byte[keySize]; 
     this.value = new byte[valueSize]; 
     if (keySize > 0) reader.Read(this.key, 0, keySize); 
     if (valueSize > 0) reader.Read(this.value, 0, valueSize); 
    } 
} 

}

答えて

2

これはかなり一般的な間違いです。Readソケットでの呼び出しは、現在利用可能でない場合には、実際には要求した数だけ読み取ることができません。 Readは、各呼び出しで読み取られたバイト数を返します。 nバイトのデータを読み込む予定の場合は、読み込まれたバイト数がnまで加算されるまで、readを複数回呼び出す必要があります。

+0

私はRead()メソッドをreadFullyメソッドに変更しました。これはJava DataInputStreamから不正行為を行ったものです。どうもありがとう。高く評価! –

関連する問題