2011-06-23 9 views
3

gtkアプリケーションでは、すべての実行がgtk_main関数内で実行されています。そして、他のグラフィカルフレームワークは、QTの場合はapp.execClutterの場合はclutter_mainのような同様のイベントループを持ちます。しかし、ZeroMQは、それが挿入されるループがwhile (1) ...であるという前提に基づいています(例についてはhereを参照)。GTK/QT/ClutterアプリケーションでZeroMQを使用するには?

これらの2つの実行戦略をどのように組み合わせますか?

私は現在、Cで書かれたクラッタ・アプリケーションでzeromqを使いたいので、もちろんそれに直接的な回答がありますが、他のバリエーションの回答も追加してください。

答えて

4

ZeroMQコードは、できるだけ頻繁に何度も何度も何度も繰り返し実行されるように思えます。最も簡単な方法は、ZeroMQコードをアイドル関数またはタイムアウト関数に置き、存在していれば関数の非ブロック化バージョンを使用することです。

クラッタについては、clutter_threads_add_idle()またはclutter_threads_add_timeout()を使用します。 GTKの場合、g_idle_add()またはg_timeout_add()を使用します。

g_thread_create()を使用してZeroMQコード用に別のスレッドを作成し、ブロック機能を使用してwhile(1)構造を使用するだけです。そうするならば、スレッドがお互いに通信するための方法を見つけなければならないでしょう - GLibのmutexと非同期待ち行列は通常うまくいきます。

+2

は、アイドルタイマーの結果でポーリングzeromqないだろう100%CPU使用率で? –

+2

アイドル状態の機能では、おそらくはいです。少なくとも、GTKメインループが何もしていないときです。タイムアウト機能では、いいえ。 – ptomato

2

ZeromqtというQT統合ライブラリがあることがわかりました。

ZmqSocket::ZmqSocket(int type, QObject *parent) : QObject(parent) 
{ 
    ... 
    notifier_ = new QSocketNotifier(fd, QSocketNotifier::Read, this); 
    connect(notifier_, SIGNAL(activated(int)), this, SLOT(activity())); 
} 

... 

void ZmqSocket::activity() 
{ 
    uint32_t flags; 
    size_t size = sizeof(flags); 
    if(!getOpt(ZMQ_EVENTS, &flags, &size)) { 
     qWarning("Error reading ZMQ_EVENTS in ZMQSocket::activity"); 
     return; 
    } 
    if(flags & ZMQ_POLLIN) { 
     emit readyRead(); 
    } 
    if(flags & ZMQ_POLLOUT) { 
     emit readyWrite(); 
    } 
    ... 
} 

をしたがって、似たものを持っていないだろうQTの統合ソケットハンドリングとクラッタに頼っている:ソースを見ると、統合のコアは、次のようです。

+1

