にデータフレームに分けて、それぞれの行の繰り返しを数える:私は私のデータを洗浄するための次のコードを持っているのScala
val customSchema = StructType(Array(
StructField("time_stamp_0", StringType, true),
StructField("sender_ip_1", StringType, true),
StructField("receiver_ip_2", StringType, true)))
///////////////////////////////////////////////////make train dataframe
val Dstream_Train = sc.textFile("/Users/saeedtkh/Desktop/sharedsaeed/dataset3.txt")
val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {
val array1 = array(0).trim.split("IP")
val array2 = array1(1).split(">")
val array3 = array2(1).split(":")
val first = Try(array1(0).trim) getOrElse ""
val second = Try(array2(0).trim) getOrElse ""
val third = Try(array3(0)) getOrElse ""
Row.fromSeq(Seq(first, second, third))
})
val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
val columns1and2 = Window.partitionBy("sender_ip_1", "receiver_ip_2") // <-- matches groupBy
///I add count to the dataframe
val Dataframe_addcount = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2).distinct()
// Dataframe.show()
val final_add_count_attack = Dataframe_addcount.withColumn("attack", when($"count" > 10 , 0).otherwise(1))
私のコードのoutpoutには、次のデータフレームである。
+---------------+-----------+-------------+-----+------+
| time_stamp_0|sender_ip_1|receiver_ip_2|count|attack|
+---------------+-----------+-------------+-----+------+
|06:10:55.881073| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881095| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881114| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881133| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881152| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881172| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881191| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881210| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881229| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881249| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881268| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881287| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881306| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881325| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881344| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881363| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881386| 10.0.0.3| 10.0.0.1| 88| 0|
|06:10:55.881401| 10.0.0.3| 10.0.0.2| 85| 0|
|06:10:55.881424| 10.0.0.3| 10.0.0.2| 85| 0|
|06:10:55.881440| 10.0.0.3| 10.0.0.2| 85| 0|
+---------------+-----------+-------------+-----+------+
私の問題私はこのコードを操作する必要があるということです:val Dataframe_addcount = Frist_Dataframe.withColumn("count", count($"receiver_ip_2") over columns1and2).distinct()
私のコードの中で、各行の総行数を持つ代わりに、その行までのカウントを表示するような方法です。たとえば、データフレーム内にすべてのものが88個あるのではなく、このようなものが必要です。 (この例は最初の20行だけです)
+---------------+-----------+-------------+-----+------+
| time_stamp_0|sender_ip_1|receiver_ip_2|count|attack|
+---------------+-----------+-------------+-----+------+
|06:10:55.881073| 10.0.0.3| 10.0.0.1| 1 | 0|
|06:10:55.881095| 10.0.0.3| 10.0.0.1| 2 | 0|
|06:10:55.881114| 10.0.0.3| 10.0.0.1| 3 | 0|
|06:10:55.881133| 10.0.0.3| 10.0.0.1| 4 | 0|
|06:10:55.881152| 10.0.0.3| 10.0.0.1| 5 | 0|
|06:10:55.881172| 10.0.0.3| 10.0.0.1| 6 | 0|
|06:10:55.881191| 10.0.0.3| 10.0.0.1| 7 | 0|
|06:10:55.881210| 10.0.0.3| 10.0.0.1| 8 | 0|
|06:10:55.881229| 10.0.0.3| 10.0.0.1| 9 | 0|
|06:10:55.881249| 10.0.0.3| 10.0.0.1| 10| 0|
|06:10:55.881268| 10.0.0.3| 10.0.0.1| 11| 0|
|06:10:55.881287| 10.0.0.3| 10.0.0.1| 12| 0|
|06:10:55.881306| 10.0.0.3| 10.0.0.1| 13| 0|
|06:10:55.881325| 10.0.0.3| 10.0.0.1| 14| 0|
|06:10:55.881344| 10.0.0.3| 10.0.0.1| 15| 0|
|06:10:55.881363| 10.0.0.3| 10.0.0.1| 16| 0|
|06:10:55.881386| 10.0.0.3| 10.0.0.1| 17| 0|
|06:10:55.881401| 10.0.0.3| 10.0.0.1| 1 | 0|
|06:10:55.881424| 10.0.0.3| 10.0.0.1| 2 | 0|
|06:10:55.881440| 10.0.0.3| 10.0.0.1| 3 | 0|
+---------------+-----------+-------------+-----+------+
私を助けてください。前もって感謝します。
あなたは完璧な人です。いつものように。 :) – Queen
そのすべてについての経験:)受け入れてくれてありがとう。 –
私はすでに回答しました:)チェックしてください –