2016-07-01 6 views
0

私はキューにメッセージを生成し、メッセージが正しく消費されてアプリケーションで処理されるかどうかを確認するためにいくつかのテストを作成しようとしています。ユニットテスト用のメモリ内転送を正しく使用する方法

私はkombuライブラリ、特にメモリ内のトランスポートの実装について試しています。

まだ私はそれが働いて、生成されたメッセージが消費されることはありません。誰もが生成し、メモリ内のメッセージを消費し、簡単なユニットテストを提供できる場合

私の質問は、そのためである

答えて

2

あなたがテストしようとしているコードにこれを適用したいのですが、基本ますあなたが探しているものはメモリ内のコントローラのためのamqp URIです。これは 'memory://'です。本当に簡単な例として、

conn = kombu.Connection("memory://") 
queue = conn.SimpleQueue('myqueue') 
queue.put('test') 
msg = queue.get(timeout=1) 
msg.ack() 
print(msg.payload) 
関連する問題