私は、メッセージングアプリからのデータを収集していますが、それは永続スパークは、ストリーミング出力
私はカフカを使用したいは、 はスパークストリーミングを使用してカフカから消費日あたり約50万レコードを送信します と..私は私が試したそれぞれのアプローチで問題を抱えているインパラ
でのHadoopおよびクエリする
アプローチ1それを持続 - 寄木細工のディレクトリに外部のハイブの寄木細工のテーブルを指し、寄木細工のようRDD保存を
// scala
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {
// 1 - Create a SchemaRDD object from the rdd and specify the schema
val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)
// 2 - register it as a spark sql table
SchemaRDD1.registerTempTable("sparktable")
// 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files
val finalParquet = sqlContext.sql(sql)
finalParquet.saveAsParquetFile(dir)
問題はfinalParquetです。 saveAsParquetFileは、巨大なノーを出力します。 Kafkaから受け取ったDstreamは、1分間のバッチサイズで200以上のファイルを出力します。 多くのファイルを出力する理由は、計算が別の投稿の後に説明されているように配布されたためです。how to make saveAsTextFile NOT split output into multiple file? 解決策は私にとって最適ではないようです。 1人のユーザーの状態として - 単一の出力ファイルを持つことは、データがほとんどない場合にのみ良い考えです。
アプローチ2 - 利用Hivecontext。これは正常に動作します
# python
sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)
lines.foreachRDD(sendRecord)
def sendRecord(rdd):
sql = "INSERT INTO TABLE table select * from beacon_sparktable"
# 1 - Apply the schema to the RDD creating a data frame 'beaconDF'
beaconDF = sqlContext.jsonRDD(rdd,schema)
# 2- Register the DataFrame as a spark sql table.
beaconDF.registerTempTable("beacon_sparktable")
# 3 - insert to hive directly from a qry on the spark sql table
sqlContext.sql(sql);
ハイブテーブルに直接RDDデータを挿入し、それは寄木細工のテーブルに直接挿入しますが、処理時間は、バッチ間隔時間を超えるとバッチの遅延がスケジュールされています。 消費者は、キューに入れ始める処理するために、生産されていただきましたし、バッチに追いつくカント。
ハイブへの書き込みが遅いようです。 iveは、バッチ間のサイズを調整し、より多くの消費者インスタンスを実行しようとしました。要約すると
複数のファイルや潜在的なレイテンシの問題はハイブに書いてあることを考えるとストリーミングスパークからビッグデータを永続化するための最良の方法は何ですか? 他の人は何をしていますか?
同様の質問がここに尋ねてきたが、それにあまりにも多くのファイル How to make Spark Streaming write its output so that Impala can read it?
に溶液#2で任意のヘルプ
あなたは、出力ストリームのための別のウィンドウを設定することができます。 foreachRDD(rdd => ' – ssedano
これは私にとっては非常に一般的なユースケースのように思えます。誰もそれに答えなかったことに私は驚いています。私はデータベースを使用することを提案すると思います。スパークはそれ自体では置き換えられません。 CassandraまたはHBaseを試してください(HBaseの学習曲線は非常に急峻です)。 – avloss