は、寄せ木形式で、代わりにストリーミングのスパークバッチジョブを使用して、あなたはSpark Structured Streamingを使用することができます。
ストラクチャードストリーミングは、Spark SQLエンジン上に構築されたスケーラブルでフォルトトレランスのストリーム処理エンジンです。静的データのバッチ計算を表現するのと同じ方法でストリーミング計算を表現できます。スパークSQLエンジンは、それを徐々に継続的に実行し、ストリーミングデータが到着すると最終結果を更新します。 Scala、Java、Python、RのDataset/DataFrame APIを使用して、ストリーミング集約、イベント時間ウィンドウ、ストリームとバッチ結合などを表現できます。計算は、最適化された同じSpark SQLエンジンで実行されます。最後に、システムは、チェックポイントおよびWrite Ahead Logsを使用して、正確に1回のフォールトトレランス機能をエンドツーエンドで保証します。簡単に言えば、ストラクチャードストリーミングは、ユーザーがストリーミングについて推論する必要なしに、速く、スケーラブルで、フォールトトレラントで、エンドツーエンドの正確に1回のストリーム処理を提供します。
カフカにはソースが組み込まれています。つまり、カフカのデータをポーリングできます。 Kafkaブローカーバージョン0.10.0以上と互換性があります。
カフカのデータをバッチモードでプルするには、定義された範囲のオフセットに対してデータセット/データフレームを作成します。以下のコードを記述することができ、今
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
、寄木細工の形式でHDFSにデータを書き込む:
df.write.parquet("hdfs://data.parquet")
詳細について
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
ソースの各行は、次のスキーマを持ちますSpark Structured Streaming + Kafkaに関する情報は、以下のガイドを参照してください。Kafka Integration Guide
私はそれが助けて欲しい!
この回答は役に立ちましたか? – himanshuIIITian
おかげさまでヒマンシュウさん、ありがとうございました。これはSpark 2.2が必要なようですが、2.0のようなsparkの下位バージョンでこれを行う方法はありますか? – Henosis