2017-06-08 10 views
0

私はシングルリーダースレッドモデルを持つ複数のライタースレッドを持っています。 ThreadMultipleDateReceiverクラスは、複数のスレッドから読み取るように設計されています。PipedOutputStreamとPipedInputStreamを使用したシングルスレッドリーダーを使用した複数のスレッドライター

public class ThreadMultipleDateReceiver extends Thread { 

    private static final int MAX_CLIENT_THREADS = 4; 
    private byte[] incomingBytes; 
    private volatile boolean isRunning; 
    private volatile List<ThreadStreamDateWriter> lThrdDate; 

    private static PipedInputStream pipedInputStream; 

    public ThreadMultipleDateReceiver() { 
    lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS)); 
    pipedInputStream = new PipedInputStream(); 
    System.out.println("ThreadMultipleDateReceiver Created"); 
    } 

    @Override public void run() { 
    isRunning = true; 
    while (isRunning) { 
     if (!lThrdDate.isEmpty()) { 
     System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size()); 
     for (int i = lThrdDate.size(); i > 0; i--) { 
      if (lThrdDate.get(i - 1).getState() == Thread.State.TERMINATED) { 
      lThrdDate.remove(i - 1); 
      } else { 
      System.out.println("I ThreadMultipleDateReceiver have:" + lThrdDate.get(i - 1).getNameDateWriter()); 
      } 
     } 
     incomingBytes = new byte[1024]; 
     try { 
      String str = ""; 
      int iRd; 
      System.out.println("ThreadMultipleDateReceiver waiting:" + str); 
      while ((iRd = pipedInputStream.read(incomingBytes)) != -1) { 
      if (iRd > 0) { 
       str += new String(incomingBytes); 
      } 
      } 
      System.out.println("ThreadMultipleDateReceiver Received:\n\t:" + str); 
     } catch (IOException e) { } 
     } else { 
     System.out.println("ThreadMultipleDateReceiver Empty"); 
     } 
    } 
    emptyDateWriters(); 
    } 

    public void addDateWriter(ThreadStreamDateWriter threadDateWriter) { 
    if (lThrdDate.size() < MAX_CLIENT_THREADS) { 
     lThrdDate.add(threadDateWriter); 
    } 
    } 

    private void emptyDateWriters() { 
    if (!lThrdDate.isEmpty()) { 
     for (int i = lThrdDate.size(); i > 0; i--) { 
     ThreadStreamDateWriter threadDateWriter = lThrdDate.get(i - 1); 
     threadDateWriter.stopThread(); 
     lThrdDate.remove(i - 1); 
     } 
    } 
    } 

    public PipedInputStream getPipedInputStream() { 
    return pipedInputStream; 
    } 

    public void stopThread() { 
    isRunning = false; 
    } 

} 

そして単一ライタースレッド

public class ThreadStreamDateWriter extends Thread { 
    String Self; 
    private byte[] outgoingBytes; 
    private volatile boolean isRunning; 
    private static PipedOutputStream pipedOutputStream; 

    ThreadStreamDateWriter(String name, PipedInputStream snk) { 
    Self = name; 
    pipedOutputStream = new PipedOutputStream(); 
    try { 
     pipedOutputStream.connect(snk); 
    } catch (IOException e) { } 
    } 

    @Override public void run() { 
    isRunning = true; 
    while (isRunning) { 
     try { 
     outgoingBytes = getInfo().getBytes(); 
     System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + new String(outgoingBytes)); 
     pipedOutputStream.write(outgoingBytes); 
     System.out.println("ThreadStreamDateWriter -> wrote:" + new String(outgoingBytes)); 
     try { Thread.sleep(4000); } catch (InterruptedException ex) { } 
     } catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) { 
     isRunning = false; 
     } 
    } 
    } 

    String getInfo() { 
     String sDtTm = new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime()); 
     return Self + " -> " + sDtTm; 
    } 

    public void stopThread() { 
    isRunning = false; 
    } 

    public String getNameDateWriter() { 
    return Self; 
    } 
} 

どう打ち上げ(Iは、NetBeansを使用していますか)?

ThreadMultipleDateReceiver thrdMDateReceiver = null; 
ThreadStreamDateWriter thrdSDateWriter0 = null; 
ThreadStreamDateWriter thrdSDateWriter1 = null; 
    private void jtbDateExchangerActionPerformed(java.awt.event.ActionEvent evt) { 
    if (jtbDateExchanger.isSelected()) { 
     if (thrdMDateReceiver == null) { 
     thrdMDateReceiver = new ThreadMultipleDateReceiver(); 
     thrdMDateReceiver.start(); 
     } 
     if (thrdSDateWriter0 == null) { 
     thrdSDateWriter0 = new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedInputStream()); 
     thrdSDateWriter0.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter0); 
     } 
     if (thrdSDateWriter1 == null) { 
     thrdSDateWriter1 = new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedInputStream()); 
     thrdSDateWriter1.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter1); 
     } 
    } else { 
     if (thrdMDateReceiver != null) { 
     thrdMDateReceiver.stopThread(); 
     } 
    } 
    }             

