2016-08-25 10 views
0

基本的なトポロジを動かすために、Apache Storm のローカルクラスタを設定しようとしています。現在使用しているのは Kafka 2.11-0.8.2.1 と Storm 0.9.5 です。Spout と Bolt は設定済みで、いくつかのメッセージはパイプラインを通過しました。ところが、2件ほどメッセージが送信された後に例外が発生し、その理由がわかりません。jar の互換性/バージョンの問題かもしれません。動かすために jar をさらに追加しましたが、それでも数件のメッセージを送信した後にクラッシュします。例外は Spout から発生しているようです: java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V

トポロジコードはここにある:ロガーのメッセージを出力します

package com.smartstart.storm.topology; 

import java.io.InputStream; 
import java.util.Arrays; 
import java.util.Properties; 

import org.apache.log4j.Logger; 

import storm.kafka.BrokerHosts; 
import storm.kafka.KafkaSpout; 
import storm.kafka.SpoutConfig; 
import storm.kafka.StringScheme; 
import storm.kafka.ZkHosts; 
import backtype.storm.Config; 
import backtype.storm.StormSubmitter; 
import backtype.storm.generated.AlreadyAliveException; 
import backtype.storm.generated.InvalidTopologyException; 
import backtype.storm.generated.StormTopology; 
import backtype.storm.spout.SchemeAsMultiScheme; 
import backtype.storm.topology.TopologyBuilder; 
import kafka.api.OffsetRequest; 
import backtype.storm.LocalCluster; 

import com.smartstart.storm.bolt.PingMessageBolt; 

/**
 * Wires a {@link KafkaSpout} to a {@link PingMessageBolt} and runs the resulting topology on an
 * in-process {@link LocalCluster}.
 *
 * <p>All settings are read from {@code PingTopology.properties} on the classpath. Keys read here:
 * {@code kafkaZk}, {@code KAFKA_TOPIC}, {@code KAFKA_OFFSETMARKER},
 * {@code KAFKA_FORCE_FROM_START} and {@code Topology_Name}.
 */
public class PingMessageTopology {
    public static final Logger LOG = Logger.getLogger(PingMessageTopology.class);
    private final BrokerHosts brokerHosts;

    /**
     * @param kafkaZk ZooKeeper connection string handed to {@link ZkHosts}
     */
    public PingMessageTopology(String kafkaZk) {
        brokerHosts = new ZkHosts(kafkaZk);
    }

    /**
     * Builds the spout -> bolt topology.
     *
     * @param props configuration; supplies the Kafka topic, the value passed as the third
     *              {@link SpoutConfig} constructor argument, and the force-from-start flag
     * @return the assembled topology, with a parallelism hint of 4 on both components
     */
    private StormTopology buildTopology(Properties props) {
        SpoutConfig kafkaConfig = new SpoutConfig(
                brokerHosts,
                props.getProperty("KAFKA_TOPIC"),
                props.getProperty("KAFKA_OFFSETMARKER"),
                "PingMessage_Topology");

        // Boolean.valueOf(null) is false, so a missing key simply leaves the flag off.
        boolean forceFromstart = Boolean.valueOf(props.getProperty("KAFKA_FORCE_FROM_START"));
        kafkaConfig.forceFromStart = forceFromstart;
        LOG.debug("forceFromstartTrue "+forceFromstart);
        if (forceFromstart) {
            kafkaConfig.startOffsetTime = OffsetRequest.EarliestTime();
        }

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("PingMessage_Spout", new KafkaSpout(kafkaConfig), 4);
        builder.setBolt("PingMessage_Bolt", new PingMessageBolt(), 4)
                .shuffleGrouping("PingMessage_Spout");
        return builder.createTopology();
    }

