2016-10-14 10 views
4

最新のFlink-1.1.2-Hadoop-27およびflink-connector-kafka-0.10.2-hadoop1 jarを使用しています。ClassNotFoundException:kafkaトピックを消費しているときにorg.apache.flink.streaming.api.checkpoint.CheckpointNotifierが返される

FLINKの消費者は以下の通りです:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/checkpoint/CheckpointNotifier 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(Unknown Source) 
at java.security.SecureClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.access$100(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(Unknown Source) 
at java.security.SecureClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.defineClass(Unknown Source) 
at java.net.URLClassLoader.access$100(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.net.URLClassLoader$1.run(Unknown Source) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at com.bt.oss.voice.main.FlnkConsumer.main(FlnkConsumer.java:50)Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.checkpoint.CheckpointNotifier 
at java.net.URLClassLoader.findClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source) 
at java.lang.ClassLoader.loadClass(Unknown Source) 
... 25 more 

答えて

5

あなたはバージョンが混合されています

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment(); 
     if (properties == null) { 
      properties = new Properties(); 
      InputStream props = Resources.getResource(KAFKA_CONFIGURATION_FILE).openStream(); 
      properties.load(props); 

      DataStream<String> stream = env.addSource(new FlinkKafkaConsumer082<>(KAFKA_SIP_TOPIC, new SimpleStringSchema() , properties)); 

以下は、私が実行した後に取得する例外です。 Flink 0.10.2のKafka消費者は、Flink 1.1.2では動作しません。

あなたはFLINK 1.1.2で提供さカフカコネクタを使用して、次のMavenの依存関係を含める必要があります。

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka-0.8_2.10</artifactId> 
    <version>1.1.2</version> 
</dependency> 

詳細についてはdocumentationをチェックしてみて下さい。

関連する問題