2016-05-09 8 views
2

モデルをスパークバッチで作成し、リアルタイム処理のためにSparkストリーミングで使用できますか?モデルをSparkバッチで作成し、Sparkストリーミングで使用できますか?

私は、トレーニングと予測の両方が同じタイプの処理(線形回帰)で構築されているApache Sparkサイトのさまざまな例を見てきました。

+0

私が理解しているように、スパークストリーミングは実際のストリーミングではありません。ストリームをバッチに分割することで、訓練したバッチモデルがうまく動作するようになります。リアルタイムデータの_real_ストリーム処理が必要な場合は、Kafka、Flink、またはStormを参照してください。 – erip

+0

@eripサブ秒ストリーミングのように、OPが「リアルタイム」を意味するかどうかはわかりません。 –

+0

@YuvalItzchakovそれ以外の意味は?トレーニングデータのリアルタイムバッチ? – erip

答えて

1

- アプリケーションをストリーミングで

val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,impurity, maxDepth, maxBins) 
    model.save(sc, "target/tmp/myDecisionTreeClassificationModel") 

私が今実装したもう一つのソリューション。

私はspark-Batchでモデルを作成しました。 最終モデルオブジェクト名がregmodelであるとします。

final LinearRegressionModel regmodel =algorithm.run(JavaRDD.toRDD(parsedData)); 

と火花コンテキスト名は

JavaSparkContext sc = new JavaSparkContext(sparkConf); 

今同じコードで私はこのような予測を同じSC

final JavaStreamingContext jssc = new JavaStreamingContext(sc,new Duration(Integer.parseInt(conf.getWindow().trim()))); 

を使用してやってストリーミング火花を作成していますように、SCです:

JavaPairDStream<Double, Double> predictvalue = dist1.mapToPair(new PairFunction<LabeledPoint, Double,Double>() { 
       private static final long serialVersionUID = 1L; 
       @Override 
       public Tuple2<Double, Double> call(LabeledPoint v1) throws Exception { 
        Double p = v1.label(); 
        Double q = regmodel.predict(v1.features()); 
        return new Tuple2<Double, Double>(p,q); 
       } 
      }); 
5

モデルをスパークバッチで作成し、リアルタイム処理のためにスパークストリーミングで使用できますか?

もちろん、はい。スパークコミュニティでは、彼らはオフライントレーニングのオンライン予測と呼んでいます。 sparkの多くのトレーニングアルゴリズムでは、ファイルシステムHDFS/S3にモデルを保存できます。同じモデルをストリーミングアプリケーションで読み込むことができます。予測を行うためにモデルのpredictメソッドを呼び出すだけです。

ストリーミング+ MLLibのセクションthis linkを参照してください。例えば

、あなたはバッチ・アプリケーションでは

... DecisionTreeをオフラインで訓練し、オンライン予測を行いたい場合は - ここで

val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel") 
    sameModel.predict(newData) 
+0

はバッチで、scはsparkcontextに属し、scはストリーミングではsparkバッチまたはsparkstreamingcontextに属します。私はアプリケーションを開発するためにJavaを使用しています。ここでは、保存とロード操作はSparkContext(Scala)にのみ適用されますが、JavaSparkContextでは適用されません。 – Saurabh

+0

@Saurabh StreamingContextは元のSparkContextをラップします。同じことがjavaとscalaの両方に当てはまります。 [this](http://spark.apache.org/docs/latest/mllib-linear-methods.html#linear-least-squares-lasso-and-ridge-regression)の例を見て、 "Java"タブに切り替えます線形回帰のセーブ&ロードの例(各例の最後の2行)。 [JavaStreamingContext](http://spark.apache.org/docs/latest/api/java/org/apache/spark/streaming/api/java/JavaStreamingContext)のこのjavadocを見てください。html#sparkContext())sparkContext()メソッドはJavaSparkContextを返します。 –

関連する問題