    /**
     * Loads the properties file, builds the topology and submits it to a new local cluster.
     * Any failure is logged rather than propagated.
     *
     * @param args unused
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        String propertiesFile = "PingTopology.properties";
        Properties props = new Properties();

        // try-with-resources accepts a null resource, so a missing file still reaches the
        // else-branch below; when the file exists the stream is now closed instead of leaked.
        try (InputStream in = ClassLoader.getSystemResourceAsStream(propertiesFile)) {
            LOG.info("in: " + in);
            if (in != null) {
                props.load(in);
                PingMessageTopology kafkaSpoutTestTopology =
                        new PingMessageTopology(props.getProperty("kafkaZk"));
                StormTopology stormTopology = kafkaSpoutTestTopology.buildTopology(props);
                Config config = new Config();
                config.setDebug(true);

                // create a local Storm cluster
                LocalCluster cluster = new LocalCluster(); // necessary for local cluster to run
                cluster.submitTopology(props.getProperty("Topology_Name"), config, stormTopology);
            } else {
                LOG.info("Please Specify the Properties.....");
            }
        } catch (Exception e) {
            LOG.error("Check The Configuration File...", e);
        }
    }
}

ボルトコードスニペットはここにある:すべてのjarファイルをリスト

package com.smartstart.storm.bolt; 

import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStream; 
import java.io.InputStreamReader; 
import java.net.HttpURLConnection; 
import java.net.MalformedURLException; 
import java.net.ProtocolException; 
import java.net.URL; 
import java.net.UnknownHostException; 
import java.text.SimpleDateFormat; 
import java.util.ArrayList; 
import java.util.Date; 
import java.util.List; 
import java.util.Map; 
import java.util.Properties; 

import org.apache.log4j.Logger; 
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.json.simple.parser.ParseException; 

import backtype.storm.task.OutputCollector; 
import backtype.storm.task.TopologyContext; 
import backtype.storm.topology.OutputFieldsDeclarer; 
import backtype.storm.topology.base.BaseRichBolt; 
import backtype.storm.tuple.Tuple; 

import com.mongodb.BasicDBObject; 
import com.mongodb.DB; 
import com.mongodb.DBCollection; 
import com.mongodb.DBObject; 
import com.mongodb.Mongo; 
import com.mongodb.MongoOptions; 
import com.mongodb.ServerAddress; 

public class PingMessageBolt extends BaseRichBolt { 
    /** Class-wide log4j logger. */
    public static final Logger LOG = Logger.getLogger(PingMessageBolt.class); 

    /* MongoDB client; created in prepare() and closed in cleanup(). */
    private Mongo mongo; 
    /* Database handle obtained from the client in prepare() using the DB_NAME property. */
    private DB db; 
    /*
     * Collection handles. prepare() assigns them only when authentication succeeds, so they
     * remain null otherwise. Within the visible code, execute() reads sessioncollection and
     * writes failures to errorCollection; the users of the other two are not shown here.
     */
    private DBCollection sessioncollection, messagecollection, messagecollectionarc,errorCollection; 
    /* Storm collector stored by prepare(); execute() uses it to ack each tuple. */
    private OutputCollector collector; 
    /*For fetching URL from properties and to use in all places in the class */ 
    private String urlString; 
    /*Common date formatter to use in Insert and update */ 
    /* The quoted 't' and 'z' in the pattern are emitted as literal lowercase characters. */
    /* NOTE(review): ISO-8601 normally uses uppercase 'T' and 'Z' - confirm lowercase is intended. */
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd't'HH:mm:ss.SSS'z'"); 
    /* Value of the API_KEY property, loaded in prepare(). */
    private String apiKey; 

