2015-10-01 4 views
7
、私は現在、水路を使用してい

私は、メッセージングアプリからのデータを収集していますが、それは永続スパークは、ストリーミング出力

私はカフカを使用したい

は、 はスパークストリーミングを使用してカフカから消費日あたり約50万レコードを送信します と..私は私が試したそれぞれのアプローチで問題を抱えているインパラ

でのHadoopおよびクエリする

アプローチ1それを持続 - 寄木細工のディレクトリに外部のハイブの寄木細工のテーブルを指し、寄木細工のようRDD保存を

// scala 
val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt)) 
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 
lines.foreachRDD(rdd => { 

    // 1 - Create a SchemaRDD object from the rdd and specify the schema 
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema) 

    // 2 - register it as a spark sql table 
    SchemaRDD1.registerTempTable("sparktable") 

    // 3 - qry sparktable to produce another SchemaRDD object of the data needed 'finalParquet'. and persist this as parquet files 
    val finalParquet = sqlContext.sql(sql) 
    finalParquet.saveAsParquetFile(dir) 

問題はfinalParquetです。 saveAsParquetFileは、巨大なノーを出力します。 Kafkaから受け取ったDstreamは、1分間のバッチサイズで200以上のファイルを出力します。 多くのファイルを出力する理由は、計算が別の投稿の後に説明されているように配布されたためです。how to make saveAsTextFile NOT split output into multiple file? 解決策は私にとって最適ではないようです。 1人のユーザーの状態として - 単一の出力ファイルを持つことは、データがほとんどない場合にのみ良い考えです。

アプローチ2 - 利用Hivecontext。これは正常に動作します

# python 
sqlContext = HiveContext(sc) 
ssc = StreamingContext(sc, int(batch_interval)) 
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1}) 
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER) 
lines.foreachRDD(sendRecord) 

def sendRecord(rdd): 

    sql = "INSERT INTO TABLE table select * from beacon_sparktable" 

    # 1 - Apply the schema to the RDD creating a data frame 'beaconDF' 
    beaconDF = sqlContext.jsonRDD(rdd,schema) 

    # 2- Register the DataFrame as a spark sql table. 
    beaconDF.registerTempTable("beacon_sparktable") 

    # 3 - insert to hive directly from a qry on the spark sql table 
    sqlContext.sql(sql); 

ハイブテーブルに直接RDDデータを挿入し、それは寄木細工のテーブルに直接挿入しますが、処理時間は、バッチ間隔時間を超えるとバッチの遅延がスケジュールされています。 消費者は、キューに入れ始める処理するために、生産されていただきましたし、バッチに追いつくカント。

ハイブへの書き込みが遅いようです。 iveは、バッチ間のサイズを調整し、より多くの消費者インスタンスを実行しようとしました。要約すると

複数のファイルや潜在的なレイテンシの問題はハイブに書いてあることを考えるとストリーミングスパークからビッグデータを永続化するための最良の方法は何ですか? 他の人は何をしていますか?

同様の質問がここに尋ねてきたが、それにあまりにも多くのファイル How to make Spark Streaming write its output so that Impala can read it?

に溶液#2で任意のヘルプ

+0

あなたは、出力ストリームのための別のウィンドウを設定することができます。 foreachRDD(rdd => ' – ssedano

+0

これは私にとっては非常に一般的なユースケースのように思えます。誰もそれに答えなかったことに私は驚いています。私はデータベースを使用することを提案すると思います。スパークはそれ自体では置き換えられません。 CassandraまたはHBaseを試してください(HBaseの学習曲線は非常に急峻です)。 – avloss

答えて

0

のための多くのおかげで、作成したファイルの数を並置として、彼は、ディレクトリの問題を持っています各RDDの区画の数を介して制御することができる。

この例を参照してください:

// create a Hive table (assume it's already existing) 
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET") 

// create a RDD with 2 records and only 1 partition 
val rdd = sc.parallelize(List(List(1, "hello"), List(2, "world")), 1) 

// create a DataFrame from the RDD 
val schema = StructType(Seq(
StructField("id", IntegerType, nullable = false), 
StructField("txt", StringType, nullable = false) 
)) 
val df = sqlContext.createDataFrame(rdd.map(Row(_:_*)), schema) 

// this creates a single file, because the RDD has 1 partition 
df.write.mode("append").saveAsTable("test") 

は今、私はあなたがカフカからデータをプルする頻度、および各RDDのパーティションの数(デフォルトでは、あなたのカフカトピックのパーティションで遊ぶことができますね、おそらく再分割によって減らすことができます)。

CDH 5.5.1のSpark 1.5を使用していますが、df.write.mode("append").saveAsTable("test")またはSQL文字列を使用しても同じ結果が得られます。

関連する問題