は次のとおりです。シェアsparkRDD使用して、私が実装何のapacheのIgnite
val sparkConf = new SparkConf().setAppName(“SharedRDD”).setMaster(“local”)
val sc = SparkContext.getOrCreate(sparkConf)
val sparkRDD = sc.wholeTextFiles("sample.csv", 10)
これRDDは、今、今後
val igniteContext = new IgniteContext(sc, “example-shared-rdd.xml”, false)
val sharedIgniteRDD = igniteContext.fromCache[String, String](“cachedIgniteRDD”)
if (sharedIgniteRDD.isEmpty())
sharedIgniteRDD.savePairs(sparkRDD)
IgniteContext
によってキャッシュされている任意のスパーク仕事をこのRDDにアクセスする必要がある場合、それが作成する必要はありません代わりに新しいものが点火キャッシュからそれを取り出す。入力ファイルのval RDDfromCache = igniteContext.fromCache[String, String](“CachedIgniteRDD”)
サンプルデータ
25/07/13,11599,CLOSED
25/07/13,256,PENDING_PAYMENT
25/07/13,12111,COMPLETE
25/07/13,8827,CLOSED
25/07/13,11318,COMPLETE
25/07/13,7130,COMPLETE
25/07/13,4530,COMPLETE
25/07/13,2911,PROCESSING
25/07/13,5657,PENDING_PAYMENT
25/07/13,5648,PENDING_PAYMENT
私はその入力ファイルのステータス列にグループ分けを行うにgroupByKey()APIを呼び出す必要があります。
あなたのご協力をお待ちしております。 cache
またはpersist
の
おかげ
今まであなたの質問に答えてください –
igniteキャッシュに保存する前にまたはignite cacheに保存した後にgroupByKeyを探していますか? –
私はigniteキャッシュに保存した後。 IgniteキャッシュであるRDDfromcacheにgroupByKeyを適用する必要があります。 – Vijay