2017-06-09 5 views
0

私はspring-kafkaに関する多くのチュートリアルをspring-bootで見つけて、それらをうまく実装しました。すべてのチュートリアルでスプリングブートを使用するのはなぜですか?春カフカに何らかの依存関係はありますか、それとも最善の解決策ですか?kafkaを既存のスプリングプロジェクトに統合

私はすでに春のプロジェクトを持っており、その中にspring-kafka mavenモジュールを追加しようとしています。私のメインプロジェクトのpom.xmlは以下のようになります。

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"> 

<modelVersion>4.0.0</modelVersion> 

<groupId>com</groupId> 

<artifactId>Catalog</artifactId> 

<version>0.0.1-SNAPSHOT</version> 

<packaging>pom</packaging> 

<name>Catalog</name> 

<url>http://maven.apache.org</url> 

<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
</properties> 

<dependencies> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>3.8.1</version> 
     <scope>test</scope> 
    </dependency> 
</dependencies> 

<modules> 
    <module>service</module> 
    <module>db</module> 
    <module>web</module> 
    <module>Test2</module> 
    <module>entity</module> 
    <module>history</module> 
</modules> 

</project> 

私のモジュールののpom.xmlはプロジェクトが実行されませんこれらの依存関係を持つ

<?xml version="1.0"?> 
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd" 
xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> 

<modelVersion>4.0.0</modelVersion> 

<parent> 
    <groupId>com</groupId> 
    <artifactId>Catalog</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
</parent> 

<groupId>com</groupId> 

<artifactId>history</artifactId> 

<version>0.0.1-SNAPSHOT</version> 

<name>history</name> 

<url>http://maven.apache.org</url> 

<properties> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
</properties> 

<dependencies> 
    <!-- https://mvnrepository.com/artifact/org.json/json --> 
    <dependency> 
     <groupId>org.json</groupId> 
     <artifactId>json</artifactId> 
     <version>20170516</version> 
    </dependency> 

    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.12</version> 
     <scope>test</scope> 
    </dependency> 

    <!-- 
    https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter --> 
    <dependency> 
     <groupId>org.springframework.boot</groupId> 
     <artifactId>spring-boot-starter</artifactId> 
     <version>1.5.3.RELEASE</version> 
    </dependency> 


    <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> 
    <dependency> 
     <groupId>org.springframework.kafka</groupId> 
     <artifactId>spring-kafka</artifactId> 
     <version>1.2.2.RELEASE</version> 
    </dependency> 

    </dependencies> 

</project> 

以下のように見えます。私は交換する必要があります

<parent> 
    <groupId>com</groupId> 
    <artifactId>Catalog</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
</parent> 

チュートリアルのコードは、以下のとおりです。

<parent> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-parent</artifactId> 
    <version>1.5.3.RELEASE</version> 
    <relativePath /> <!-- lookup parent from repository --> 
</parent> 

spring-bootを使用せずにspring-kafkaを実行するにはどうすればよいですか?

App.java

package org.history; 

public class App { 

    public static void main(String[] args) { 
     KafkaTest test = new KafkaTest(); 
     test.start(); 
    } 
} 

KafkaTest.java

package org.history; 

import java.util.HashMap; 
import java.util.Map; 

import org.apache.kafka.clients.consumer.ConsumerConfig; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.common.serialization.IntegerDeserializer; 
import org.apache.kafka.common.serialization.IntegerSerializer; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import org.apache.kafka.common.serialization.StringSerializer; 
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; 
import org.springframework.kafka.core.DefaultKafkaProducerFactory; 
import org.springframework.kafka.core.KafkaTemplate; 
import org.springframework.kafka.core.ProducerFactory; 
import 
org.springframework.kafka.listener.KafkaMessageListenerContainer; 
import org.springframework.kafka.listener.MessageListener; 
import org.springframework.kafka.listener.config.ContainerProperties; 

public class KafkaTest { 

public void start() { 
    System.out.println("starting"); 
    ContainerProperties containerProps = new ContainerProperties("cscart_product_topic"); 

    containerProps.setMessageListener(new MessageListener<Integer, String>() { 

     @Override 
     public void onMessage(ConsumerRecord<Integer, String> message) { 
      System.out.println("received: " + message); 
     } 
    }); 

    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps); 
    container.setBeanName("testAuto"); 
    container.start(); 

    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
    // wait a bit for the container to start 

    KafkaTemplate<Integer, String> template = createTemplate(); 
    template.setDefaultTopic("cscart_product_topic"); 
    template.sendDefault(0, "foo"); 
    template.sendDefault(2, "bar"); 
    template.sendDefault(0, "baz"); 
    template.sendDefault(2, "qux"); 
    template.flush(); 
    container.stop(); 
} 

private KafkaMessageListenerContainer<Integer, String> createContainer(
     ContainerProperties containerProps) { 
    Map<String, Object> props = consumerProps(); 
    DefaultKafkaConsumerFactory<Integer, String> cf = 
        new DefaultKafkaConsumerFactory<Integer, String>(props); 
    KafkaMessageListenerContainer<Integer, String> container = 
        new KafkaMessageListenerContainer<>(cf, containerProps); 
    return container; 
} 

private KafkaTemplate<Integer, String> createTemplate() { 
    Map<String, Object> senderProps = senderProps(); 
    ProducerFactory<Integer, String> pf = 
     new DefaultKafkaProducerFactory<Integer, String>(senderProps); 
    KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf); 
    return template; 
} 

private Map<String, Object> consumerProps() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cscart_product"); 
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100"); 
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); 
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
    return props; 
} 

private Map<String, Object> senderProps() { 
    Map<String, Object> props = new HashMap<>(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    props.put(ProducerConfig.RETRIES_CONFIG, 0); 
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); 
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1); 
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); 
    return props; 
} 

} 

公式ドキュメントで説明したようにこれを追加します。

<dependency> 
     <groupId>org.springframework.kafka</groupId> 
     <artifactId>spring-kafka</artifactId> 
     <version>1.2.1.RELEASE</version> 
    </dependency> 

その後、私は最初の行にKafkaTest.javaにエラー

package org.history; 

エラーを取得するには、タイプorg.springframework.messaging.Messageを解決できない」と言う。それは間接的に必要とされるから参照します.class files "

+0

Uで見てください。柔軟性とプロセス全体のコントロールを提供します。その部門には春のkafkaがないと言っているわけではありません。 –

答えて

0

これをpom.xmlに追加するだけで、spring-kafkaをspring bootなしで使用することができます。

<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId> 
    <version>1.2.1.RELEASE</version> 
</dependency> 

はいつもバニラapacheのカフカdependecyを使用して、そこから実装することができます official site

+0

"org.springframework.messaging.Messageの型を解決できません。必須の.classファイルから間接参照されています"という追加の後、パッケージ名にこのエラーが表示されます。 – Sid

+0

'spring-messaging' jarを推移的依存として取得する必要があります。 –

関連する問題