qosについて少し混乱しています。Qosについて読むのは qosを2に設定すると、ブローカー/クライアントは4段階で正確にメッセージを配信しますハンドシェーク。qos 2で公開しているパブリッシャー
したがってqos 2は、メッセージがブローカーに公開されており、サブスクライバ(クライアント)は受信していないことを確認します。 または メッセージ受信確認のために、加入者 または
によって受信され、我々は出版社のようなアプリケーションを構築する必要があるのは、例えばトピックにメッセージを公開します「DATA」と例えば、トピックにサブスクライブします「ACK」と加入者の必要性トピック「ACK」の承認を発行するために、そのメッセージがトピック「DATA」
上で受信された私は、QoSで公開しようとした次のコードで 出版社
をサブスクライブするために発行データのJavaクラスと別のクラスを作成しました2とdeliveryComplete関数には例外がありますときにgetMessage()私はqosを試したときに0 getMessage()は何も例外を与えませんでした。以下
public class PublishMe implements MqttCallback{
MqttClient myClient;
MqttClient myClientPublish;
MqttConnectOptions connOpt;
MqttConnectOptions connOptPublish;
static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
static Boolean msgACK=false;
public static void main(String[] args) {
PublishMe smc = new PublishMe();
smc.runClient();
}
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
}
@Override
public void messageArrived(String string, MqttMessage message) throws Exception {
System.out.println("-------------------------------------------------");
System.out.println("| Topic:" + string);
System.out.println("| Message: " + new String(message.getPayload()));
System.out.println("-------------------------------------------------");
}
/**
*
* deliveryComplete
* This callback is invoked when a message published by this client
* is successfully received by the broker.
*
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try{
System.out.println("Message delivered successfully to topic : \"" + token.getMessage().toString() + "\".");
}catch(Exception ex){
System.out.println(ex.getCause()+" -- "+ex.getLocalizedMessage()+" -- "+ex.getMessage()+" -- ");
}
}
public void runClient() {
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(0);
connOptPublish= new MqttConnectOptions();
connOptPublish.setCleanSession(false);
connOptPublish.setKeepAliveInterval(0);
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, "pahomqttpublish11");
myClient.setCallback(this);
myClient.connect(connOpt);
myClientPublish= new MqttClient(BROKER_URL, "pahomqttpublish42");
myClientPublish.setCallback(this);
myClientPublish.connect(connOptPublish);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);
String myTopic = "sample";
// String myTopic = "receiveDATA2";
MqttTopic topic = myClientPublish.getTopic(myTopic);
// publish messages if publisher
if (publisher) {
int i=1;
while(true){
String pubMsg = "sample msg "+i;
MqttMessage message = new MqttMessage(pubMsg.getBytes());
System.out.println(message);
message.setQos(2);
message.setRetained(false);
// Publish the message
MqttDeliveryToken token = null;
try {
// publish message to broker
token = topic.publish(message);
// Wait until the message has been delivered to the broker
token.waitForCompletion();
msgACK=false;
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
とは私が出版社のコードで実装する必要がどうあるべきか、その加入者がメッセージを受信した保証することができますどのように加入者
public class Mqttsample implements MqttCallback{
MqttClient myClient;
MqttClient myClientPublish;
MqttConnectOptions connOpt;
MqttConnectOptions connOptPublish;
static final String BROKER_URL = "tcp://Ehydromet-PC:1883";
// the following two flags control whether this example is a publisher, a subscriber or both
static final Boolean subscriber = true;
static final Boolean publisher = true;
public static void main(String[] args) {
Mqttsample smc = new Mqttsample();
smc.runClient();
}
@Override
public void connectionLost(Throwable t) {
System.out.println("Connection lost!");
// code to reconnect to the broker would go here if desired
}
@Override
public void messageArrived(String string, MqttMessage message) throws Exception {
//throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
System.out.println("| Topic:" + string+"| Message: " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try{
System.out.println("Pub complete" + new String(token.getMessage().getPayload()));
}
catch(Exception ex){
System.out.println("delivery Error "+ex.getMessage());
}
}
public void runClient() {
connOpt = new MqttConnectOptions();
connOpt.setCleanSession(false);
connOpt.setKeepAliveInterval(0);
connOptPublish= new MqttConnectOptions();
connOptPublish.setCleanSession(false);
connOptPublish.setKeepAliveInterval(0);
// Connect to Broker
try {
myClient = new MqttClient(BROKER_URL, "pahomqttpublish");
myClient.setCallback(this);
myClient.connect(connOpt);
myClientPublish= new MqttClient(BROKER_URL, "pahomqttsubscribe");
myClientPublish.setCallback(this);
myClientPublish.connect(connOptPublish);
} catch (MqttException e) {
e.printStackTrace();
System.exit(-1);
}
System.out.println("Connected to " + BROKER_URL);
// subscribe to topic if subscriber
if (subscriber) {
try {
//String myTopicACK = M2MIO_DOMAIN + "/" + "ACK" + "/" + M2MIO_THING;
String myTopicACK = "sample";
// MqttTopic topicACK = myClient.getTopic(myTopicACK);
int subQoS = 2;
myClient.subscribe(myTopicACK, subQoS);
} catch (Exception e) {
e.printStackTrace();
}
}
//
}
}
です。上記のリンクから
http://www.eclipse.org/paho/files/mqttdoc/Cclient/qos.html 正確に一度
QoS2、:メッセージは常に正確に一度配信されます。メッセージは、送信者がメッセージが受信者によって公開されたという確認を受信するまで、送信者にローカルに保存する必要があります。メッセージは、メッセージを再度送信する必要がある場合に備えて格納されます。 QoS2は最も安全ですが、最も遅い転送モードです。
また、実際に何が起こったのか推測されないように、例外に例外を含めてください。 – hardillb