2017-10-17 13 views
0

オンラインシステムでは、NullPointerExceptionが発生します。これはしばらくの間、NullPointerExceptionを取得します。AbstractStringBuilder.ensureCapacityInternalは、ストームボルトでNullPointerExceptionを取得します。

import ***.KeyUtils; 
import ***.redis.PipelineHelper; 
import ***.redis.PipelinedCacheClusterClient; 
import **.redis.R2mClusterClient; 
import org.apache.commons.lang3.StringUtils; 
import org.apache.storm.task.OutputCollector; 
import org.apache.storm.task.TopologyContext; 
import org.apache.storm.topology.IRichBolt; 
import org.apache.storm.topology.OutputFieldsDeclarer; 
import org.apache.storm.tuple.Tuple; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.context.ApplicationContext; 
import org.springframework.context.support.ClassPathXmlApplicationContext; 

import java.util.Map; 

/** 
* RedisBolt batch operate 
*/ 
public class RedisBolt implements IRichBolt { 
    static final long serialVersionUID = 737015318988609460L; 
    private static ApplicationContext applicationContext; 
    private static long logEmitNumber = 0; 
    private static StringBuffer totalCmds = new StringBuffer(); 
    private Logger logger = LoggerFactory.getLogger(getClass()); 
    private OutputCollector _collector; 
    private R2mClusterClient r2mClusterClient; 

    @Override 
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { 
     _collector = outputCollector; 
     if (applicationContext == null) { 
      applicationContext = new ClassPathXmlApplicationContext("spring/spring-config-redisbolt.xml"); 
     } 
     if (r2mClusterClient == null) { 
      r2mClusterClient = (R2mClusterClient) applicationContext.getBean("r2mClusterClient"); 
     } 


    } 

    @Override 
    public void execute(Tuple tuple) { 
     String log = tuple.getString(0); 
     String lastCommands = tuple.getString(1); 

     try { 
      //log count 
      if (StringUtils.isNotEmpty(log)) { 
       logEmitNumber++; 
      } 

      if (StringUtils.isNotEmpty(lastCommands)) { 
       if(totalCmds==null){ 
        totalCmds = new StringBuffer(); 
       } 
       totalCmds.append(lastCommands);//line 61 
      } 

      //日志数量控制 
      int numberLimit = 1; 
      String flow_log_limit = r2mClusterClient.get(KeyUtils.KEY_PIPELINE_LIMIT); 
      if (StringUtils.isNotEmpty(flow_log_limit)) { 
       try { 
        numberLimit = Integer.parseInt(flow_log_limit); 
       } catch (Exception e) { 
        numberLimit = 1; 
        logger.error("error", e); 
       } 
      } 

      if (logEmitNumber >= numberLimit) { 
       StringBuffer _totalCmds = new StringBuffer(totalCmds); 
       try { 
        //pipeline submit 
        PipelinedCacheClusterClient pip = r2mClusterClient.pipelined(); 
        String[] commandArray = _totalCmds.toString().split(KeyUtils.REDIS_CMD_SPILT); 
        PipelineHelper.cmd(pip, commandArray); 
        pip.sync(); 
        pip.close(); 
        totalCmds = new StringBuffer(); 
       } catch (Exception e) { 
        logger.error("error", e); 
       } 

       logEmitNumber = 0; 
      } 
     } catch (Exception e) { 
      logger.error(new StringBuffer("====RedisBolt error for log=[ ").append(log).append("] \n commands=[").append(lastCommands).append("]").toString(), e); 
      _collector.reportError(e); 
      _collector.fail(tuple); 
     } 

     _collector.ack(tuple); 
    } 

    @Override 
    public void cleanup() { 

    } 

    @Override 
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { 
    } 

    @Override 
    public Map<String, Object> getComponentConfiguration() { 
     return null; 
    } 

} 

例外情報:

java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)でjava.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:113)で

java.lang.NullPointerExceptionがでjava.lang.StringBuffer.append(StringBuffer.java:237)at com.jd.jr.dataeye.storm.bolt.RedisBolt.execute(RedisBolt.java:61)at org.apache.storm.daemon.executor $ fn__5044 $ tuple_action_fn__5046.invoke(executor.clj:727)at org.apache.storm.daemon.executor $ mk_task_receiver $ fn__4965.invoke(executor.clj:459)at org.apache.storm.disruptor $ clojure_handler $ reify__4480.onEvent(disruptor。 clj:40)at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(D org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451)at org.apache.storm.disruptor $ consume_batch_when_available.invoke(disruptor.clj:73)at org.apache.storm .daemon.executor $ fn__5044 $ fn__5057 $ fn__5110.invoke(executor.clj:846)at org.apache.storm.util $ async_loop $ fn__557.invoke(util.clj:484)at clojure.lang.AFn.run(AFn .java:22)at java.lang.Thread.run(Thread.java:745)

誰でも私に理由を見つけるためにいくつかの助言を与えることができます。

+0

とJavaのバージョンを持っています.0_71 " – Fanl

答えて

0

これは本当に奇妙なことです。 2つのクラスのコードをお読みください。

https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/AbstractStringBuilder.java

https://github.com/openjdk-mirror/jdk7u-jdk/blob/master/src/share/classes/java/lang/StringBuffer.java

AbstractStringBuilderは、NPEであること「の値」フィールドにアクセス可能フィールド「の値を」、割り当てない引数がないコンストラクタを持っています。 StringBufferのコンストラクタはそのコンストラクタを使用します。したがって、奇妙なことがシリアル化/逆シリアル化で発生し、残念ながらAbstractStringBuilderの 'value'フィールドがnullになっている可能性があります。

prepare()のtotalCmdsを初期化するほうがいいでしょうし、ボルト間の同期(スレッドセーフティ)も考慮する必要があります。 prepare()はボルトインスタンスごとに呼び出すことができるため、フィールドはスレッドセーフですが、クラスフィールドはスレッドセーフではありません。

0

私は問題が多分見つかると思います。

キーポイントである

"StringBufferの_totalCmds =新規のStringBuffer(totalCmds);"そして "totalCmds.append(lastCommands); //ライン61"

新しいオブジェクトは、それがサーバル手順を取る:

(1)メモリを割り当てる基準

を返す(2)

を初期化します

(1)の後に(2)を追加すると、StringBuffer.javaはAbstractStringBuilderを拡張します。Javaの

/** 
* The value is used for character storage. 
*/ 
char[] value; 

値が初期化されていません。これはヌルを取得します:

@Override 
public synchronized void ensureCapacity(int minimumCapacity) { 
    if (minimumCapacity > value.length) { 
     expandCapacity(minimumCapacity); 
    } 
} 

このブロットは、「1.7を別の質問、いくつかのデータは、多分、マルチスレッド環境下で失わ

関連する問題