2016-10-31 7 views
4

メッセージアトリビュートからハッシュキーを生成するメッセージスケジューラがあります。メッセージスケジューラは、キーを使用してKafkaトピックキューに配置します。Python:単体テストのためにカフカの話題を嘲笑う方法は?

これは、重複排除の目的で実行されます。しかし、実際にローカルクラスタを設定して、それが期待通りに実行されていることを確認することなく、この重複排除をどのようにテストすることができるかはわかりません。

カフカのトピックキューを嘲笑するためのツールをオンラインで検索しても役に立たなかったと私はおそらくこれを間違った方法で考えているのではないかと心配しています。

最終的に、Kafkaキューを模擬するために使用されるものは、ローカルクラスタと同じように動作する必要があります。つまり、トピックキューへのKey挿入による重複除外解除を提供する必要があります。

このようなツールはありますか?

答えて

1

kafka特有の機能、またはkakfa特有の機能を実装する必要がある場合は、kakfaを使用してください。

kafkaはその重複排除ロジックに関するテストを行っていますか?もしそうなら、おそらく十分にあなたの組織の知覚不全のリスク軽減するために、次の組み合わせ:あなたのハッシュ・ロジック(同じオブジェクトが実際に同じハッシュを生成しないことを確認してください)

  • カフカ話題の

    • ユニットテスト(カフカのプロジェクトへの内部)重複排除テストカフカとアプリの統合を検証
    • プリフライトスモークテストは

    カフカはテストAROの任意の並べ替えを持っていない場合トピックの重複排除や変更の破綻が懸念される場合は、カフカ固有の機能についての自動チェックを行うことが重要です。これは統合テストを通じて行うことができます。私は最近、ドッカーベースの統合テストパイプラインで多くの成功を収めました。カフカドッカーの画像を作成する初期の作業(すでにコミュニティから入手可能な可能性があります)が完了すると、統合テストパイプラインをセットアップするのは簡単になります。パイプラインは次のようになります。これらのパスいったんベースのユニットテストが実行されている

    • アプリケーション、(ハッシュ・ロジック)
    • を、あなたのCIサーバは
    • 統合テストが実行されているカフカを起動し、その複製を検証することだけ発する書き込みトピックへの単一のメッセージ。

  • は、私は重要なことは確かカフカの統合テストがONLY絶対にカフカ特定の機能に依存しているテストが含まに最小化されているようにすることですね。ドッキング装置を使用しても単位テストよりも1桁〜1ミリ秒ほど遅いですか?考慮すべきもう一つのことは、統合パイプラインを維持するオーバーヘッドが、kakfaがそれが主張するトピック重複排除を提供することを信頼するリスクがあることです。

    1

    モックKafka uder PythonユニットテストSBTテストタスクを以下のようにしました。 Pysparkをインストールする必要があります。、ため

    import random 
    import unittest 
    from itertools import chain 
    
    from pyspark.streaming.kafka import KafkaUtils 
    from pyspark.streaming.tests import PySparkStreamingTestCase 
    
    class KafkaStreamTests(PySparkStreamingTestCase): 
        timeout = 20 # seconds 
        duration = 1 
    
        def setUp(self): 
         super(KafkaStreamTests, self).setUp() 
    
         kafkaTestUtilsClz = self.ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\ 
          .loadClass("org.apache.spark.streaming.kafka.KafkaTestUtils") 
         self._kafkaTestUtils = kafkaTestUtilsClz.newInstance() 
         self._kafkaTestUtils.setup() 
    
        def tearDown(self): 
         if self._kafkaTestUtils is not None: 
          self._kafkaTestUtils.teardown() 
          self._kafkaTestUtils = None 
    
         super(KafkaStreamTests, self).tearDown() 
    
        def _randomTopic(self): 
         return "topic-%d" % random.randint(0, 10000) 
    
        def _validateStreamResult(self, sendData, stream): 
         result = {} 
         for i in chain.from_iterable(self._collect(stream.map(lambda x: x[1]), 
                    sum(sendData.values()))): 
          result[i] = result.get(i, 0) + 1 
    
         self.assertEqual(sendData, result) 
    
        def test_kafka_stream(self): 
         """Test the Python Kafka stream API.""" 
         topic = self._randomTopic() 
         sendData = {"a": 3, "b": 5, "c": 10} 
    
         self._kafkaTestUtils.createTopic(topic) 
         self._kafkaTestUtils.sendMessages(topic, sendData) 
    
         stream = KafkaUtils.createStream(self.ssc, self._kafkaTestUtils.zkAddress(), 
                 "test-streaming-consumer", {topic: 1}, 
                 {"auto.offset.reset": "smallest"}) 
         self._validateStreamResult(sendData, stream) 
    

    詳細例水路:Pythonは(app_test.py)をテストケース

    val testPythonTask = TaskKey[Unit]("testPython", "Run python tests.") 
    
    val command = "python3 -m unittest app_test.py" 
    val workingDirectory = new File("./project/src/main/python") 
    
    testPythonTask := { 
        val s: TaskStreams = streams.value 
        s.log.info("Executing task testPython") 
        Process(command, 
        workingDirectory, 
        // arguments for using org.apache.spark.streaming.kafka.KafkaTestUtils in Python 
        "PYSPARK_SUBMIT_ARGS" -> "--jars %s pyspark-shell" 
         // collect all jar paths from project 
         .format((fullClasspath in Runtime value) 
         .map(_.data.getCanonicalPath) 
         .filter(_.contains(".jar")) 
         .mkString(",")), 
        "PYSPARK_PYTHON" -> "python3") ! s.log 
    } 
    
    //attach custom test task to default test tasks 
    test in Test := { 
        testPythonTask.value 
        (test in Test).value 
    } 
    
    testOnly in Test := { 
        testPythonTask.value 
        (testOnly in Test).value 
    } 
    

    で:

    build.sbt でテストを実行しなければならないタスクを定義Kinesisと他のpyspark.streaming.testsモジュール。

    関連する問題