1

私はsparkで新しく、私はストリーミングプログラムを作りたいと思います。私は自分の行ごとに繰り返しを予測する必要があります。ここに私の生データがあります:私はspacheでオンライン予測のためにストリーミングモードを使ってデータフレームを作ることができません

05:49:56.604899 00:00:00:00:00:02 > 00:00:00:00:00:03, ethertype IPv4 (0x0800), length 10202: 10.0.0.2.54880 > 10.0.0.3.5001: Flags [.], seq 3641977583:3641987719, ack 129899328, win 58, options [nop,nop,TS val 432623 ecr 432619], length 10136 
05:49:56.604908 00:00:00:00:00:03 > 00:00:00:00:00:02, ethertype IPv4 (0x0800), length 66: 10.0.0.3.5001 > 10.0.0.2.54880: Flags [.], ack 10136, win 153, options [nop,nop,TS val 432623 ecr 432623], length 0 
05:49:56.604900 00:00:00:00:00:02 > 00:00:00:00:00:03, ethertype IPv4 (0x0800), length 4410: 10.0.0.2.54880 > 10.0.0.3.5001: Flags [P.], seq 10136:14480, ack 1, win 58, options [nop,nop,TS val 432623 ecr 432619], length 4344 

私は以下のように私の適切な出力を抽出するコードを書いています。

しかし私のコードは、ストリーミングモードではありません。

enter image description here

は、ここに私のコードです(私はcolumn1のとcolumn2の上の繰り返しの数を必要に応じて)。私はストリーミングモードを取得するために別のコードを実行しました。 train.csvファイルがストリーミング形式で生成されているためです。しかし、私はいくつかのエラーがあります。 は、ここに私のストリーミングコードです:

val dataFrame_trainingData = sqlContext.createDataFrame(rowRdd, customSchema) 

は、すべてのボディは、私はこのコードを修正するのを助けることができる:

import org.apache.spark.SparkConf 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD} 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StringType, StructField, StructType} 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

import scala.util.Try 
/** 
    * Created by saeedtkh on 5/24/17. 
    */ 
object Main_ML_with_Streaming { 
    def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("saeed_test").setMaster("local[*]") 
    //val sc = new SparkContext(conf) 
    val ssc = new StreamingContext(conf, Seconds(5)) 


    /////////////////////Start extract the packet 
    val customSchema = StructType(Array(
     StructField("column0", StringType, true), 
     StructField("column1", StringType, true), 
     StructField("column2", StringType, true))) 


    val rdd = ssc.textFileStream("/Users/saeedtkh/Desktop/sharedsaeed/train.csv") 
    val rowRdd =rdd.map(line => line.split(">")).map(array => { 
     val first = Try(array(0).trim.split(" ")(0)) getOrElse "" 
     val second = Try(array(1).trim.split(" ")(6)) getOrElse "" 
     val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse "" 
     Row.fromSeq(Seq(first, second, third)) 
    }) 

    val dataFrame_trainingData = sqlContext.createDataFrame(rowRdd, customSchema) 
    dataFrame_trainingData.groupBy("column1","column2").count().show() 

    /////////////////////end extract the packet 

    val testData = ssc.textFileStream(/Users/saeedtkh/Desktop/sharedsaeed/test.csv).map(LabeledPoint.parse) 
    ////////////////////end trainging and testing 

    val numFeatures = 3 
    val model = new StreamingLinearRegressionWithSGD() 
     .setInitialWeights(Vectors.zeros(numFeatures)) 

    model.trainOn(dataFrame_trainingData) 
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() 

    ssc.start() 
    ssc.awaitTermination() 

    print("Here is the anwser: *****########*********#########*******222") 
    } 
} 

問題は、私は私のコードでこの行でsqlcontextを使用してデータフレームを作成することができない、ということですストリーミング方式で動作し、線形回帰または他のアルゴリズムを使用して各行の繰り返しを予測します。どうもありがとう。

アップデート1: Acoordingがナンバーワンに答えるために、私はforeachのを追加しましたが、エラーがあるまだ存在している: enter image description here

答えて

0

まず、それはssc.textFileStreamDStreamなくRDD返すことに注意することが重要ですので、変数ますrdd,rowRddおよびtestDataという名前は実際にはRDDではなく、RDDの連続したシーケンスでの抽象である。したがって、これらをRDDが必要なcreateDataFrameに渡すことはできません。

あなたはhereが説明するように、DStream.foreachRDDを使用して、それぞれの基礎となるRDDのうち、データフレームを作成することができます。

rowRdd.foreachRDD { rdd => 
    val dataFrame_trainingData = sqlContext.createDataFrame(rdd, customSchema) 
    // ... 
} 

しかし、あなたはStreamingLinearRegressionWithSGDtrainOnpredictOnValuesのための入力としてDStreamsを期待していることに気づくはずです - ので、あなたは、単に渡すことができます元のDStreamをDataFramesに変換せずに

+0

返信いただきありがとうございます。私はforeachを追加しましたが、エラーはまだ存在します。私は質問を更新し、あなたはupdate1のエラーを見ることができます。 –

+0

将来、いくつかのクエリを実行する必要があるため、データフレームを使用しました。しかし、私はあなたが言ったように、最初の値と2番目の値を直前の値で直接使うことができますか? –

+1

'foreachRDD'アプローチ(私が書いたように、' StreamingLinearRegressionWithSGD'がどのように動作するかを考えれば、あなたのケースでは最良のアプローチのようには見えません)を使うなら、 'foreachRDD ' ' - それの外側ではない(すなわち、中括弧'} 'の前)。 –

関連する問題