2016-11-17 6 views
1

DStreamとしての単語リストがあります。例:リスト(車、速度、事故、スピード、悪い)。私はこのリストから双曲線を形成したい。私はRDDでこれを持っていますが、DStreamの問題に直面しています。私はforeachRDD関数を使用しています。以下は私が持っているものです -DStreamでのリスト処理

変換後にRDDの内容を印刷しようとしています。

def printRDD(rddString: RDD[String]) ={ 
     val z = rddString.map(y => y.toString.split(",").filter(_.nonEmpty). 
     map(y => y.replaceAll("""\W""", "").toLowerCase) 
     .filter(_.nonEmpty) 
     .sliding(2).filter(_.size == 2).map{ case Array(a, b) => ((a, b), 1) }) 
     .flatMap(x => x) 
     println(z) 
} 
val x = lines.map(plainTextToLemmas(_, stopWords)) 
val words = x.flatMap(y=> y.toString.split(",")) 
words.foreachRDD(rdd => printRDD(rdd)) 

変換関数printRDDの後に内容を表示する方法はありますか。印刷定義内でprintln(z)を使用しても、flatMapでMapPartitionsRDD [18]が返されます。私は入力を読み込むためにKafkaのスパークストリーミングを使用していますが、私はコンソールで単語の値を取得します。私は、printRDD関数を呼び出した後に、その言葉が変更されないと思います。

+0

ストリーム処理後のバイグラムにはどうなりますか?その機能はコンソール印刷だけです。 – maasg

答えて

1

あなたはこれらすべてがDStreamではなく、foreachRDD内部で動作して行い、その後、DStreamprintを呼び出すことができます。

lines 
    .map(plainTextToLemmas(_, stopWords)) 
    .flatMap(y => y.toString.split(",")) 
    .map(y => y.toString.split(",").filter(_.nonEmpty)) 
    .map(y => y.replaceAll("""\W""", "").toLowerCase) 
    .filter(_.nonEmpty) 
    .sliding(2) 
    .filter(_.size == 2) 
    .flatMap { case Array(a, b) => ((a, b), 1) } 
    .print() 

これは、ドライバーのコンソールにDStreamの内容をプリントアウトする必要があります。

注意すべき重要なことは、あなたがDStream上で動作しているものの、それは与えられたバッチ時間で基礎となるRDD「にドリル」方法だとRDD内部の実際の型を公開するので、あなたがする必要はありませんということですforeachRDDを使用して内部の実際のデータに到達してください。

関連する問題