    /**
     * Stores the collector, loads {@code PingTopology.properties} from the classpath and opens
     * the MongoDB connection and collections used by this bolt.
     *
     * <p>Property keys read: {@code URL}, {@code API_KEY}, {@code MONGO_HOST} (comma-separated),
     * {@code MONGO_PORT}, {@code MONGO_AUTOCONNECT_RETRY_TIME}, {@code DB_NAME},
     * {@code MONGO_USER} and {@code MONGO_PWD}. Any failure is logged and swallowed, which
     * leaves the Mongo fields null.
     *
     * @param stromConf Storm configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector later used by {@code execute} to ack tuples
     */
    @Override
    public void prepare(Map stromConf, TopologyContext context, OutputCollector collector) {
        String PropertiesFile = "PingTopology.properties";
        Properties props = new Properties();
        this.collector = collector;
        // try-with-resources accepts a null resource and closes the stream when it exists.
        try (InputStream in = ClassLoader.getSystemResourceAsStream(PropertiesFile)) {
            LOG.debug("bolt prepare--------------------------------");
            // The stream can be consumed only once. The previous code dumped it for this debug
            // line and then called props.load(in) on the drained stream, so no property was
            // ever loaded. Read it once and feed the same text to both the log and the loader.
            // NOTE(review): this dump includes MONGO_PWD - consider removing it from the logs.
            String rawProperties = (in == null) ? "" : convertStreamToString(in);
            LOG.debug("InputStream string :" + rawProperties);
            LOG.debug("-------------------------------------------");

            LOG.debug(in);
            if (in != null) {
                // Fully qualified because StringReader is not among the file's imports.
                props.load(new java.io.StringReader(rawProperties));
                /*URL retrieving from properties file.*/
                urlString = props.getProperty("URL");
                apiKey = props.getProperty("API_KEY");
                String mongoHost = props.getProperty("MONGO_HOST");
                List<ServerAddress> serverAddresses = new ArrayList<ServerAddress>();

                if (mongoHost != null) {
                    String[] hostNames = mongoHost.split(",");
                    int port = Integer.parseInt(props.getProperty("MONGO_PORT"));
                    for (String hostName : hostNames) {
                        serverAddresses.add(new ServerAddress(hostName, port));
                    }
                }
                MongoOptions options = new MongoOptions();
                options.setAutoConnectRetry(true);
                LOG.debug(props.getProperty("MONGO_AUTOCONNECT_RETRY_TIME"));
                options.setMaxAutoConnectRetryTime(
                        Long.valueOf(props.getProperty("MONGO_AUTOCONNECT_RETRY_TIME")));
                mongo = new Mongo(serverAddresses, options);
                db = mongo.getDB(props.getProperty("DB_NAME"));
                boolean auth = db.authenticate(
                        props.getProperty("MONGO_USER"),
                        props.getProperty("MONGO_PWD").toCharArray());
                System.out.println("" + auth);

                if (auth) {
                    sessioncollection = db.getCollection("clientSessions");
                    messagecollection = db.getCollection("clientMessages");
                    errorCollection = db.getCollection("clientErrorMessages");
                    messagecollectionarc = db.getCollection("clientMessagesArc");
                } else {
                    LOG.warn("incorrect user name or password", new Throwable(
                            "Incorrect UserName or Password"));
                }

            } else {
                LOG.debug("Please check the configuration file....");
            }
        } catch (Exception e) {
            LOG.error("Exception in Bolt....", e);
        }
    }