ThreadMultipleDateReceiverが遮断され、印刷されていない

run: 
ThreadMultipleDateReceiver Created 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
..... 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver Empty 
ThreadMultipleDateReceiver has:1 
I ThreadMultipleDateReceiver have:-0- 
ThreadMultipleDateReceiver waiting: 
ThreadStreamDateWriter -> write to pipedOutputStream:-0- -> 20170608-090003 
ThreadStreamDateWriter -> write to pipedOutputStream:-1- -> 20170608-090003 
BUILD SUCCESSFUL (total time: 1 minute 3 seconds) 

OUTPUT:

ThreadMultipleDateReceiver Received: 
    -1- -> 20170608-090003 

又は

ThreadMultipleDateReceiver Received: 
    -0- -> 20170608-090003 

どのようにそれを解決する?

答えて

1

テストこのコード...

public class ThreadMultipleDateReceiver extends Thread { 

    private static final int MAX_CLIENT_THREADS = 4; 
    private byte[] incomingBytes; 
    private volatile boolean isRunning; 
    private volatile List<ThreadStreamDateWriter> lThrdDate; 

    private PipedInputStream pipedInputStream; 
    private PipedOutputStream pipedOutputStream; 

    public ThreadMultipleDateReceiver() { 
     lThrdDate = Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS)); 
     pipedInputStream = new PipedInputStream(); 
     pipedOutputStream = new PipedOutputStream(); 
     pipedInputStream.connect(pipedOutputStream); 
     System.out.println("ThreadMultipleDateReceiver Created"); 
    } 

    @Override public void run() { 
     isRunning = true; 
     while (isRunning) { 
      if (!lThrdDate.isEmpty()) { 
       System.out.println("ThreadMultipleDateReceiver has:" + lThrdDate.size()); 
       for (int i = lThrdDate.size(); i > 0; i--) { 
        if (lThrdDate.get(i - 1).getState() == Thread.State.TERMINATED) { 
         lThrdDate.remove(i - 1); 
        } else { 
         System.out.println("ThreadMultipleDateReceiver have:" + lThrdDate.get(i - 1).getNameDateWriter()); 
        } 
       } 
       incomingBytes = new byte[1024]; 
       try { 
        String str = ""; 
        int iRd; 
        System.out.println("ThreadMultipleDateReceiver waiting:" + str); 
        while ((iRd = pipedInputStream.read(incomingBytes)) != -1) { 
         String r = new String(Arrays.copyOf(incomingBytes, iRd)); 
//      if (iRd > 0) { 
//       str += r; 
//      } 
         System.out.println("ThreadMultipleDateReceiver Received:\t" + r); 
        } 
//     System.out.println("ThreadMultipleDateReceiver Received:\n\t:" + str); 
       } catch (IOException e) { } 
      } else { 
       System.out.println("ThreadMultipleDateReceiver Empty"); 
      } 
     } 
     emptyDateWriters(); 
    } 

public void addDateWriter(ThreadStreamDateWriter threadDateWriter) { 
    if (lThrdDate.size() < MAX_CLIENT_THREADS) { 
     lThrdDate.add(threadDateWriter); 
    } 
} 

    private void emptyDateWriters() { 
     if (!lThrdDate.isEmpty()) { 
      for (int i = lThrdDate.size(); i > 0; i--) { 
       ThreadStreamDateWriter threadDateWriter = lThrdDate.get(i - 1); 
       threadDateWriter.stopThread(); 
       lThrdDate.remove(i - 1); 
      } 
     } 
    } 

    public PipedOutputStream getPipedOutputStream() { 
     return pipedOutputStream; 
    } 

    public void stopThread() { 
     isRunning = false; 
    } 

} 

ThreadStreamDateWriterクラス

public class ThreadStreamDateWriter extends Thread { 
    String Self; 
    private byte[] outgoingBytes; 
    private volatile boolean isRunning; 
    private final PipedOutputStream pipedOutputStream; 


    ThreadStreamDateWriter(String name, PipedOutputStream src) { 
     Self = name; 
     pipedOutputStream = src; 
    } 

