2016-05-27 14 views
10

から切断すると、今のWebLogic JMSサーバからの私の切断は以下のようになります。このは、WebLogic JMS

import java.util.Hashtable; 
import javax.jms.*; 
import javax.naming.*; 
import javax.transaction.*; 
import java.util.Vector; 
import javax.rmi.PortableRemoteObject; 
import clojure.java.api.Clojure; 
import clojure.lang.IFn; 
import org.apache.log4j.Logger; 
import weblogic.jndi.*; 

public class WebLogicListener implements MessageListener, ExceptionListener{ 
    public InitialContext ctx; 
    public TopicConnectionFactory conFactory; 
    public TopicConnection tCon; 
    public TopicSession tSession; 
    public TopicSubscriber tSub; 
    public Boolean development; 
    public Topic topic; 
    /*clojure function objects*/ 
    public IFn publish; 
    public IFn close; 
    public IFn incrementMetric; 
    public IFn logMessage; 
    public IFn resync; 

    public Object channel; 
    public ExceptionListener exception; 
    public String topicName; 
    public String subName; 
    public String username; 
    public String password; 
    public String clientId; 
    public String factoryJNDI; 
    public String topicJNDI; 
    public Vector nms; 
    public Hashtable<Object,Object> env; 
    public boolean running = false; 