    /**
     * Reads everything remaining in {@code is} into a single string, decoding with the
     * platform default charset.
     *
     * <p>The stream is consumed but not closed; the caller still owns it.
     *
     * @param is stream to drain; {@code null} (for example a missing classpath resource) is
     *           treated as empty instead of raising a NullPointerException
     * @return the stream content, or {@code ""} when the stream is null or empty
     */
    static String convertStreamToString(java.io.InputStream is) {
        if (is == null) {
            return "";
        }
        // "\\A" matches only at the beginning of input, so the first token is the whole stream.
        java.util.Scanner s = new java.util.Scanner(is).useDelimiter("\\A");
        return s.hasNext() ? s.next() : "";
    }
    /**
     * Parses the first tuple value as JSON, looks up the matching client session and performs
     * the INSERT or UPDATE named by the message's "OPERATION" field.
     *
     * <p>Any failure is logged and, when possible, recorded in {@code errorCollection}. The
     * tuple is acked in every case, so a failed message is not replayed.
     *
     * @param input tuple whose first value is the JSON message text
     */
    @Override
    public void execute(Tuple input) {
        try {
            LOG.debug("bolt execute --------------------------------");
            // Guard before touching getValue(0); previously this check ran only after the
            // first value had already been read, so it could never protect anything.
            if (input.getValues().isEmpty()) {
                LOG.debug("Ignoring tuple without values");
                return; // the finally block still acks
            }

            // Stringify the payload once instead of on every use.
            String payload = input.getValue(0).toString();
            LOG.debug("Tuple string :" + payload);
            LOG.debug("-------------------------------------------");

            BasicDBObject recvObj = (BasicDBObject) com.mongodb.util.JSON.parse(payload);
            BasicDBObject msgvalue = (BasicDBObject) recvObj.get("SET");
            if (msgvalue == null) {
                // Fail with a descriptive message; the catch below records it.
                throw new IllegalArgumentException("Message has no \"SET\" object");
            }

            JSONObject jsonTObj = (JSONObject) new JSONParser().parse(payload);
            JSONObject receivedJSONObject = (JSONObject) jsonTObj.get("SET");
            LOG.debug("receivedJSONObject " + receivedJSONObject);

            String operation = (String) jsonTObj.get("OPERATION");
            if (operation == null) {
                // A switch on a null String throws a message-less NullPointerException.
                throw new IllegalArgumentException("Message has no \"OPERATION\" field");
            }

            /*Written common query which is used in both insert and update*/
            BasicDBObject sessionbd = (BasicDBObject) sessioncollection
                    .findOne(querySpecification("sessionGuiId", (int) msgvalue.get("session_gui_id")));
            LOG.debug("sessionbd " + sessionbd);

            switch (operation) {
            case "INSERT": {
                BasicDBObject msgobj = insertPingMessage(receivedJSONObject, sessionbd);
                LOG.debug("msgobj query: " + msgobj);
                break;
            }
            case "UPDATE": {
                updatePingMessage(receivedJSONObject, sessionbd);
                break;
            }
            default: {
                LOG.debug("Please mention correct operation...");
                break;
            }
            }
        } catch (Throwable e) {
            try {
                // Pass the throwable itself so the log keeps the stack trace.
                LOG.error("Failed to process tuple", e);

                StringBuilder stack = new StringBuilder();
                // getStackTrace() copies the array on each call, so fetch it once.
                for (StackTraceElement elem : e.getStackTrace()) {
                    stack.append(elem.getClassName()).append(".").append(elem.getMethodName())
                            .append("():").append(elem.getFileName())
                            .append(":").append(elem.getLineNumber()).append(" \n");
                }

                BasicDBObject errorInsert = new BasicDBObject();
                errorInsert.append("error", input.getValues());
                errorInsert.append("fromtopology", "PingMessage");
                errorInsert.append("exception", e.getMessage());
                errorInsert.append("exceptionstacktrace", stack.toString());
                errorCollection.insert(errorInsert);
            } catch (Throwable t) {
                // Recording is best-effort: errorCollection is null when prepare() failed.
                LOG.error("Failed to record processing error", t);
            }
        } finally {
            collector.ack(input);
        }
    }

    /**
     * Closes the MongoDB client, if one was created.
     */
    @Override
    public void cleanup() {
        // prepare() swallows its exceptions, so the client may never have been created.
        if (mongo != null) {
            mongo.close();
        }
    }