    @Override public void run() { 
     isRunning = true; 
     while (isRunning) { 
      try { 
       outgoingBytes = getInfo().getBytes(); 
       System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:" + new String(outgoingBytes)); 
       pipedOutputStream.write(outgoingBytes); 
       System.out.println("ThreadStreamDateWriter -> wrote:" + new String(outgoingBytes)); 
       try { Thread.sleep(4000); } catch (InterruptedException ex) { } 
      } catch (IOException | NegativeArraySizeException | IndexOutOfBoundsException e) { 
       isRunning = false; 
      } 
     } 
    } 

    String getInfo() { 
     String sDtTm = new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime()); 
     return Self + " -> " + sDtTm; 
    } 

    public void stopThread() { 
     isRunning = false; 
    } 

    public String getNameDateWriter() { 
     return Self; 
    } 
} 

使用して...

ThreadMultipleDateReceiver thrdMDateReceiver = null; 
    ThreadStreamDateWriter thrdSDateWriter0 = null; 
    ThreadStreamDateWriter thrdSDateWriter1 = null; 
    private void jtbDateExchangerActionPerformed(java.awt.event.ActionEvent evt) { 
    if (jtbDateExchanger.isSelected()) { 
     if (thrdMDateReceiver == null) { 
     thrdMDateReceiver = new ThreadMultipleDateReceiver(); 
     thrdMDateReceiver.start(); 
     } 
     if (thrdSDateWriter0 == null) { 
     thrdSDateWriter0 = new ThreadStreamDateWriter("-0-", thrdMDateReceiver.getPipedOutputStream()); 
     thrdSDateWriter0.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter0); 
     } 
     if (thrdSDateWriter1 == null) { 
     thrdSDateWriter1 = new ThreadStreamDateWriter("-1-", thrdMDateReceiver.getPipedOutputStream()); 
     thrdSDateWriter1.start(); 
     thrdMDateReceiver.addDateWriter(thrdSDateWriter1); 
     } 
    } else { 
     if (thrdMDateReceiver != null) { 
     thrdMDateReceiver.stopThread(); 
     } 
    } 
    } 
0

のようになります。パイプライン出力ストリームは静的であるように見えます。したがって、ThreadStreamDateWriterを構築するたびに、パイプ出力ストリームの古い値を踏んでいることになります。

これをインスタンス変数にしてコンストラクタに渡してみてください。あなたはそのうちの1つしか持っていません。

edit 1:パイプのインスタンス変数を作成し、いくつかの出力を追加しました。 (もっと見る):

edit 2:you second pipedOutputStream.connect(snk);投げている。一度に接続できるのは1つだけです。

