2017-06-15 15 views
0

私はネットワークトラフィックログファイルがNeo4jで分析できるかどうか調べようとしています。 したがって、「sh」-libraryを使用してリアルタイムでBro IDSから3つの異なるログファイルを「テーリング」し、py2neoを使用して非常に遅いと思われるneo4jにログレコードをインポートします。 CSVインポートはリアルタイムであるため、ここでは機能しません。Neo4j - ログファイルをリアルタイムでインポートするには書き込み速度が遅すぎますか?

例:ほぼ4.000.000接続のtcpreplayを使用して、1時間のパケットキャプチャファイルを分析しています。私はテンポの半分で演奏しました。だから2時間後、私は約4.000.000のログエントリを持っていた。分析の開始から3,5時間後の今、私は5ノードと4の関係からなる289691グラフをインポートしたばかりだ。全体では、ほぼ2倍の時間でデータの約15%を占めています。

(これはグラフの一つである)私はpy2neoを使用していますが、コードは次のようになります。

def create_conn_graph(connlog): 
[...] 

## Start Session 
graph = Graph(bolt=True, password="neo4j") 
tx = graph.begin() 

############ 
## Nodes ## 
############ 

## Connection Node 
conn = Node("Connection", uid=connlog['uid'], 
      ts=connlog['ts'], 
      date=evt_date, 
      time=evt_time, 
      [...]) 

conn_properties = dict(conn) 
for key in conn_properties.keys(): 
    if conn[key] == "-" or conn[key] == "(empty)": 
     conn[key] = "0" 
conn.update() 
tx.merge(conn, "Connection", "uid") 

## IP Nodes 
orig = Node("IP", ip=connlog['orig_h']) 
tx.merge(orig) 

resp = Node("IP", ip=connlog['resp_h']) 
tx.merge(resp) 

## History Node 
if connlog['history']: 
    hist_flow = history_flow(connlog['history']) 
    history_node = Node("History", history=connlog['history'], flow=hist_flow) 
    tx.merge(history_node, "History", "history") 

    ## (Connection)-[HAS_HISTORY]->(History) 
    conn_hist = Relationship(conn, "HAS_HISTORY", history_node) 
    tx.merge(conn_hist) 

## Conn_State 
conn_state = Node("Conn_State", state=connlog['conn_state'], meaning=CONN_STATE[connlog['conn_state']]) 
tx.merge(conn_state, "Conn_State", "conn_state") 

tx.commit() 
tx = graph.begin() 

##################### 
## Relationships ## 
##################### 

## (IP)-[STARTS_CONNECTION]->(Connection) 
orig_conn = Relationship(orig, "STARTS_CONNECTION", conn, port=connlog['orig_p']) 
tx.merge(orig_conn) 

## (Connection)-[CONNECTS_TO]->(IP) 
conn_resp = Relationship(conn, "CONNECTS_TO", resp, port=connlog['resp_p']) 
tx.merge(conn_resp) 

## (Connection)-[HAS_CONN_STATE]->(Conn_State) 
conn_connstate = Relationship(conn, "HAS_CONN_STATE", conn_state) 
tx.merge(conn_connstate) 

tx.commit() 
## (Connection)-[PRODUCED]-> (DNS|HTTP) 
if connlog['service'] == "dns": 
    graph.run("MATCH (c:Connection {uid:{uid}}), (d:DNS {uid:{uid}}) \ 
     MERGE (c)-[:PRODUCED]->(d)", 
       {"uid": connlog['uid']}) 

if connlog['service'] == "http": 
    graph.run("MATCH (c:Connection {uid:{uid}}), (d:HTTP {uid:{uid}}) \ 
     MERGE (c)-[:PRODUCED]->(d)", 
       {"uid": connlog['uid']}) 

return True 


## End of create_conn_graph ######################################## 


if __name__ == "__main__": 
    logentry = {} 
    logfield = CONNLOG 
    logline = [] 

    for line in tail("-F", LOG_DIR, _iter=True, _bg=True): 
     entry = line.strip().split("\t") 
     if line.startswith('#'): 
      continue 
     for i in range(len(logfield)): 
      logentry[logfield[i]] = entry[i] 
     create_conn_graph(logentry) 

私は次の制約とインデックスを持っている:

graph.run("CREATE CONSTRAINT ON (c:Connection) ASSERT c.uid IS UNIQUE") 
graph.run("CREATE CONSTRAINT ON (i:IP) ASSERT i.ip IS UNIQUE") 
graph.run("CREATE CONSTRAINT ON (c:Conn_State) ASSERT c.conn_state IS UNIQUE") 
graph.run("CREATE CONSTRAINT ON (h:History) ASSERT h.history IS UNIQUE") 
graph.run("CREATE CONSTRAINT ON (host:Host) ASSERT host.host is UNIQUE") 
graph.run("CREATE CONSTRAINT ON (q:QueryType) ASSERT q.type is UNIQUE") 
graph.run("CREATE CONSTRAINT ON (qc:QueryClass) ASSERT qc.class is UNIQUE") 
graph.run("CREATE CONSTRAINT ON (rc:ResponseCode) ASSERT rc.code is UNIQUE") 
graph.run("CREATE CONSTRAINT ON (ic:InfoCode) ASSERT ic.code is UNIQUE") 
graph.run("CREATE CONSTRAINT ON (ua:UserAgent) ASSERT ua.useragent is UNIQUE") 
graph.run("CREATE CONSTRAINT ON (m:Method) ASSERT m.method is UNIQUE") 
graph.run("CREATE CONSTRAINT ON (r:Referrer) ASSERT r.referrer is UNIQUE") 
graph.run("CREATE INDEX ON :DNS(uid)") 
graph.run("CREATE INDEX ON :Uri(uri)") 
graph.run("CREATE INDEX ON :HTTP(uid)") 

たぶん誰かが私を与えることができます私が間違ってやっていることや、コードで間違いを犯した箇所を示唆していますか? neo4jへの書き込み中に一時的なエラーが発生したため、コミット量が発生します。トランザクションの数が増えると、エラーはなくなりました。任意のヘルプ

+0

はあなたには、いくつかのスキーマ制約/インデックスを作成したことがありますか? – logisima

+0

質問に制約とインデックスを追加しました –

答えて

1

を事前に

おかげで私は、Pythonのドライバ(s)が最速のものではありません、私の経験から、ボンネットの下に何をしているかpy2neoわかりません。

私はおそらく、何が起こるかを完全に制御できるプレーンなCypherステートメントを使用します。

また、間違った/欠落しているインデックスもあります。すべてのクエリ/操作でインデックスが使用されていることを確認してください。それ以外の場合は完全スキャンになります。

  • (D:DNS {UID:{UID}})
  • (D:HTTP {UID:{UID}})

私はまた、あなたがあたりもう少しデータを送信することを示唆していますトランザクション(10kレコードのような)

また、ログバッチごとにいくつかの前処理を行うことも意味があります。各ログ回線ではなく、ログセグメントごとに個別のIPノードを事前に作成します。

これはあまりにもあなたを助けるかもしれない: http://jexp.de/blog/2017/03/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher/

+0

ヒントをありがとう。欠落しているインデックスについては、私は少しコードをミスマッチしています。 HTTP_RecordインデックスとDNS_Recordインデックスは、コードからHTTPノードとDNSノードに属し、正しい名前になりました。 –

関連する問題