2017-08-23 14 views
0

私はRxRubyを使って遊ぼうとしており、TCPソケットから受信したデータを消費可能なストリームに変換したいとします。次のコードは、ソケットから受け取ったデータがストリーミングされているという点で「機能します」、つまり新しいデータが到着したときに私は「データ=」メッセージを受け取るが、サブスクリプションブロックを受け取る。 subscription =ステートメントの後にコードを追加すると、ソケットを閉じるまで実行されません。プログラムがすぐに完了しないようにするためには、ループが必要になると思います。このサブスクリプションがメインスレッドをブロックするのはなぜですか?

require 'rx' 
require 'socket' 
Thread.abort_on_exception=true 

class StellariumInterface 
    attr_accessor :server, :client, :goto_stream 

    def initialize(host:, port:) 
    @host = host 
    @port = port 
    @server = TCPServer.new host, port 
    @goto_stream = nil 
    @client = nil 
    end 


    def accept 
    puts "connecting" 
    self.client = server.accept 
    puts "connected" 
    end 

    def listen 
    self.goto_stream = Rx::Observable.create do |observer| 
     l = Thread.new do 
     loop do 
      raw_data = client.recvfrom(1000) 
      break if raw_data.first.empty? 
      data = raw_data.first.unpack('ssqLl') 
      p data 
      observer.on_next(data) 
      sleep 0.1 
     end 
     observer.on_completed 
     end 
    l.join 
    end 
    end 



end 

source = StellariumInterface.new host: 'localhost', port: 10001 
source.accept 
source.listen 
subscription = source.goto_stream.subscribe(
    lambda { |x| puts "data = #{x}" }, 
    lambda { |x| puts "error "}, 
    lambda { puts "stream done "} 
) 
+1

.joinを取り出すとまだハングアップしていますか? –

+0

私は家に帰るときをチェックします...約1時間 – nPn

+0

@WedgeMartin、それでした。回答として投稿してもかまいません(ちょっとした説明が助けになるかもしれません) – nPn

答えて

0

のrecvfromがブロッキング操作であり、それは新しいスレッドの中だにもかかわらず、とき「参加」は場所を取る、それはメインスレッドは、新しく作成されたスレッドを待つために起こっていることを意味します。 'join'を削除すると、スレッドは独立したままになり、ループスレッドはメインスレッドをブロックしません。

関連する問題