    public WebLogicListener (String topicName, String host, String username, String password, String factoryJNDI, 
          String topicJNDI, String clientId, String subName, String ns, String fnName, 
          boolean development, Vector nms){ 
    this.username = username; 
    this.password = password; 
    this.clientId = clientId; 
    this.topicName = topicName; 
    this.subName = subName; 
    this.development = development; 
    this.topicJNDI = topicJNDI; 
    this.factoryJNDI = factoryJNDI; 
    this.nms = nms; 
    /*Clojure interop handlers*/ 
    IFn chan = Clojure.var("clojure.core.async", "chan"); 
    resync = Clojure.var("cenx.baldr.api", "resync!"); 
    publish = Clojure.var(ns, fnName); 
    incrementMetric = Clojure.var(ns, "log-metric"); 
    logMessage = Clojure.var (ns, "log-message"); 
    close = Clojure.var("clojure.core.async","close!"); 
    /*populate envrionment*/ 
    env = new Hashtable<Object,Object>(); 
    env.put(Context.PROVIDER_URL, host); 
    env.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); 
    env.put(Context.SECURITY_PRINCIPAL, username); 
    env.put(Context.SECURITY_CREDENTIALS, password); 
    env.put("weblogic.jndi.createIntermediateContexts", "true"); 
    /*open communication channel for clojure daemon*/ 
    channel = chan.invoke(); 
    } 

    private void initListener() throws JMSException, NamingException{ 
    try{ 
     if (!running && !development){ 
     ctx = new InitialContext(env); 
     topic = (Topic) ctx.lookup(topicJNDI); 
     conFactory = (TopicConnectionFactory)PortableRemoteObject.narrow(ctx.lookup(factoryJNDI), TopicConnectionFactory.class); 
     tCon = (TopicConnection) conFactory.createTopicConnection(); 
     tCon.setExceptionListener(this); 
     tCon.setClientID(clientId); 
     tSession = (TopicSession) tCon.createTopicSession(false, 1); 
     tSub = tSession.createDurableSubscriber(topic, subName); 
     tSub.setMessageListener(this); 
     tCon.start(); 
     running = true; 
     }else{ 
     if (running){ 
      logMessage.invoke("error", String.format("Listener is already running")); 
     } 
     if (development){ 
      logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     } 
    } catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start listener \n %s", e.toString())); 
    } 
    } 

    public void startListener(){ 
    if (!development && env != null){ 
     try { 
     initListener(); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Unable to start Listener \n %s", e.toString())); 
     } 
    } else { 
     if (development){ 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
     } 
     if (env == null){ 
     logMessage.invoke("error", "Environment variable is null"); 
     } 
    } 
    } 

    ///Closes the JMS connection and the channel 
    public void stopListener(){ 
    if (!development){ 
     try{ 
     tSub.close(); 
     tSession.close(); 
     tCon.close(); 
     incrementMetric.invoke("JMS-disconnect-count"); 
     }catch(Exception e){ 
     logMessage.invoke("error", String.format("Error while stopping the listener \n %s", e.toString())); 
     }finally{ 
     running = false; 
     } 
    } else { 
     logMessage.invoke("info", "Listener not started, running in development mode"); 
    } 
    } 

    public Object getChannel(){ 
    return channel; 
    } 

    //re-initializes the channel in case of error 
    public void initializeChannel(){ 
    if (channel == null){ 
     IFn chan = Clojure.var("clojure.core.async", "chan"); 
     channel = chan.invoke(); 
    } else { 
     logMessage.invoke("info", "Channel is already initialized"); 
    } 
    } 
    //accessors for debugging 

    public void closeSubscription(){ 
    try{ 
     tSub.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic subscription"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeSession(){ 
    try{ 
     tSession.unsubscribe(subName); 
     tSession.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic session"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeConnection(){ 
    try{ 
     tCon.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close topic connection"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public void closeContext(){ 
    try { 
     ctx.close(); 
    }catch (Exception e){ 
     logMessage.invoke("error", "unable to close context"); 
     logMessage.invoke("error", e.toString()); 
    } 
    } 

    public Boolean isRunning(){ 
    return running; 
    } 

    public Context getContext(){ 
    return ctx; 
    } 

    public TopicConnectionFactory getFactory(){ 
    return conFactory; 
    } 

    public TopicConnection getTopicConnection(){ 
    return tCon; 
    } 

    public TopicSession getTopicSession(){ 
    return tSession; 
    } 

    public Boolean getDevelopmentMode(){ 
    return development; 
    } 

    public TopicSubscriber getTopicSubscriber(){ 
    return tSub; 
    } 

    public Topic getTopic(){ 
    return topic; 
    } 

    /*Interface methods*/ 

    public void onMessage(Message message){ 
    publish.invoke(channel, message); 
    } 
    /*attempt a resync after an exception connection*/ 
    private void resync(){ 
    resync.invoke(nms); 
    } 

    private void attemptReconnect() throws Exception{ 
    if (!development){ 
     //clean up any portions of the connection that managed to establish 
     stopListener(); 
     //incase of stopListener exceptioning out set running to false 
     running = false; 
     do{ 
     try{ 
      initListener(); 
      if (running){ 
      resync(); 
      } 
     }catch(Exception e){ 
      logMessage.invoke("error", 
          String.format("Unable to establish connection to JMS server \n %s", e.toString())); 
     }finally{ 
      Thread.sleep(30000); 
     } 
     } while (!running); 
    } else { 
     logMessage.invoke("info", "Running in development mode, no connection established"); 
    } 
    } 

    public void onException(JMSException e){ 
    logMessage.invoke("error", 
         String.format("A JMS Exception has occurred, attempting to re-establish topic connection \n %s", e.toString())); 
    try{ 
     incrementMetric.invoke("JMS-disconnect-count"); 
     attemptReconnect(); 
    }catch(Exception g){ 
     logMessage.invoke("error", 
         String.format("Unable to start Listener \n %s", g.toString())); 
    } 
    } 

    /* Test functions */ 
    public void testException() throws JMSException{ 
    onException(new JMSException("testing exception function")); 
    } 

    public void testChannel (String message){ 
    if (development){ 
     publish.invoke(channel, message); 
    } 
    } 
} 

私は、サーバーが

netstatコマンドを接続しているかどうかを確認するには、netstat使用する接続を作成する場合 - | grepの8001のtcp 0 0 IPアドレス:59730
IPアドレス:8001は、その後、私は.closeContext方法に加えて、私の.stopListenerを呼び出し、netstatコマンドで再び私の接続を確認するために戻って、私が手

を設立しました同じ結果

netstat -an | grepの8001のtcp 0 0 IPアドレス:59730
IPアドレス:8001なぜ、セッションを閉じて、加入者、および接続JMSサーバへの接続を破壊しないでしょう

を設立しました。私が見つけた文書は、なぜ私が接続を完全に破壊することができないのかの説明を私に与えていない。

+0

「開発」フラグの値は何ですか? –

+0

falseに設定されています。それが本当であれば、結びつきは確立されませんでした。私は切断時に私の最終的なブロックからログメッセージを参照してください。 – jrahme

+1

jms接続/セッションを作成する前に、別のコンポーネントによって接続が確立されていないかどうかを確認できます。 jndiコンテキストを閉じることも忘れないでください。 –

答えて

0

私はあなたがこれに正しく近づいているとは確信していません。 接続に例外リスナーがあります。

weblogicでは、各エラーイベントに対してリスナーが何度も呼び出されるため、各呼び出しでattemptReconnectを実行しないでください。登録したすべてのコンシューマに対して1回呼び出され、監視された各接続に対して1回呼び出されます。例外は、ServerConnectionLostを表す場合にのみ切断する必要があります。

また、エラーハンドラでは、接続を閉じるだけで済みます。 connection.close()を実行した場合、セッションとリスナーも閉じます。あなたのようにそれらを逆の順序で閉じる必要はありません。

もう1つ。プロダクションコードに「開発」や「デバッグ」や「テスト」のコードを使用しないでください。

"if(!development & & env!= null){" ... あなたはそうしてはいけません。

質問に戻ると、なぜ実際の接続が閉じられないのですか?私はtSub.close()またはtSession.close()は、あなたの接続は閉じられません飽きないだろう、エラー出力した場合、あなたが

try{ 
    tSub.close(); 
    tSession.close(); 
    tCon.close(); 
    incrementMetric.invoke("JMS-disconnect-count"); 
} catch... 

をやっている参照してください。それぞれを独立したtry/catchでラップします。

関連する問題