私は、SparkとMLLibでクラスタリングを行うために使用しているネットワークデータが大量にあります。私は、時刻、方向(ネットワークへの出入り)、送信バイト数、受信バイト数、および各接続時間を表す一連のベクトルにデータを正規化しました。合計7つの次元があります。ネットワーク異常検出にSpark MLLibクラスタリング(K-Means)を使用する
KMeansを使用すると、このデータでモデルを構築するのは簡単です。このモデルを使用して、各入力ベクトルは「分類」され、距離は最も近い重心に計算されます。最後に、RDD(現在距離でタグ付けされている)が距離でソートされ、最も極端な値が抽出されます。
私のデータの入力列の1つは、接続uuid(ユニークな英数字の識別子)です。私はモデルを介してこのデータを持ち運びたい(各入力ベクトルに一意のタグを付けたままにしておく)が、フロートに変換できない場合は例外が発生する。
ここでの質問は、「アウトライヤーを元の入力データに最も効率的に結び付ける方法は?」入力データは大きく正規化されており、元の入力に似ていません。さらに、送信元および宛先IPアドレスが失われています。私はモデルを構築する際に、どのカラムを考慮する(あるいは逆に無視する)のかをKMeansに知らせるインタフェースは見当たりません。
def get_distance(clusters):
def _distance_map(record):
cluster = clusters.predict(record)
centroid = clusters.clusterCenters[cluster]
dist = np.linalg.norm(np.array(record) - np.array(centroid))
return (dist, record)
return _distance_map
def parseMap(row):
# parses rows of data out of the input strings
def conMap(row):
# normalizes the values to be used in building the model
rdd = sc.textFile('/data2/network/201610').filter(lambda r: r[0] != '#')
tcp = rdd.map(parseMap).filter(lambda r: r['proto'] == 'tcp')
cons = tcp.map(conMap) # this normalizes connection data
model = KMeans.train(cons, (24 * 7), maxIterations=25,
runs=1, initializationMode = "random")
data_distance = cons.map(get_distance(model)).sortByKey(ascending=False)
print(data_distance.take(10))