2017-06-28 5 views
0

にデータフレームに分けて、それぞれの行の繰り返しを数える:私は私のデータを洗浄するための次のコードを持っているのScala

val customSchema = StructType(Array(
     StructField("time_stamp_0", StringType, true), 
     StructField("sender_ip_1", StringType, true), 
     StructField("receiver_ip_2", StringType, true))) 

    ///////////////////////////////////////////////////make train dataframe 
    val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/dataset3.txt") 

    val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => { 

     val array1 = array(0).trim.split("IP") 
     val array2 = array1(1).split(">") 
     val array3 = array2(1).split(":") 

     val first = Try(array1(0).trim) getOrElse "" 
     val second = Try(array2(0).trim) getOrElse "" 
     val third = Try(array3(0)) getOrElse "" 

     Row.fromSeq(Seq(first, second, third)) 
    }) 
    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema) 

    val columns1and2 = Window.partitionBy("sender_ip_1", "receiver_ip_2") // <-- matches groupBy 

    ///I add count to the dataframe 
    val Dataframe_addcount = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2).distinct() 
    // Dataframe.show() 
    val final_add_count_attack = Dataframe_addcount.withColumn("attack", when($"count" > 10 , 0).otherwise(1)) 

私のコードのoutpoutには、次のデータフレームである。

+---------------+-----------+-------------+-----+------+ 
| time_stamp_0|sender_ip_1|receiver_ip_2|count|attack| 
+---------------+-----------+-------------+-----+------+ 
|06:10:55.881073| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881095| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881114| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881133| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881152| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881172| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881191| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881210| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881229| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881249| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881268| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881287| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881306| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881325| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881344| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881363| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881386| 10.0.0.3|  10.0.0.1| 88|  0| 
|06:10:55.881401| 10.0.0.3|  10.0.0.2| 85|  0| 
|06:10:55.881424| 10.0.0.3|  10.0.0.2| 85|  0| 
|06:10:55.881440| 10.0.0.3|  10.0.0.2| 85|  0| 
+---------------+-----------+-------------+-----+------+ 

私の問題私はこのコードを操作する必要があるということです:val Dataframe_addcount = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2).distinct()私のコードの中で、各行の総行数を持つ代わりに、その行までのカウントを表示するような方法です。たとえば、データフレーム内にすべてのものが88個あるのではなく、このようなものが必要です。 (この例は最初の20行だけです)

+---------------+-----------+-------------+-----+------+ 
    | time_stamp_0|sender_ip_1|receiver_ip_2|count|attack| 
    +---------------+-----------+-------------+-----+------+ 
    |06:10:55.881073| 10.0.0.3|  10.0.0.1| 1 |  0| 
    |06:10:55.881095| 10.0.0.3|  10.0.0.1| 2 |  0| 
    |06:10:55.881114| 10.0.0.3|  10.0.0.1| 3 |  0| 
    |06:10:55.881133| 10.0.0.3|  10.0.0.1| 4 |  0| 
    |06:10:55.881152| 10.0.0.3|  10.0.0.1| 5 |  0| 
    |06:10:55.881172| 10.0.0.3|  10.0.0.1| 6 |  0| 
    |06:10:55.881191| 10.0.0.3|  10.0.0.1| 7 |  0| 
    |06:10:55.881210| 10.0.0.3|  10.0.0.1| 8 |  0| 
    |06:10:55.881229| 10.0.0.3|  10.0.0.1| 9 |  0| 
    |06:10:55.881249| 10.0.0.3|  10.0.0.1| 10|  0| 
    |06:10:55.881268| 10.0.0.3|  10.0.0.1| 11|  0| 
    |06:10:55.881287| 10.0.0.3|  10.0.0.1| 12|  0| 
    |06:10:55.881306| 10.0.0.3|  10.0.0.1| 13|  0| 
    |06:10:55.881325| 10.0.0.3|  10.0.0.1| 14|  0| 
    |06:10:55.881344| 10.0.0.3|  10.0.0.1| 15|  0| 
    |06:10:55.881363| 10.0.0.3|  10.0.0.1| 16|  0| 
    |06:10:55.881386| 10.0.0.3|  10.0.0.1| 17|  0| 
    |06:10:55.881401| 10.0.0.3|  10.0.0.1| 1 |  0| 
    |06:10:55.881424| 10.0.0.3|  10.0.0.1| 2 |  0| 
    |06:10:55.881440| 10.0.0.3|  10.0.0.1| 3 |  0| 
    +---------------+-----------+-------------+-----+------+ 

私を助けてください。前もって感謝します。

答えて

1

あなたはウィンドウのシリアル番号を与えるだろうwindow機能のためrank機能を使用する必要がありました。しかし、このためにはdataframeを注文する必要があります。

次の変更は、コードの残りの部分

val columns1and2 = Window.partitionBy("sender_ip_1", "receiver_ip_2").orderBy("time_stamp_0") // <-- matches groupBy 

val Dataframe_addcount = Frist_Dataframe.withColumn("count", rank() over columns1and2).distinct() 

要件のために十分でなければなりませんあなたと同じです。答えが役立つことを願っています。

+0

あなたは完璧な人です。いつものように。 :) – Queen

+0

そのすべてについての経験:)受け入れてくれてありがとう。 –

+1

私はすでに回答しました:)チェックしてください –

関連する問題