また、[nzmqt](https://github.com/jonnydee/nzmqt) - ZeroMQ用のもう1つのQtバインディングを見たいかもしれません。そこにはポーリングベースの実装があります。特に[PollingZMQSocket(line 429 ++)](https://github.com/jonnydee/nzmqt/blob/7f3c54c2d3055769df157d01319a333d61cba1f1/include/nzmqt/nzmqt.hpp#L497)のクラスを見てください。たぶんあなたはクラッターのために同様のsometingを行うことができます。 –

2

0MQソケット(ZMQ_FDオプション)のファイル記述子を取得し、それをイベントループと統合することができます。私はgtkがソケットを扱うためのいくつかのメカニズムを持っていると推測します。

10

zmqとgtkまたはクラッタを組み合わせる適切な方法は、zmqキューのファイル記述子をメインイベントループに接続することです。コールバック関数で

GIOChannel* channel = g_io_channel_unix_new(fd);  
g_io_add_watch(channel, G_IO_IN|G_IO_ERR|G_IO_HUP, callback_func, NULL); 

をものが本当に存在する場合、それは最初にチェックする必要がある:fdがio_add_watch使用しての問題ですメインループに接続

int fd; 
size_t sizeof_fd = sizeof(fd); 
if(zmq_getsockopt(socket, ZMQ_FD, &fd, &sizeof_fd)) 
     perror("retrieving zmq fd"); 

を使用して取得することができます読む前に読むそれ以外の場合、関数はIOの待機をブロックする可能性があります。

gboolean callback_func(GIOChannel *source, GIOCondition condition,gpointer data) 
{ 
    uint32_t status; 
    size_t sizeof_status = sizeof(status); 

    while (1){ 
     if (zmq_getsockopt(socket, ZMQ_EVENTS, &status, &sizeof_status)) { 
      perror("retrieving event status"); 
      return 0; // this just removes the callback, but probably 
         // different error handling should be implemented 
     } 
     if (status & ZMQ_POLLIN == 0) { 
      break; 
     } 

     // retrieve one message here 
    } 
    return 1; // keep the callback active 
} 

ご注意:これは実際にテストされていない、私は私が使用しているものである、Pythonの+クラッタからの翻訳をしましたが、私はそれがうまくいくことを確信しています。 参考までに、実際に動作する完全なPython + Clutterコードを以下に示します。

import sys 
from gi.repository import Clutter, GObject 
import zmq 

def Stage(): 
    "A Stage with a red spinning rectangle" 
    stage = Clutter.Stage() 

    stage.set_size(400, 400) 
    rect = Clutter.Rectangle() 
    color = Clutter.Color() 
    color.from_string('red') 
    rect.set_color(color) 
    rect.set_size(100, 100) 
    rect.set_position(150, 150) 

    timeline = Clutter.Timeline.new(3000) 
    timeline.set_loop(True) 

    alpha = Clutter.Alpha.new_full(timeline, Clutter.AnimationMode.EASE_IN_OUT_SINE) 
    rotate_behaviour = Clutter.BehaviourRotate.new(
     alpha, 
     Clutter.RotateAxis.Z_AXIS, 
     Clutter.RotateDirection.CW, 
     0.0, 359.0) 
    rotate_behaviour.apply(rect) 
    timeline.start() 
    stage.add_actor(rect) 

    stage.show_all() 
    stage.connect('destroy', lambda stage: Clutter.main_quit()) 
    return stage, rotate_behaviour 

def Socket(address): 
    ctx = zmq.Context() 
    sock = ctx.socket(zmq.SUB) 
    sock.setsockopt(zmq.SUBSCRIBE, "") 
    sock.connect(address) 
    return sock 

def zmq_callback(queue, condition, sock): 
    print 'zmq_callback', queue, condition, sock 

    while sock.getsockopt(zmq.EVENTS) & zmq.POLLIN: 
     observed = sock.recv() 
     print observed 

    return True 

def main(): 
    res, args = Clutter.init(sys.argv) 
    if res != Clutter.InitError.SUCCESS: 
     return 1 

    stage, rotate_behaviour = Stage() 

    sock = Socket(sys.argv[2]) 
    zmq_fd = sock.getsockopt(zmq.FD) 
    GObject.io_add_watch(zmq_fd, 
         GObject.IO_IN|GObject.IO_ERR|GObject.IO_HUP, 
         zmq_callback, sock) 

    return Clutter.main() 

if __name__ == '__main__': 
    sys.exit(main()) 
+0

io_add_watchコールバックにとって重要な点は、Trueを返さなければならないことに注意してください。これがなければ、コールバックは一度だけ呼び出されます。 –

1

これはPyQt4を使用したPythonの例です。これは、動作中のアプリケーションから派生したものです。

import zmq 
from PyQt4 import QtCore, QtGui 

class QZmqSocketNotifier(QtCore.QSocketNotifier): 
    """ Provides Qt event notifier for ZMQ socket events """ 
    def __init__(self, zmq_sock, event_type, parent=None): 
     """ 
     Parameters: 
     ---------- 
     zmq_sock : zmq.Socket 
      The ZMQ socket to listen on. Must already be connected or bound to a socket address. 
     event_type : QtSocketNotifier.Type 
      Event type to listen for, as described in documentation for QtSocketNotifier. 
     """ 
     super(QZmqSocketNotifier, self).__init__(zmq_sock.getsockopt(zmq.FD), event_type, parent) 

class Server(QtGui.QFrame): 

def __init__(self, topics, port, mainwindow, parent=None): 
    super(Server, self).__init__(parent) 

    self._PORT = port 

    # Create notifier to handle ZMQ socket events coming from client 
    self._zmq_context = zmq.Context() 
    self._zmq_sock = self._zmq_context.socket(zmq.SUB) 
    self._zmq_sock.bind("tcp://*:" + self._PORT) 
    for topic in topics: 
     self._zmq_sock.setsockopt(zmq.SUBSCRIBE, topic) 
    self._zmq_notifier = QZmqSocketNotifier(self._zmq_sock, QtCore.QSocketNotifier.Read) 

    # connect signals and slots 
    self._zmq_notifier.activated.connect(self._onZmqMsgRecv) 
    mainwindow.quit.connect(self._onQuit) 

@QtCore.pyqtSlot() 
def _onZmqMsgRecv(): 
    self._test_info_notifier.setEnabled(False) 
    # Verify that there's data in the stream 
    sock_status = self._zmq_sock.getsockopt(zmq.EVENTS) 
    if sock_status == zmq.POLLIN: 
     msg = self._zmq_sock.recv_multipart() 
     topic = msg[0] 
     callback = self._topic_map[ topic ] 
     callback(msg) 
    self._zmq_notifier.setEnabled(True) 
    self._zmq_sock.getsockopt(zmq.EVENTS) 

def _onQuit(self): 
    self._zmq_notifier.activated.disconnect(self._onZmqMsgRecv) 
    self._zmq_notifier.setEnabled(False) 
    del self._zmq_notifier 
    self._zmq_context.destroy(0) 

無効化した後、_on_ZmqMsgRecvに再イネーブル通知はQSocketNotifierのドキュメントごとです。

getsockoptへの最後の呼び出しは何らかの理由で必要です。それ以外の場合は、最初のイベントの後に通知機能が停止します。私は実際にこれについて新しい質問を投稿しようとしていました。誰がなぜこれが必要であるか知っていますか?あなたはZMQコンテキストの前に通知を破壊しない場合は、アプリケーションを終了すると、あなたはおそらく、このようなエラーが出るだろうと

注:

QSocketNotifier: Invalid socket 16 and type 'Read', disabling... 
関連する問題