2017-08-22 4 views
1

私はBigData ecoシステムを初めて使い始めています。Kafkaから読んで寄木張りでhdfsに書き込む

私はスパークストリーミングを使用してカフカのトピックを読むことに関するいくつかの記事を読んでいますが、ストリーミングではなくスパークジョブを使用してカフカから読むことができるかどうかを知りたいですか? 「はい」の場合は、私を始められるいくつかの記事やコードスニペットを指摘してくれますか?

質問の2番目の部分は、寄木張りの形式でhdfsに書き込んでいます。 カフカから読んだら、私はrddがあると思う。 このrddをデータフレームに変換し、データフレームを下地ファイルとして書き込みます。 これは正しいアプローチですか。

助けてください。カフカからデータを読み込み、HDFSに書き込むために

おかげ

答えて

1

は、寄せ木形式で、代わりにストリーミングのスパークバッチジョブを使用して、あなたはSpark Structured Streamingを使用することができます。

ストラクチャードストリーミングは、Spark SQLエンジン上に構築されたスケーラブルでフォルトトレランスのストリーム処理エンジンです。静的データのバッチ計算を表現するのと同じ方法でストリーミング計算を表現できます。スパークSQLエンジンは、それを徐々に継続的に実行し、ストリーミングデータが到着すると最終結果を更新します。 Scala、Java、Python、RのDataset/DataFrame APIを使用して、ストリーミング集約、イベント時間ウィンドウ、ストリームとバッチ結合などを表現できます。計算は、最適化された同じSpark SQLエンジンで実行されます。最後に、システムは、チェックポイントおよびWrite Ahead Logsを使用して、正確に1回のフォールトトレランス機能をエンドツーエンドで保証します。簡単に言えば、ストラクチャードストリーミングは、ユーザーがストリーミングについて推論する必要なしに、速く、スケーラブルで、フォールトトレラントで、エンドツーエンドの正確に1回のストリーム処理を提供します。

カフカにはソースが組み込まれています。つまり、カフカのデータをポーリングできます。 Kafkaブローカーバージョン0.10.0以上と互換性があります。

カフカのデータをバッチモードでプルするには、定義された範囲のオフセットに対してデータセット/データフレームを作成します。以下のコードを記述することができ、今

| Column   | Type   | 
|:-----------------|--------------:| 
| key    |  binary | 
| value   |  binary | 
| topic   |  string | 
| partition  |   int | 
| offset   |   long | 
| timestamp  |   long | 
| timestampType |   int | 

、寄木細工の形式でHDFSにデータを書き込む:

df.write.parquet("hdfs://data.parquet") 

詳細について

// Subscribe to 1 topic defaults to the earliest and latest offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

// Subscribe to multiple topics, specifying explicit Kafka offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1,topic2") 
    .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") 
    .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

// Subscribe to a pattern, at the earliest and latest offsets 
val df = spark 
    .read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribePattern", "topic.*") 
    .option("startingOffsets", "earliest") 
    .option("endingOffsets", "latest") 
    .load() 
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
    .as[(String, String)] 

ソースの各行は、次のスキーマを持ちますSpark Structured Streaming + Kafkaに関する情報は、以下のガイドを参照してください。Kafka Integration Guide

私はそれが助けて欲しい!

+0

この回答は役に立ちましたか? – himanshuIIITian

+1

おかげさまでヒマンシュウさん、ありがとうございました。これはSpark 2.2が必要なようですが、2.0のようなsparkの下位バージョンでこれを行う方法はありますか? – Henosis

関連する問題