    /**
     * Declares no output fields.
     *
     * @param declarer unused
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Intentionally empty: nothing is declared on the declarer.
    }

コンバインドスクリーンショットはこちらです: enter image description here

コンソールには例外を含むLoggerメッセージがあふれています。

2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:zookeeper.version=3.3.2-1031432, built on 11/05/2010 05:32 GMT 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:host.name=USTX04NB002865.corp.smartstartinc.com 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:java.version=1.8.0_102 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:java.vendor=Oracle Corporation 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:java.home=C:\Program Files\Java\jre1.8.0_102 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:java.class.path=C:\Users\Fred.Quatro\workspace\PingMessage\bin;C:\Users\Fred.Quatro\workspace\PingMessage\lib\log4j-1.2.16.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\metrics-core-2.1.2.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\metrics-core-2.1.5.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\scala-library-2.10.4.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\kafka-clients-0.8.2.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\commons-lang-2.6.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\snakeyaml-1.14.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\json-simple-1.1.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\mongo-2.10.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\guava-18.0.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\jersey-client.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\storm-core-0.9.5.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\storm-kafka-0.9.2-incubating.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\java-json-schema.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\netty-all-4.0.34.Final.jar;C:\JarLibrary\slf4j-log4j12-1.4.3.jar;C:\JarLibrary\slf4j-api-1.4.3.jar;C:\JarLibrary\commons-exec-1.1.jar;C:\JarLibrary\commons-io-2.4.jar;C:\JarLibrary\disruptor-2.10.1.jar;C:\JarLibrary\kryo-3.0.1.jar;C:\JarLibrary\objenesis-1.2.jar;C:\JarLibrary\minlog-1.2.jar;C:\JarLibrary\clojure-1.5.1.jar;C:\JarLibrary\carbonite-1.4.0.jar;C:\JarLibrary\chill_2.10-0.3.0.jar;C:\JarLibrary\chill-java-0.3.5.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\kafka_2.10-0.8.2.1.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\zookeeper-3.3.2.jar;C:\Users\Fred.Quatro\workspace\PingMessage\lib\curator-client-2.3.0.jar;C:\JarLibrary\curator-framework-2.1.0-incubating.jar 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:java.library.path=C:\Program Files\Java\jre1.8.0_102\bin;C:\windows\Sun\Java\bin;C:\windows\system32;C:\windows;C:/Program Files/Java/jre1.8.0_102/bin/server;C:/Program Files/Java/jre1.8.0_102/bin;C:/Program Files/Java/jre1.8.0_102/lib/amd64;C:\ProgramData\Oracle\Java\javapath;C:\windows\system32;C:\windows;C:\windows\System32\Wbem;C:\windows\System32\WindowsPowerShell\v1.0\;C:\Program Files\TortoiseSVN\bin;C:\apache-storm-0.9.5\bin;C:\Program Files\Java\jre1.8.0_102\bin;C:\Python27;C:\Python27\Lib\site-packages\;C:\Python27\Scripts\;;C:\Users\Fred.Quatro\Desktop;;. 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:java.io.tmpdir=C:\Users\FRED~1.QUA\AppData\Local\Temp\ 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:java.compiler=<NA> 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:os.name=Windows 7 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:os.arch=amd64 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:os.version=6.1 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:user.name=Fred.Quatro 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:user.home=C:\Users\Fred.Quatro 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] INFO ZooKeeper:97 - Client environment:user.dir=C:\Users\Fred.Quatro\workspace\PingMessage 
2016-08-25 08:24:55 [Thread-19-PingMessage_Spout] ERROR util:0 - Async loop died! 
java.lang.NoSuchMethodError: org.apache.zookeeper.ZooKeeper.<init>(Ljava/lang/String;ILorg/apache/zookeeper/Watcher;Z)V 
    at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) 
    at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:167) 
    at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) 
    at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) 
    at org.apache.curator.ConnectionState.reset(ConnectionState.java:210) 
    at org.apache.curator.ConnectionState.start(ConnectionState.java:101) 
    at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) 
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:232) 
    at storm.kafka.ZkState.<init>(ZkState.java:62) 
    at storm.kafka.KafkaSpout.open(KafkaSpout.java:85) 
    at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) 
    at clojure.lang.AFn.run(AFn.java:24) 
    at java.lang.Thread.run(Unknown Source) 
****** edited ********* 
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:88 - 107e0749ffffff95ffffffd25174053a706f7274737107e0cffffffab5e58ffffff98707404706f72747107e01d70737107e0f0043,v{s{31,s{'world,'anyone}}},2 response:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout/e0000000002 
2016-08-25 08:24:55 [Thread-15-PingMessage_Spout] ERROR util:0 - Halting process: ("Worker died") 
java.lang.RuntimeException: ("Worker died") 
    at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) 
    at clojure.lang.RestFn.invoke(RestFn.java:423) 
    at backtype.storm.daemon.worker$fn__4694$fn__4695.invoke(worker.clj:493) 
    at backtype.storm.daemon.executor$mk_executor_data$fn__3272$fn__3273.invoke(executor.clj:240) 
    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:473) 
    at clojure.lang.AFn.run(AFn.java:24) 
    at java.lang.Thread.run(Unknown Source) 
2016-08-25 08:24:55 [Thread-6-SendThread(127.0.0.1:2000)] DEBUG ClientCnxn:818 - Reading reply sessionid:0x156c1dfe5f1000b, packet:: clientPath:null serverPath:null finished:false header:: 91,1 replyHeader:: 91,43,0 request:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout/e,#ffffffacffffffed05737201f636c6f6a7572652e6c616e672e50657273697374656e7441727261794d6170ffffffd02836ffffff8f21ffffffe4ffffffa0f2024c055f6d6574617401d4c636c6f6a7572652f6c616e672f4950657273697374656e744d61703b5b056172726179740135b4c6a6176612f6c616e672f4f626a6563743b787201b636c6f6a7572652e6c616e672e4150657273697374656e744d6170784ffffffabffffffaa63ffffffef537020249055f6861736849075f6861736865717870ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff7075720135b4c6a6176612e6c616e672e4f626a6563743bffffff90ffffffce58ffffff9f1073296c200787000087372014636c6f6a7572652e6c616e672e4b6579776f72643931ffffffc4627a595b82034904686173684c045f737472740124c6a6176612f6c616e672f537472696e673b4c0373796d740154c636c6f6a7572652f6c616e672f53796d626f6c3b787024fffffffdffffff84ffffff84740a3a74696d652d736563737372013636c6f6a7572652e6c616e672e53796d626f6cffffffbcffffff897ffffffed1cffffffa744ffffffc32054904686173684c055f6d6574617107e014c045f7374727107e084c046e616d657107e084c026e737107e087870ffffff86ffffffc6affffffcb70740974696d652d736563737107e0e7073720116a6176612e6c616e672e496e746567657212ffffffe2ffffffa0ffffffa4fffffff7ffffff81ffffff8738201490576616c756578720106a6176612e6c616e672e4e756d626572ffffff86ffffffacffffff951dbffffff94ffffffe0ffffff8b200787057ffffffbefffffff1ffffffa7737107e07ffffffb3515bffffffec70737107e0c1519ffffffe233707074056572726f7270744676a6176612e6c616e672e4e6f537563684d6574686f644572726f723a206f72672e6170616368652e7a6f6f6b65657065722e5a6f6f4b65657065722e3c696e69743e284c6a6176612f6c616e672f537472696e673b494c6f72672f6170616368652f7a6f6f6b65657065722f576174636865723b5a2956da96174206f72672e6170616368652e63757261746f722e7574696c732e44656661756c745a6f6f6b6565706572466163746f72792e6e65775
a6f6f4b65657065722844656661756c745a6f6f6b6565706572466163746f72792e6a6176613a323929da96174206f72672e6170616368652e63757261746f722e6672616d65776f726b2e696d70732e43757261746f724672616d65776f726b496d706c24322e6e65775a6f6f4b65657065722843757261746f724672616d65776f726b496d706c2e6a6176613a31363729da96174206f72672e6170616368652e63757261746f722e48616e646c65486f6c64657224312e6765745a6f6f4b65657065722848616e646c65486f6c6465722e6a6176613a393429da96174206f72672e6170616368652e63757261746f722e48616e646c65486f6c6465722e6765745a6f6f4b65657065722848616e646c65486f6c6465722e6a6176613a353529da96174206f72672e6170616368652e63757261746f722e436f6e6e656374696f6e53746174652e726573657428436f6e6e656374696f6e53746174652e6a6176613a32313029da96174206f72672e6170616368652e63757261746f722e436f6e6e656374696f6e53746174652e737461727428436f6e6e656374696f6e53746174652e6a6176613a31303129da96174206f72672e6170616368652e63757261746f722e43757261746f725a6f6f6b6565706572436c69656e742e73746172742843757261746f725a6f6f6b6565706572436c69656e742e6a6176613a31383829da96174206f72672e6170616368652e63757261746f722e6672616d65776f726b2e696d70732e43757261746f724672616d65776f726b496d706c2e73746172742843757261746f724672616d65776f726b496d706c2e6a6176613a32333229da961742073746f726d2e6b61666b612e5a6b53746174652e3c696e69743e285a6b53746174652e6a6176613a363229da961742073746f726d2e6b61666b612e4b61666b6153706f75742e6f70656e284b61666b6153706f75742e6a6176613a383529da96174206261636b747970652e73746f726d2e6461656d6f6e2e6578656375746f7224666e5f5f3333373124666e5f5f333338362e696e766f6b65286578656375746f722e636c6a3a35323229da96174206261636b747970652e73746f726d2e7574696c246173796e635f6c6f6f7024666e5f5f3436302e696e766f6b65287574696c2e636c6a3a34363129da9617420636c6f6a7572652e6c616e672e41466e2e72756e2841466e2e6a6176613a323429da96174206a6176612e6c616e672e5468726561642e72756e28556e6b6e6f776e20536f7572636529da737107e0748ffffffe94e4470737107e0cffffffaaffffffb1ffffffd4ffffff8b70707404686f737470740255553545830344e423030323836352e636f72702e736d617274737
4617274696e632e636f6d737107e0749ffffff95ffffffd25174053a706f7274737107e0cffffffab5e58ffffff98707404706f72747107e01d70737107e0f0043,v{s{31,s{'world,'anyone}}},2 response:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout/e0000000003 
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:88 - Processing request:: sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5c zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout 
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:160 - sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5c zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout 
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:88 - Processing request:: sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout 
2016-08-25 08:24:55 [Thread-6-SendThread(127.0.0.1:2000)] DEBUG ClientCnxn:818 - Reading reply sessionid:0x156c1dfe5f1000b, packet:: clientPath:null serverPath:null finished:false header:: 92,12 replyHeader:: 92,43,0 request:: '/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout,F response:: v{'e0000000003,'e0000000002,'e0000000001,'e0000000000},s{34,34,1472131495046,1472131495046,0,4,0,0,1,4,43} 
2016-08-25 08:24:55 [SyncThread:0] DEBUG FinalRequestProcessor:160 - sessionid:0x156c1dfe5f1000b type:getChildren2 cxid:0x5d zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/errors/PingMessage_Topology-1-1472131492/PingMessage_Spout 

答えて

0

kafkaSpoutのコンフィギュレーションに欠けがあります:
はここで編集され出力されます。あなたのzookeeper(あなたがbrokerHostsで渡したもの)が動作していることを確認してください。 zkCli.shで簡単に確認し、ブローカーIDを確認することができます。 私はspoutConfigのプロパティをチェックし、kafka/zookeeperは正しく動作しているかどうかを確認します。私は次のspoutconfigの詳細を持っています:

// Kafka spout configuration. SpoutConfig takes a BrokerHosts (here ZkHosts wrapping the
// ZooKeeper connect string) rather than a plain String; the class name is SpoutConfig and
// the scheme wrapper is SchemeAsMultiScheme.
SpoutConfig sc = new SpoutConfig(new ZkHosts("localhost:2181"), "topicname", "/offset-dir", "id");
sc.bufferSizeBytes = 1024 * 1024 * 4;
sc.fetchSizeBytes = 1024 * 1024 * 4;
sc.forceFromStart = true;
sc.scheme = new SchemeAsMultiScheme(new StringScheme());

spout/bolt にタスク数を設定していません。

// Set the task count explicitly on both components (the method is setNumTasks, plural).
builder.setSpout("PingMessage_Spout", new KafkaSpout(kafkaConfig), 4).setNumTasks(1);
builder.setBolt("PingMessage_Bolt", new PingMessageBolt(), 4).setNumTasks(1).shuffleGrouping("PingMessage_Spout");
return builder.createTopology();

ZooKeeper が停止しているのだと思います。私は同様の設定を使っていて、問題なく動作しています。

+0

Kafka 自体は正常に動作しています。Java でプロデューサとコンシューマのプロジェクトを作成しましたが、どちらも正しく動作しています。Kafka が Storm と通信できないのです。私の環境は Windows 7 です。java.lang.NoSuchMethodError は、Curator の jar が誤っていることを示しているように思います。この理解は間違っていますか? –

1

Storm で Kafka を動かすには、ほかにも多くの jar(Curator、Scala、YAML、Clojure など)が必要です。また、すべての jar が互いに互換性を持っていなければなりません。そうでないと、厄介な NoClassDefFoundError が発生します。これが唯一の答えではないと思いますが、誰かの役に立つかもしれないので、私の環境で動作した jar の一覧を以下に示します:

enter image description here

0

pom に Apache Commons Collections4 を追加したときに、このエラーが発生しました:

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 --> 
    <dependency> 
     <groupId>org.apache.commons</groupId> 
     <artifactId>commons-collections4</artifactId> 
     <version>4.0</version> 
    </dependency> 

ListUtils.partition() メソッドが必要だったためです。この jar を取り除くことで解決しました。

関連する問題