updateStateByKey関数を使用して大きなデータを同時にキャッシュする際に問題があります。ここに例があります。Spark Streaming - updateStateByKeyとキャッシュデータ
私はカフカからデータ(姓、年齢)を取得します。私はupdateStateByKeyを使用するので、すべての人の実際の年齢を維持したい。また、すべての人の名前を知りたいので、外部テーブル(姓、名前)と出力を結合します。ハイブから。それは本当に大きなテーブルだと思うので、私はすべてのバッチでそれをロードしたくありません。そして、問題があります。
すべてがうまくいきます。すべてのバッチでテーブルを読み込んだときに、テーブルをキャッシュしようとすると、StreamingContextは起動しません。私もregisterTempTableを使用して、後でSQLとデータを結合しようとしましたが、同じエラーが発生しました。
問題は、updateStateByKeyに必要なチェックポイントが問題だと思われます。 updateStateByKeyを削除してチェックポイントを終了すると、エラーが発生しますが、両方を削除すると動作します。私は取得しています
エラー:pastebin
をここではコードです:チェックポイントが有効になっているときに
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# function to keep actual state
def updateFunc(channel, actualChannel):
if (actualChannel is None or not channel is None):
try:
actualChannel = channel[-1]
except Exception:
pass
if channel is None:
channel = actualChannel
return actualChannel
def splitFunc(row):
row = row.strip()
lname,age = row.split()
return (lname,age)
def createContext(brokers,topics):
# some conf
conf = SparkConf().setAppName(appName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.dynamicAllocation.enabled","false").\
set("spark.serializer","org.apache.spark.serializer.KryoSerializer").set("spark.sql.shuffle.partitions",'100')
# create SparkContext
sc = SparkContext(conf=conf)
# create HiveContext
sqlContext = HiveContext(sc)
# create Streaming Context
ssc = StreamingContext(sc, 5)
# read big_df and cache (not work, Streaming Context not start)
big_df = sqlContext.sql('select lastname,name from `default`.`names`')
big_df.cache().show(10)
# join table
def joinTable(time,rdd):
if rdd.isEmpty()==False:
df = HiveContext.getOrCreate(SparkContext.getOrCreate()).createDataFrame(rdd,['lname','age'])
# read big_df (work)
#big_df = HiveContext.getOrCreate(SparkContext.getOrCreate()).sql('select lastname,name from `default`.`names`')
# join DMS
df2 = df.join(big_df,df.lname == big_df.lastname,"left_outer")
return df2.map(lambda row:row)
# streaming
kvs = KafkaUtils.createDirectStream(ssc, [topics], {'metadata.broker.list': brokers})
kvs.map(lambda (k,v): splitFunc(v)).updateStateByKey(updateFunc).transform(joinTable).pprint()
return ssc
if __name__ == "__main__":
appName="SparkCheckpointUpdateSate"
if len(sys.argv) != 3:
print("Usage: SparkCheckpointUpdateSate.py <broker_list> <topic>")
exit(-1)
brokers, topics = sys.argv[1:]
# getOrCreate Context
checkpoint = 'SparkCheckpoint/checkpoint'
ssc = StreamingContext.getOrCreate(checkpoint,lambda: createContext(brokers,topics))
# start streaming
ssc.start()
ssc.awaitTermination()
あなたはどのように適切にキャッシュデータに教えてもらえますか?たぶん、わからない回避策があります。
スパークバー。 1.6