私はストリーミングアプリケーション構造スパークを書いていますでPysparkで許可されていません。スパークストリーミング:カフカのグループIDは、<strong>カフカ</strong>からデータを読み出すためにスパーク構造化されたストリーミング
しかし、Sparkの現在のバージョンは2.1.0です。これは、パラメータとしてグループIDを設定することができず、各クエリに対して一意のIDを生成します。しかし、カフカ接続は、事前に設定されたグループIDを必要とするグループベースの承認です。
したがって、チームが希望しないので、にSparkを更新する必要なく、接続を確立するための回避策がありますか?
マイコード:
if __name__ == "__main__":
spark = SparkSession.builder.appName("DNS").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Subscribe to 1 topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
print(lines.isStreaming) #print TRUE
lines.selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
Spark 2.2でも 'group.id'を設定することはできません - http://spark.apache.org/docs/latest/structured-streaming-kafka -integration.html#kafka-specific-configurations – himanshuIIITian
この[Databricks doc](https://docs.databricks.com/spark/latest/structured-streaming/kafka.html)_Since Spark 2.2によれば、オプションでグループID。ただし予期しない動作が発生する可能性があるため、十分注意して使用してください._ – ELI
奇妙な! Spark 2.2のドキュメントによれば、私たちはできません。 2つの文書の間に不一致があるかもしれません。 – himanshuIIITian