2017-10-17 5 views
2

ClouderaクイックスタートドッカーコンテナでPython Spark(v 1.6.0)から始まります。 静的 .txtファイル(500 mb)を、succesを含む/user/root/access_log.txtのhdfsに入れます。 pysparkでPyspark sc.textFile()がファイルを完全にロードしない

私はPythonコードのTE次の行を含むファイルをロードしよう:

lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt") 

これは私にエラーを与えません。しかし、ファイルが完全にロードされていないことがわかりました。 HDFSは、実際に正しいファイルサイズを持っていながらも ..

lines.max() 

はないファイルの正しい最後の要素を提供します。

これはメモリの問題ですか?私のドッカーの設定は3840 MBに設定されています。 これを修正する方法がわかりません。私はあなたの答えを楽しみにしています。

編集:私は私のデータセット内の要素を数え

lines.count() 

し、私の驚きに、それは正しかったです!これは、私のファイルが正しくロードされたことを意味するはずです。しかし、依然として、.max()ステートメントが正しい要素を返さない理由が依然として残っています。

これは、さまざまなタスクと何か関係がありますか?

編集2: 一般max

10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmpics/0000/2229/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 184976 
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 60117 
10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmmediablock/360/Chacha.jpg HTTP/1.1" 200 109379 
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000159.jpg HTTP/1.1" 200 161657 
+0

こんにちは、マイク、ファイルが静的である:

from functools import reduce rdd.mapPartitions(lambda part: reduce(lambda _, x: [x], part, [])).collect()[-1] 

またはパーティションの数が多い場合

?通常のログファイルは、通常、最新のイベントで更新し続けます。これは、読み取り値がログと一致しない理由です。 –

+0

良い質問ですが、ファイルは静的です:) –

答えて

2

.txtファイルからいくつかの例でラインが(...)最後の要素を返すべきではありません。場合によっては、ログファイルで使用されている形式が辞書順を適用していて、その内容で運が良ければ、それは起こりません。データにIPアドレスが付いていて、非友好的(たとえばISO 8601ではなく)のタイムスタンプ形式を使用するため、最後の要素を取得することは期待できないものです。最後の要素を見つけるため

一つの方法は、インデックスを含むことがある:

from operator import itemgetter 

(rdd 
    .zipWithIndex()    # Add line number to get (line, no) 
    .max(key=itemgetter(1))[0]) # Compare elements using index 

ビット別のアプローチは、各パーティションの最後の要素と、これらの最後のものを見つけることです。

(rdd 
    .mapPartitionsWithIndex(
     lambda i, part: reduce(lambda _, x: [(i, x)], part, [])) 
    .max()[1]) # Take max using tuple ordering 
関連する問題