2016-08-09 5 views
3

カフカアプリケーションでユニットテストを実行する必要があります。ユニットテストのためのkafkaトピックをクリアする

今私の問題は、テストの間にすべてのトピックをクリアしたいのですが、私はどのようにわからないのですか。

これは私の一時的な解決策です。テストごとに生成されたすべてのメッセージをコミットし、すべてのテストコンシューマを同じコンシューマグループに配置します。

override protected def afterEach():Unit={ 
    val cleanerConsumer= newConsumer(Seq.empty) 
    val topics=cleanerConsumer.listTopics() 
    println("pulisco") 
    cleanerConsumer.subscribe(topics.keySet()) 
    cleanerConsumer.poll(100) 
    cleanerConsumer.commitSync() 
    cleanerConsumer.close() 
} 

これは機能しません。理由はわかりません。

たとえば、テスト内に新しいコンシューマを作成すると、messagesには前のテストで生成されたメッセージが含まれます。

val consumerProbe = newConsumer(SMSGatewayTopic) 

val messages = consumerProbe.poll(1000) 

どうすればこの問題を解決できますか?

+0

カフカは、永続的なメッセージストアで、それは消費者が消費を開始するためのオフセットを決定できます。あなたはあなたが落ちた最後のオフセットを覚えておいて、その後にそれを消費し始めるだけです。 –

+0

たとえば、次のように埋め込みブローカーを使用してください。https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java –

+0

なぜこの質問はJavaでタグ付けされていますか?結果として、それは非常にJava固有の検索で現れます。それほど役に立たない。 – user1283068

答えて

1

また、Kafka/Zookeeperのインスタンスをテストソースに埋め込んで、そのような分離されたサービスをより多く制御することもできます。

trait Kafka { self: ZooKeeper => 
    Kafka.start() 
} 

object Kafka { 
    import org.apache.hadoop.fs.FileUtil 
    import kafka.server.KafkaServer 

    @volatile private var started = false 

    lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile 

    lazy val kafkaServer: KafkaServer = { 
    val config = com.typesafe.config.ConfigFactory. 
     load(this.getClass.getClassLoader) 

    val (host, port) = { 
     val (h, p) = config.getString("kafka.servers").span(_ != ':') 
     h -> p.drop(1).toInt 
    } 

    val serverConf = new kafka.server.KafkaConfig({ 
     val props = new java.util.Properties() 
     props.put("port", port.toString) 
     props.put("broker.id", port.toString) 
     props.put("log.dir", logDir.getAbsolutePath) 

     props.put(
     "zookeeper.connect", 
     s"localhost:${config getInt "test.zookeeper.port"}" 
    ) 

     props 
    }) 

    new KafkaServer(serverConf) 
    } 

    def start(): Unit = if (!started) { 
    try { 
     kafkaServer.startup() 
     started = true 
    } catch { 
     case err: Throwable => 
     println(s"fails to start Kafka: ${err.getMessage}") 
     throw err 
    } 
    } 

    def stop(): Unit = try { 
    if (started) kafkaServer.shutdown() 
    } finally { 
    FileUtil.fullyDelete(logDir) 
    } 
} 

trait ZooKeeper { 
    ZooKeeper.start() 
} 

object ZooKeeper { 
    import java.nio.file.Files 
    import java.net.InetSocketAddress 
    import org.apache.hadoop.fs.FileUtil 
    import org.apache.zookeeper.server.ZooKeeperServer 
    import org.apache.zookeeper.server.ServerCnxnFactory 

    @volatile private var started = false 
    lazy val logDir = Files.createTempDirectory("zk-log").toFile 
    lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile 

    lazy val (zkServer, zkFactory) = { 
    val srv = new ZooKeeperServer(
     snapshotDir, logDir, 500 
    ) 

    val config = com.typesafe.config.ConfigFactory. 
     load(this.getClass.getClassLoader) 
    val port = config.getInt("test.zookeeper.port") 

    srv -> ServerCnxnFactory.createFactory(
     new InetSocketAddress("localhost", port), 1024 
    ) 
    } 

    def start(): Unit = if (!zkServer.isRunning) { 
    try { 
     zkFactory.startup(zkServer) 

     started = true 

     while (!zkServer.isRunning) { 
     Thread.sleep(500) 
     } 
    } catch { 
     case err: Throwable => 
     println(s"fails to start ZooKeeper: ${err.getMessage}") 
     throw err 
    } 
    } 

    def stop(): Unit = try { 
    if (started) zkFactory.shutdown() 
    } finally { 
    try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable =>() } 
    FileUtil.fullyDelete(snapshotDir) 
    } 
} 

テストクラスはextends Kafka with ZooKeeperにすることができます。

テストJVMがフォークされていない場合は、SBT testOptions in TestTests.Cleanup設定を使用して、テスト後に組み込みサービスを停止できます。

1

私はあなたのテストの前にすべてのトピックを作成することをお勧めします。たとえば、これはカフカのテストがトピックを作成/削除する方法です:

Kafka repository on GitHub

関連する問題