2017-02-19 6 views
1

私はテキストを含むファイルを持っています。私がしたいのは、テキストをトークン化し、ストップワードを削除し、2グラムを生成するためのパイプラインを使用することです。パイプラインを合わせてデータを処理する

ステップ1:ファイルを読む

val data = sparkSession.read.text("data.txt").toDF("text") 

ステップ2:私はこれまで何をやったか

私はそれを知っている

val pipe1 = new Tokenizer().setInputCol("text").setOutputCol("words") 
val pipe2 = new StopWordsRemover().setInputCol("words").setOutputCol("filtered") 
val pipe3 = new NGram().setN(2).setInputCol("filtered").setOutputCol("ngrams") 

val pipeline = new Pipeline().setStages(Array(pipe1, pipe2, pipe3)) 
val model = pipeline.fit(data) 

パイプラインを構築しpipeline.fit(data)PipelineModelが生成されますが、PipelineModelの使用方法はわかりません。

ご協力いただければ幸いです。

答えて

2

あなたがval model = pipeline.fit(data)コードを実行すると、すべてのEstimatorの段階(すなわち:などの分類、回帰、クラスタリング、などの機械学習タスク)がデータに適合しているとTransformerステージが作成されます。このパイプラインでフィーチャを作成しているので、Transformerステージしかありません。

Transformerステージでモデルを実行するには、val results = model.transform(data)を実行する必要があります。これにより、データフレームに対して各Transformerステージが実行されます。したがって、model.transform(data)プロセスの最後に、元の行、Tokenizer出力、StopWordsRemover出力、最後にNGram結果で構成されるデータフレームが作成されます。機能の作成が完了した後、トップ5 ngramsを発見

はSparkSQLクエリを介して実行することができます。最初にngram列を展開し、次にgroupby ngramをカウントし、カウントされた列で降順に並べ替えてから、show(5)を実行します。代わりにshow(5)の代わりに"LIMIT 5メソッドを使用することもできます。余談として

、あなたはおそらく、標準のクラス名ではありません何かに自分のオブジェクト名を変更する必要があります。それ以外の場合は、曖昧なスコープエラーが発生します。

CODE:

import org.apache.spark.sql.SparkSession 
import org.apache.spark.ml.feature.Tokenizer 
import org.apache.spark.sql.SparkSession._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.ml.feature.NGram 
import org.apache.spark.ml.feature.StopWordsRemover 
import org.apache.spark.ml.{Pipeline, PipelineModel} 

object NGramPipeline { 
    def main() { 
     val sparkSession = SparkSession.builder.appName("NGram Pipeline").getOrCreate() 

     val sc = sparkSession.sparkContext 

     val data = sparkSession.read.text("quangle.txt").toDF("text") 

     val pipe1 = new Tokenizer().setInputCol("text").setOutputCol("words") 
     val pipe2 = new StopWordsRemover().setInputCol("words").setOutputCol("filtered") 
     val pipe3 = new NGram().setN(2).setInputCol("filtered").setOutputCol("ngrams") 

     val pipeline = new Pipeline().setStages(Array(pipe1, pipe2, pipe3)) 
     val model = pipeline.fit(data) 

     val results = model.transform(data) 

     val explodedNGrams = results.withColumn("explNGrams", explode($"ngrams")) 
     explodedNGrams.groupBy("explNGrams").agg(count("*") as "ngramCount").orderBy(desc("ngramCount")).show(10,false) 

    } 
} 
NGramPipeline.main() 



OUTPUT:線があることが原因となっている構文(コンマ、ダッシュ、など)があること

+-----------------+----------+ 
|explNGrams  |ngramCount| 
+-----------------+----------+ 
|quangle wangle |9   | 
|wangle quee.  |4   | 
|'mr. quangle  |3   | 
|said, --   |2   | 
|wangle said  |2   | 
|crumpetty tree |2   | 
|crumpetty tree, |2   | 
|quangle wangle, |2   | 
|crumpetty tree,--|2   | 
|blue babboon, |2   | 
+-----------------+----------+ 
only showing top 10 rows 

お知らせ重複したnグラムを実行するときは、シンタックスをフィルタリングすることをお勧めします。通常、これは正規表現で行うことができます。

+0

「トランスフォーマー」とは何ですか?具体的な内容については、パイプラインドキュメントの「どのように機能するのか」を参照してください。お返事のための https://spark.apache.org/docs/latest/ml-pipeline.html#how-it-works – JamCon

+0

感謝。最も頻繁に使用される5グラムを得るために何をすべきか説明してください。追加したコードでは、最初の5つの2グラムが表示されます(5つの最も頻繁なものではありません)。 –

+0

これは、ngram機能が作成された後のSQLクエリです。この例に追加されています。 – JamCon

関連する問題