2016-05-18 6 views
1

Spark MLlibの新機能です。 StreamingLogisticRegressionWithSGDモデルを実装しようとしています。 Sparkのドキュメントではほとんど情報が提供されていません。私は、ソケットストリームに2,22-22-22を入力すると、私は私がラベル0または1での特徴を入力するように期待していることを理解しますが、私は本当に私がそれ以上の標識のためにそれを設定することができるかどうかを知りたい StreamingLogisticRegressionWithSGDの分類クラス/ラベルを設定しない方法

ERROR DataValidators: Classification labels should be 0 or 1. Found 1 invalid labels 

を取得しています。 StreamingLogisticRegressionWithSGDの分類のクラス数を設定する方法がわかりません。

ありがとうございます!

コード

package test; 

import java.util.List; 

import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.mllib.classification.StreamingLogisticRegressionWithSGD; 
import org.apache.spark.mllib.linalg.Vector; 
import org.apache.spark.mllib.linalg.Vectors; 
import org.apache.spark.mllib.regression.LabeledPoint; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.StreamingContext; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

public class SLRPOC { 

    private static StreamingLogisticRegressionWithSGD slrModel; 

    private static int numFeatures = 3; 

    public static void main(String[] args) { 
     SparkConf sparkConf = new SparkConf().setMaster("local[3]").setAppName("SLRPOC"); 
     SparkContext sc = new SparkContext(sparkConf); 
     StreamingContext ssc = new StreamingContext(sc, Durations.seconds(10)); 
     JavaStreamingContext jssc = new JavaStreamingContext(ssc); 

     slrModel = new StreamingLogisticRegressionWithSGD().setStepSize(0.5).setNumIterations(10).setInitialWeights(Vectors.zeros(numFeatures)); 

     slrModel.trainOn(getDStreamTraining(jssc)); 
     slrModel.predictOn(getDStreamPrediction(jssc)).foreachRDD(new Function<JavaRDD<Double>, Void>() { 

      private static final long serialVersionUID = 5287086933555760190L; 

      @Override 
      public Void call(JavaRDD<Double> v1) throws Exception { 
       List<Double> list = v1.collect(); 
       for (Double d : list) { 
        System.out.println(d); 
       } 
       return null; 
      } 
     }); 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 

    public static JavaDStream<LabeledPoint> getDStreamTraining(JavaStreamingContext context) { 
     JavaReceiverInputDStream<String> lines = context.socketTextStream("localhost", 9998); 

     return lines.map(new Function<String, LabeledPoint>() { 

      private static final long serialVersionUID = 1268686043314386060L; 

      @Override 
      public LabeledPoint call(String data) throws Exception { 
       System.out.println("Inside LabeledPoint call : ----- "); 
       String arr[] = data.split(","); 
       double vc[] = new double[3]; 
       String vcS[] = arr[1].split("-"); 
       int i = 0; 
       for (String vcSi : vcS) { 
        vc[i++] = Double.parseDouble(vcSi); 
       } 
       return new LabeledPoint(Double.parseDouble(arr[0]), Vectors.dense(vc)); 
      } 
     }); 
    } 

    public static JavaDStream<Vector> getDStreamPrediction(JavaStreamingContext context) { 
     JavaReceiverInputDStream<String> lines = context.socketTextStream("localhost", 9999); 

     return lines.map(new Function<String, Vector>() { 

      private static final long serialVersionUID = 1268686043314386060L; 

      @Override 
      public Vector call(String data) throws Exception { 
       System.out.println("Inside Vector call : ----- "); 
       String vcS[] = data.split("-"); 
       double vc[] = new double[3]; 
       int i = 0; 
       for (String vcSi : vcS) { 
        vc[i++] = Double.parseDouble(vcSi); 
       } 
       return Vectors.dense(vc); 
      } 
     }); 
    } 
} 

例外

インサイドLabeledPointコール:----- 16/05/18午前17時51分10秒INFOエグゼキュータ: 完成タスク0.0でステージ4.0(TID4)。結果: ドライバに送信された953バイト16/05/18 17:51:10情報TaskSetManager:完了したタスク0.0 in localhost(1/1)で8ミリ秒でステージ4.0(TID 4)16/05/18 17:51: 10 INFO TaskSchedulerImpl:タスクのすべてが が完了したTaskSet 4.0が削除されました。プールから16/05/18 17:51:10 INFO DAGScheduler:ResultStage 4(trainOnがSLRPOC.java:33)0.009秒で終了16/05/18 17:51:10 INFO DAGScheduler:ジョブ6が終了しました:SLRPOC.java:33でtrainOnが発生しました 0.019578 s 16/05/18 17:51:10 ERROR DataValidators:分類ラベルは0または1でなければなりません。無効なラベル16/05/18 17:51:10 INFO JobScheduler:ジョブストリーミングジョブの開始1463574070000ジョブからのms.1 時刻の設定1463574070000 ms 16/05/18 17:51:10エラーJobScheduler: ジョブストリーミングジョブを実行中にエラーが発生する1463574070000 ms.0 org.apache.spark.SparkException:入力の検証に失敗しました。 94: org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:251)で org.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.apply(StreamingLinearAlgorithm.scalaで ) でorg.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.apply(StreamingLinearAlgorithm.scala:92) org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1での$$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp(ForEachDStream.scala:42) org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1に適用されます。 apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDSt連$$ anonfun $ 1 $$ anonfun $適用$ MCV $ SP $ 1.apply(ForEachDStream.scala:40) org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)で でorg.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply $ mcV $ sp(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply(ForEachDStream .scala:40) at org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply(ForEachDStream.scala:40) at scala.util.Try $ .apply(Try.scala:161)at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)at org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.appl y $ mcV $ sp(JobScheduler。スカラ: org.apache.spark.streaming.scheduler.JobScheduler $で207) :207) org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $には$ 1.apply(JobScheduler.scalaを実行しますJobHandler $$ anonfun $ run $ 1.apply(JobScheduler.scala:207) (scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)) org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler.run JobScheduler.scala:206) でjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:615)は、Javaで で。 lang.Thread.run(Thread.java:745)スレッド "main"の例外 org.apache.spark.SparkException:入力の検証に失敗しました。 94: org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:251)で org.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.apply(StreamingLinearAlgorithm.scalaで ) でorg.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.apply(StreamingLinearAlgorithm.scala:92) org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1での$$ anonfun $ apply $ mcV $ sp $ 1.apply $ mcV $ sp(ForEachDStream.scala:42) org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ apply $ mcV $ sp $ 1に適用されます。 apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDSt連$$ anonfun $ 1 $$ anonfun $適用$ MCV $ SP $ 1.apply(ForEachDStream.scala:40) org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)で でorg.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply $ mcV $ sp(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply(ForEachDStream .scala:40) at org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.apply(ForEachDStream.scala:40) at scala.util.Try $ .apply(Try.scala:161)at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)at org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.appl y $ mcV $ sp(JobScheduler.scala:207) at org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply(JobScheduler.scala:207) at org.apache。 spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.apply(JobScheduler.scala:207) scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)at org.apache.spark.streaming。 scheduler.JobScheduler $ JobHandler.run(JobScheduler.scala:206) でjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutorで。 java:615) java.lang.Thread.run(Thread.java:745)16/05/18 17:51:10情報 StreamingContext:シャットダウンからstop(stopGracefully = false)を呼び出す フック16/05/18 17:51:10 INF:SparkContext:開始ジョブ:foreachRDD: SLRPOC.java:34 16/05/18 17:51:10 INFO DAGScheduler :ジョブ7は終了しました: SLRPOC.java:34のforeachRDDは0.000020秒かかりました16/05/18 17:51:10情報 JobScheduler:ジョブストリーミングジョブ1463574070000終了。1 from job 時刻セット1463574070000 ms 16/05/18 17:51:10情報ReceiverTracker: すべての2つの受信者に送信停止信号を送信16/05/18 17:51:10 INFO ReceiverSupervisorImpl:受信停止信号16/05/18 17:51:10情報 ReceiverSupervisorImpl:メッセージでレシーバーを停止中: ドライバーによって停止:16/05/18 17:51:10 INFO ReceiverSupervisorImpl: receiver onStop 16/05/18 17:51:10 INFO ReceiverSupervisorImpl : 登録解除1 16/05/18 17:51:10情報 受信停止通知16/05/18 17:51:10情報 受信者監視者メッセージ:メッセージ付き受信者の停止: ドライバ停止:16/05/18 17:51:10情報ReceiverSupervisorImpl: と呼ばれるiver onStop 16/05/18 17:51:10 INFO ReceiverSupervisorImpl: 受信機の登録解除0 16/05/18 17:51:10エラーReceiverTracker: ストリーム1の登録解除された受信機:ドライバによる停止16/05/18 17: 51:10 INFO ReceiverSupervisorImpl:停止レシーバー1 16/05/18 17時51分10秒ERRORのReceiverTracker:ストリーム0のための登録を抹消受信機:あなたはすでにこれを考え出した場合

答えて

2

わからないドライバにより停止 が、あなたの2つの分類、0または1のみを許可するバイナリアルゴリズムを使用してください。多分にするには、複数の分類アルゴリズムを使用する必要があります

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel} 
import org.apache.spark.mllib.evaluation.MulticlassMetrics 
new LogisticRegressionWithLBFGS().setNumClasses(10) 
+0

LogisticRegressionWithLBFGSは、オンライン学習をサポートしていないため、ここでは選択肢ではありません。 –

関連する問題