2017-02-19 12 views
0

RxJava2を使い始めたばかりで、UDP観測を正しく実装できるかどうか疑問です。
私はすでにいくつかの作業コードを持っていますが、いくつかの問題があると思います。以下のソースコードのコメントの4つの質問を参照してください。RxJava2 UDPパケットを監視する方法は?

私はGitHub RxJava2_Udpにコードを公開しました。コメント、問題、プルリクエストを歓迎します。

class UdpObservable { 

    private static class UdpThread extends Thread { 
     private final int portNo; 
     private final int bufferSizeInBytes; 
     private final ObservableEmitter<DatagramPacket> emitter; 
     private DatagramSocket udpSocket; 

     private UdpThread(@NonNull ObservableEmitter<DatagramPacket> emitter 
       , int portNo, int bufferSizeInBytes) { 
      this.emitter = emitter; 
      this.portNo = portNo; 
      this.bufferSizeInBytes = bufferSizeInBytes; 
     } 

     @Override 
     public void run() { 
      try { 
       // we don't want to create the DatagramSocket in the constructor, because this 
       // might raise an Exception that the observer wants to handle 
       udpSocket = new DatagramSocket(portNo); 
       try { 
        /* QUESTION 1: 
         Do I really need to check isInterrupted() and emitter.isDisposed()? 

         When the thread is interrupted an interrupted exception will 
         be raised anyway and the emitter is being disposed (this is what 
         caused the interruption) 
        */ 
        while (!isInterrupted() && !emitter.isDisposed()) { 
         byte[] rcvBuffer = new byte[bufferSizeInBytes]; 
         DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length); 
         udpSocket.receive(datagramPacket); 
         // QUESTION 1a: same as QUESTION 1 above 
         if (!isInterrupted() && !emitter.isDisposed()) { 
          emitter.onNext(datagramPacket); 
         } 
        } 
       } finally { 
        closeUdpSocket(); 
       } 
      } catch (Throwable th) { 
       // the thread will only be interrupted when the observer has unsubscribed: 
       // so we need not report it 
       if (!isInterrupted()) { 
        if (!emitter.isDisposed()) { 
         emitter.onError(th); 
        } else { 
         // QUESTION 2: is this the correct way to handle errors, when the emitter 
         //    is already disposed? 
         RxJavaPlugins.onError(th); 
        } 
       } 
      } 
     } 

     private void closeUdpSocket() { 
      if (!udpSocket.isClosed()) { 
       udpSocket.close(); 
      } 
     } 

     @Override 
     public void interrupt() { 
      super.interrupt(); 
      // QUESTION 3: this is called from an external thread, right, so 
      //    how can we correctly synchronize the access to udpSocket? 
      closeUdpSocket(); 
     } 
    } 

    /** 
    * creates an Observable that will emit all UDP datagrams of a UDP port. 
    * <p> 
    * This will be an infinite stream that ends when the observer unsubscribes, or when an error 
    * occurs. The observer does not handle backpressure. 
    * </p> 
    */ 
    public static Observable<DatagramPacket> create(final int portNo, final int bufferSizeInBytes) { 
     return Observable.create(
       new ObservableOnSubscribe<DatagramPacket>() { 
        @Override 
        public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception { 
         final UdpThread udpThread = new UdpThread(emitter, portNo, bufferSizeInBytes); 
         /* QUESTION 4: Is this the right way to handle unsubscription? 
         */ 
         emitter.setCancellable(new Cancellable() { 
          @Override 
          public void cancel() throws Exception { 
           udpThread.interrupt(); 
          } 
         }); 
         udpThread.start(); 
        } 
       } 
     ); 
    } 

} 
+0

に行く必要があります。質問を更新してください。 –

+1

ソースコードのコメント – TmTron

答えて

2
  • 一般的に言って、私はそれを作成するための正しい方法はないと思う、あなたはRxJavaとして、自分でスレッド作成しないでください、それはSchedulersはあなたのためにそれを行う必要があります。
    ObservableOnSubscribeで実行されたコードは、Scheduler戦略に従ってスレッドで実行されるため、自分で構築する必要はありません。作成中にude while-loopを実行するだけです。
  • Thread.interrupt()メソッドを呼び出す必要はありません。Observableを処分(退会)するときにRxJavaがそれを行います。ご質問については

を(もちろん、whileループの前cancelableを設定):

  1. あなたは例外として中断をチェックする必要はありませんyou'r場合 を上げることになりますあなたはonNext()あなたのためにそれを行いますし、 は、サブスクリプションの放出しないので、ioの操作を待って、あなたも 処分を確認する必要はありません。

  2. 再度onErrorに電話をかけることができ、エミッタはObservableが登録解除されたかどうかをチェックします。

  3. 前述のとおり、スレッドは存在しませんが、リソースのクリーンアップにはemitter.setCancellableメソッドを使用できます。 (ストリームを閉じる)、これはあなたのコードが実行される同じスレッドで発生します。前回答
  4. 、Thread.interrputは()RxJavaによって処分/退会を上げられます、リソースのクリーンアップ「のコメントで4つの質問は、」不足しているemitter.setCancellable方法
+0

完璧な答えをありがとう。これで私は多くのことを学び、[UdpObservable](https://github.com/tmtron/RxJava2_Udp/blob/40a63475044b93f6f7ecb0c7d7c5aee98a651391/app/src/main/java/com/tmtron/rxjava2udp/UdpObservable.java)のコードが完成しましたより簡潔ではるかに明確です。 – TmTron

関連する問題