0

私はストリーミングアプリケーション構造スパークを書いていますでPysparkで許可されていません。スパークストリーミング:カフカのグループIDは、<strong>カフカ</strong>からデータを読み出すためにスパーク構造化されたストリーミング

しかし、Sparkの現在のバージョンは2.1.0です。これは、パラメータとしてグループIDを設定することができず、各クエリに対して一意のIDを生成します。しかし、カフカ接続は、事前に設定されたグループIDを必要とするグループベースの承認です。

したがって、チームが希望しないので、にSparkを更新する必要なく、接続を確立するための回避策がありますか?

マイコード:

if __name__ == "__main__": 
    spark = SparkSession.builder.appName("DNS").getOrCreate() 
    sc = spark.sparkContext 
    sc.setLogLevel("WARN") 

    # Subscribe to 1 topic 
    lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load() 
    print(lines.isStreaming) #print TRUE 
    lines.selectExpr("CAST(value AS STRING)") 
    # Split the lines into words 
    words = lines.select(
    explode(
     split(lines.value, " ") 
     ).alias("word") 
    ) 
    # Generate running word count 
    wordCounts = words.groupBy("word").count() 

    # Start running the query that prints the running counts to the console 
    query = wordCounts \ 
     .writeStream \ 
     .outputMode("complete") \ 
     .format("console") \ 
     .start() 

    query.awaitTermination() 
+0

Spark 2.2でも 'group.id'を設定することはできません - http://spark.apache.org/docs/latest/structured-streaming-kafka -integration.html#kafka-specific-configurations – himanshuIIITian

+0

この[Databricks doc](https://docs.databricks.com/spark/latest/structured-streaming/kafka.html)_Since Spark 2.2によれば、オプションでグループID。ただし予期しない動作が発生する可能性があるため、十分注意して使用してください._ – ELI

+0

奇妙な! Spark 2.2のドキュメントによれば、私たちはできません。 2つの文書の間に不一致があるかもしれません。 – himanshuIIITian

答えて

0

KafkaUtilsクラス"group.id"のパラメータ値を上書きします。元のグループidの"spark-executor-"のconcatになります。以下は

はこれをやっているKafkaUtilsからのコードです:

// driver and executor should be in different consumer groups 
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) 
    if (null == originalGroupId) { 
     logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it") 
    } 
    val groupId = "spark-executor-" + originalGroupId 
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}") 
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) 

私たちは、同じ問題に直面しました。 KafkaはプリセットグループIDを持つACLに基づいていたので、唯一のことはカフカ設定のグループIDを変更することでした。私たちが当てていた当初のグループIDの脇の部分"spark-executor-" + originalGroupId

+0

私はストリーミングコンテキストを作成せずにkafkaからストリーミングデータを直接読み込むSpark Structured Streaming(上記のコード)を使用しています – ELI

関連する問題