import java.io.IOException; 
import java.io.PipedInputStream; 
import java.io.PipedOutputStream; 
import java.text.SimpleDateFormat; 
import java.util.ArrayList; 
import java.util.Calendar; 
import java.util.Collections; 
import java.util.List; 
public class So44438086 { 
    public static class ThreadMultipleDateReceiver extends Thread { 
     private static final int MAX_CLIENT_THREADS=4; 
     private byte[] incomingBytes; 
     private volatile boolean isRunning; 
     private volatile List<ThreadStreamDateWriter> lThrdDate; 
     private /*static*/ PipedInputStream pipedInputStream; 
     public ThreadMultipleDateReceiver() { 
      lThrdDate=Collections.synchronizedList(new ArrayList<>(MAX_CLIENT_THREADS)); 
      pipedInputStream=new PipedInputStream(); 
      System.out.println("ctor setting pipedInputStream to: "+pipedInputStream); 
      System.out.println("ThreadMultipleDateReceiver Created"); 
     } 
     @Override public void run() { 
      isRunning=true; 
      while(isRunning) { 
       if(!lThrdDate.isEmpty()) { 
        System.out.println("ThreadMultipleDateReceiver has:"+lThrdDate.size()); 
        for(int i=lThrdDate.size();i>0;i--) { 
         if(lThrdDate.get(i-1).getState()==Thread.State.TERMINATED) { 
          lThrdDate.remove(i-1); 
         } else { 
          System.out.println("I ThreadMultipleDateReceiver have:"+lThrdDate.get(i-1).getNameDateWriter()); 
         } 
        } 
        incomingBytes=new byte[1024]; 
        try { 
         String str=""; 
         int iRd; 
         System.out.println("ThreadMultipleDateReceiver waiting:"+str); 
         System.out.println("reading: "+pipedInputStream); 
         while((iRd=pipedInputStream.read(incomingBytes))!=-1) { 
          if(iRd>0) { 
           str+=new String(incomingBytes); 
          } 
         } 
         System.out.println("ThreadMultipleDateReceiver Received:\n\t:"+str); 
        } catch(IOException e) {} 
       } else { 
        System.out.println("ThreadMultipleDateReceiver Empty"); 
       } 
      } 
      emptyDateWriters(); 
     } 
     public void addDateWriter(ThreadStreamDateWriter threadDateWriter) { 
      if(lThrdDate.size()<MAX_CLIENT_THREADS) { 
       lThrdDate.add(threadDateWriter); 
      } 
     } 
     private void emptyDateWriters() { 
      if(!lThrdDate.isEmpty()) { 
       for(int i=lThrdDate.size();i>0;i--) { 
        ThreadStreamDateWriter threadDateWriter=lThrdDate.get(i-1); 
        threadDateWriter.stopThread(); 
        lThrdDate.remove(i-1); 
       } 
      } 
     } 
     public PipedInputStream getPipedInputStream() { 
      return pipedInputStream; 
     } 
     public void stopThread() { 
      isRunning=false; 
     } 
    } 
    public static class ThreadStreamDateWriter extends Thread { 
     String Self; 
     private byte[] outgoingBytes; 
     private volatile boolean isRunning; 
     private /*static*/ PipedOutputStream pipedOutputStream; 
     ThreadStreamDateWriter(String name,PipedInputStream snk) { 
      Self=name; 
      pipedOutputStream=new PipedOutputStream(); 
      System.out.println("ctor setting pipedOutputStream to: "+pipedOutputStream); 
      try { 
       pipedOutputStream.connect(snk); 
       System.out.println(pipedOutputStream+" connectd to: "+snk); 
      } catch(IOException e) {} 
     } 
     @Override public void run() { 
      isRunning=true; 
      while(isRunning) { 
       try { 
        outgoingBytes=getInfo().getBytes(); 
        System.out.println("ThreadStreamDateWriter -> write to pipedOutputStream:"+new String(outgoingBytes)); 
        System.out.println("writing to: "+pipedOutputStream); 
        pipedOutputStream.write(outgoingBytes); 
        System.out.println("ThreadStreamDateWriter -> wrote:"+new String(outgoingBytes)); 
        try { 
         Thread.sleep(4000); 
        } catch(InterruptedException ex) {} 
       } catch(IOException|NegativeArraySizeException|IndexOutOfBoundsException e) { 
        isRunning=false; 
       } 
      } 
     } 
     String getInfo() { 
      String sDtTm=new SimpleDateFormat("yyyyMMdd-hhmmss").format(Calendar.getInstance().getTime()); 
      return Self+" -> "+sDtTm; 
     } 
     public void stopThread() { 
      isRunning=false; 
     } 
     public String getNameDateWriter() { 
      return Self; 
     } 
    } 
    private void foo() { 
     if(thrdMDateReceiver==null) { 
      thrdMDateReceiver=new ThreadMultipleDateReceiver(); 
      thrdMDateReceiver.start(); 
     } 
     if(thrdSDateWriter0==null) { 
      thrdSDateWriter0=new ThreadStreamDateWriter("-0-",thrdMDateReceiver.getPipedInputStream()); 
      thrdSDateWriter0.start(); 
      thrdMDateReceiver.addDateWriter(thrdSDateWriter0); 
     } 
     if(thrdSDateWriter1==null) { 
      thrdSDateWriter1=new ThreadStreamDateWriter("-1-",thrdMDateReceiver.getPipedInputStream()); 
      thrdSDateWriter1.start(); 
      thrdMDateReceiver.addDateWriter(thrdSDateWriter1); 
     } 
    } 
    void run() throws InterruptedException { 
     System.out.println(("running")); 
     foo(); 
     System.out.println(("sleeping")); 
     Thread.sleep(10000); 
     System.out.println(("stopping")); 
     if(thrdMDateReceiver!=null) { 
      thrdMDateReceiver.stopThread(); 
     } 
    } 
    public static void main(String[] args) throws InterruptedException { 
     new So44438086().run(); 
    } 
    ThreadMultipleDateReceiver thrdMDateReceiver=null; 
    ThreadStreamDateWriter thrdSDateWriter0=null; 
    ThreadStreamDateWriter thrdSDateWriter1=null; 
} 
+0

はそれだけで一つを使用することはできませんmのPipedOutpudStream最後にPipedInpudStream? –

+0

あなたは1つを使うことができますが、あなたがコードを正しく読んでいるならば、あなたは3を作って最初の2つを歩いています。あなたのものは3つの異なるパイプに行くかもしれません。 –

+0

私はそのオブジェクトの概念的な間違いを知っています...多分PipedXxxputStreamそれは扇動オブジェクトではありませんが、私は複数のライターオブジェクト1つのリーダーオブジェクトが必要です。私はブロッキングを避けたいです... –

関連する問題