2016-09-14 15 views
0

updateStateByKey関数を使用して大きなデータを同時にキャッシュする際に問題があります。ここに例があります。Spark Streaming - updateStateByKeyとキャッシュデータ

私はカフカからデータ(姓、年齢)を取得します。私はupdateStateByKeyを使用するので、すべての人の実際の年齢を維持したい。また、すべての人の名前を知りたいので、外部テーブル(姓、名前)と出力を結合します。ハイブから。それは本当に大きなテーブルだと思うので、私はすべてのバッチでそれをロードしたくありません。そして、問題があります。

すべてがうまくいきます。すべてのバッチでテーブルを読み込んだときに、テーブルをキャッシュしようとすると、StreamingContextは起動しません。私もregisterTempTableを使用して、後でSQLとデータを結合しようとしましたが、同じエラーが発生しました。

問題は、updateStateByKeyに必要なチェックポイントが問題だと思われます。 updateStateByKeyを削除してチェックポイントを終了すると、エラーが発生しますが、両方を削除すると動作します。私は取得しています

エラー:pastebin

をここではコードです:チェックポイントが有効になっているときに

import sys 

from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext, HiveContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

# function to keep actual state  
def updateFunc(channel, actualChannel): 
    if (actualChannel is None or not channel is None): 
     try: 
      actualChannel = channel[-1] 
     except Exception: 
      pass 
    if channel is None: 
     channel = actualChannel 
    return actualChannel 

def splitFunc(row): 
    row = row.strip() 
    lname,age = row.split() 
    return (lname,age)  


def createContext(brokers,topics): 
    # some conf 
    conf = SparkConf().setAppName(appName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.dynamicAllocation.enabled","false").\ 
    set("spark.serializer","org.apache.spark.serializer.KryoSerializer").set("spark.sql.shuffle.partitions",'100') 
    # create SparkContext 
    sc = SparkContext(conf=conf) 

    # create HiveContext 
    sqlContext = HiveContext(sc) 

    # create Streaming Context 
    ssc = StreamingContext(sc, 5) 

    # read big_df and cache (not work, Streaming Context not start) 
    big_df = sqlContext.sql('select lastname,name from `default`.`names`') 
    big_df.cache().show(10) 

    # join table 
    def joinTable(time,rdd): 
     if rdd.isEmpty()==False: 
      df = HiveContext.getOrCreate(SparkContext.getOrCreate()).createDataFrame(rdd,['lname','age']) 

      # read big_df (work) 
      #big_df = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`') 

      # join DMS 
      df2 = df.join(big_df,df.lname == big_df.lastname,"left_outer") 

      return df2.map(lambda row:row) 

    # streaming 
    kvs = KafkaUtils.createDirectStream(ssc, [topics], {'metadata.broker.list': brokers})   
    kvs.map(lambda (k,v): splitFunc(v)).updateStateByKey(updateFunc).transform(joinTable).pprint() 

    return ssc 

if __name__ == "__main__": 
    appName="SparkCheckpointUpdateSate" 
    if len(sys.argv) != 3: 
     print("Usage: SparkCheckpointUpdateSate.py <broker_list> <topic>") 
     exit(-1) 

    brokers, topics = sys.argv[1:] 

    # getOrCreate Context 
    checkpoint = 'SparkCheckpoint/checkpoint' 
    ssc = StreamingContext.getOrCreate(checkpoint,lambda: createContext(brokers,topics)) 

    # start streaming 
    ssc.start() 
    ssc.awaitTermination() 

あなたはどのように適切にキャッシュデータに教えてもらえますか?たぶん、わからない回避策があります。

スパークバー。 1.6

答えて

0

これは、lazyインスタンス化されたbig_dfのグローバルインスタンスを使用して動作します。そのようなものはrecoverable_network_wordcount.py で行われます。

def getBigDf(): 
    if ('bigdf' not in globals()): 
     globals()['bigdf'] = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`') 
    return globals()['bigdf'] 

def createContext(brokers,topics): 
    ... 
    def joinTable(time,rdd): 
     ... 
     # read big_df (work) 
     big_df = getBigDF() 

     # join DMS 
     df2 = df.join(big_df,df.lname == big_df.lastname,"left_outer") 

     return df2.map(lambda row:row) 
    ... 

ストリーミングのように、すべてのデータはストリーミング処理の中でキャッシュされていなければなりません。

関連する問題