2017-12-12 10 views
1

私はkafkaflinkをinteregateしようとしています。アイデアは、カフカ・キューを消費し、フリンクを使用してデータを変換することです。私はNoClassDefFoundError FlipとKafkaを統合しようとしています

https://github.com/abhishek-ch/evolveML/blob/master/flink/kafka-flink-example/pom.xml

これらは私の依存関係している下記の例に次のようです。

 <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-java</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-streaming-core</artifactId> 
      <version>0.9.1</version> 
      </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-clients_2.11</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.flink</groupId> 
      <artifactId>flink-connector-kafka-0.8_2.11</artifactId> 
      <version>1.3.2</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>1.0.0</version> 
     </dependency> 

また、kafkaクラスとflinkクラスを次のようにプロジェクトに含めます。カフカキューを消費するため

 <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-dependency-plugin</artifactId> 
      <version>3.0.2</version> 
      <executions> 
       <execution> 
        <id>unpack</id> 
        <!-- executed just before the package phase --> 
        <phase>prepare-package</phase> 
        <goals> 
         <goal>unpack</goal> 
        </goals> 
        <configuration> 
         <artifactItems> 
          <!-- For Flink connector classes --> 
          <artifactItem> 
           <groupId>org.apache.flink</groupId> 
           <artifactId>flink-connector-kafka-0.8_2.11</artifactId> 
           <version>1.3.2</version> 
           <type>jar</type> 
           <overWrite>false</overWrite> 
           <outputDirectory>${project.build.directory}/classes</outputDirectory> 
           <includes>org/apache/flink/**</includes> 
          </artifactItem> 
          <!-- For Kafka API classes --> 
          <artifactItem> 
           <groupId>org.apache.kafka</groupId> 
           <artifactId>kafka_2.11</artifactId> 
           <version>1.0.0</version> 
           <type>jar</type> 
           <overWrite>false</overWrite> 
           <outputDirectory>${project.build.directory}/classes</outputDirectory> 
           <includes>kafka/**</includes> 
          </artifactItem> 
         </artifactItems> 
        </configuration> 
       </execution> 
      </executions> 
     </plugin> 

私のJavaコードは、このコードサンプルは、私が先に述べたgithubのプロジェクトからである

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

    Map<String, String> map = new HashMap<>(); 
    map.put("bootstrap.servers", kafka_server); 
    map.put("zookeeper.connect", "localhost:40862"); 
    map.put("group.id", "test"); 
    map.put("topic", "data"); 

    // parse user parameters 
    ParameterTool parameterTool = ParameterTool.fromMap(map); 

    DataStream<String> messageStream = null; 
    try { 
     messageStream = env.addSource(new org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082<>(
       parameterTool.getRequired("topic"), 
       new SimpleStringSchema(), 
       parameterTool.getProperties())); 
    } catch (Exception e) { 
     LOGGER.error("Error", e); 
    } 

    // print() will write the contents of the stream to the TaskManager's standard out stream 
    // the rebelance call is causing a repartitioning of the data so that all machines 
    // see the messages (for example in cases when "num kafka partitions" < "num flink operators" 
    messageStream.rebalance().map(new MapFunction<String, String>() { 
     private static final long serialVersionUID = -6867736771747690202L; 

     @Override 
     public String map(String value) throws Exception { 
      LOGGER.info("============================" + value); 
      return "Kafka and Flink says: " + value; 
     } 
    }).print(); 

    try { 
     env.execute(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

です。このコードは、Tomcatにデプロイされたwarファイル内で実行されます。

このコードを実行すると、次のエラーが発生します。

 Unrecoverable error java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082 

私は戦争のエキスのクラスについて言及しました。私はこれを解決する方法を理解しようとしています。どんな助けや助言も高く評価されます。

答えて

1

代わりにflink-streaming-java_2.11バージョン1.3.2に依存するように、flink-streaming-core依存関係を変更する必要があります。 (flink-streaming-coreは数年前にflink-streaming-javaとflink-streaming-scalaに名前が変更されました)

また、flink-connector-kafka-0.8_2.11はKafkaバージョン0.8.x用です一方、あなたはKafkaバージョン1.0.0と組み合わせています。私はあなたがkafka_2.11依存関係を削除し、mavenに依存してカフカjarの正しいバージョンを推移的に含めることをお勧めします。

1

NoClassDefFoundErrorは、多くの場合、バージョン/依存関係の問題を暗示しています。実際、依存関係はちょっと混乱しています。

1.3.2(現在のリリース)と0.9.1(かなり古いバージョン)からFlinkの依存関係をインポートしています。 Flink KafkaコネクタはKafka 0.8用ですが、Kafka 1.0.0に依存しています。

関連する問題