2016-01-19 12 views
5

私はELKとPysparkを統合しました。私が得たローカルファイルシステムにELKデータとしてRDDを保存PysparkからElasticsearchにデータを書き込む方法は?

rdd.saveAsTextFile("/tmp/ELKdata") 
logData = sc.textFile('/tmp/ELKdata/*') 
errors = logData.filter(lambda line: "raw1-VirtualBox" in line) 
errors.count() 

値iが出力

(u'AVI0UK0KZsowGuTwoQnN」、{Uを持っ35

errors.first() 

あります'ホスト':u'raw1-VirtualBox '、u'ident:u'NetworkManager'、u'pid ':u'748'、u'message ':u "(eth0):デバイス状態の変更:ip-config - >セカンダリ(理由 'なし') [70 90 0]」、U '@タイムスタンプ':u'2016-01-12T10:59:48 + 05:30' })

私は取得pysparkから弾性検索のデータを書き込もう

エラー

errors.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf= {"es.resource" : "logstash-2016.01.12/errors}) 

巨大なJavaのエラー

 

org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes) 
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled 
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62) 
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665 
Traceback (most recent call last): 
    File "", line 6, in 
    File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile 
    keyConverter, valueConverter, jconf) 
    File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62) 
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62) 
org.apache.spark.TaskKilledException 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException: 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool 
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 


私はそれが手動でデータを書き込むことができていなかった場合

errors = logData.filter(lambda line: "raw1-VirtualBox" in line) 
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',{"host": "raw1-VirtualBox", 
    "ident": "NetworkManager", 
    "pid": "69", 
    "message": " sucess <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", 
    "@timestamp": "2016-01-12T10:59:48+05:30" 
    })) 

しかし、フィルタリングされたデータ&を管理されたデータを弾性検索に書きたいと思います。

+0

午前:

あなたはここで見つけることができ、追加のオプションを追加することができます。com/technologies/spark/load_and_transform_data) – pyspark

答えて

4

私は同様の問題を抱えていましたが、ここでそれを解決する方法がありました。まず、RDDを使用してデータフレームを使用しました。今受け入れ答えに一度データフレーム同様

from pyspark.sql import SQLContext 
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save() 
+1

ESノードのIPアドレスまたはDNSはどこから得られますか? – Somar

0

、私はRDDとしてデータを書き込もうと、同じ船に乗っていました。上記の答えは本当に近いですが、有益な設定オプションがたくさんあります。ノードにデフォルトのlocalhostを使用していない限り、この回答は機能しません。

データフレームは、よりクリーンでシンプルな方法です。 pysparkシェルを使用している場合は、シェルを起動するときに、elasticsearch hadoop jarへのパスを追加します。

CLIから使用してシェルを起動します。

$ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar 

あなたは、必ずしも次の行は必要ありません:あなたはあなたのデータフレームを持っている場合、あなたは、単に次のことを必要とする、プラスの可能な追加

from pyspark.sql import SQLContext 

をオプション:

df.write.format("org.elasticsearch.spark.sql") 
.option("es.resource", "<index/type>") 
.option("es.nodes", "<enter node address or name>").save() 

指定したインデックス/タイプがまだ存在しない場合Elasticsearchでは、それが作成されます。 //help.mortardata:この[リンク](HTTP、以下 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

関連する問題