2016-05-18

To read Cassandra data into a pyspark DataFrame, I start pyspark as follows:

[[email protected] python]$ pyspark --conf spark.cassandra.connection.host=10.0.0.60 
Python 2.7.11 |Anaconda custom (64-bit)| (default, Dec 6 2015, 18:08:32) 
Type "copyright", "credits" or "license" for more information. 

IPython 4.1.2 -- An enhanced Interactive Python. 
?   -> Introduction and overview of IPython's features. 
%quickref -> Quick reference. 
help  -> Python's own help system. 
object? -> Details about 'object', use 'object??' for extra details. 
16/05/18 10:52:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
16/05/18 10:52:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /__/.__/\_,_/_/ /_/\_\ version 1.6.1 
     /_/ 

Using Python version 2.7.11 (default, Dec 6 2015 18:08:32) 
SparkContext available as sc, HiveContext available as sqlContext. 

When I try something simple, I get a stack trace of errors that I find unhelpful:

In [1]: import pyspark_cassandra 

In [2]: user = sc.cassandraTable("tickdata", "timeseries").toDF() 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-2-59f7356f4bac> in <module>() 
----> 1 user = sc.cassandraTable("tickdata", "timeseries").toDF() 

/home/idf/anaconda2/lib/python2.7/site-packages/pyspark_cassandra-0.3.5-py2.7.egg/pyspark_cassandra/context.pyc in cassandraTable(self, *args, **kwargs) 
    28  def cassandraTable(self, *args, **kwargs): 
    29   """Returns a CassandraTableScanRDD for the given keyspace and table""" 
---> 30   return CassandraTableScanRDD(self, *args, **kwargs) 

/home/idf/anaconda2/lib/python2.7/site-packages/pyspark_cassandra-0.3.5-py2.7.egg/pyspark_cassandra/rdd.pyc in __init__(self, ctx, keyspace, table, row_format, read_conf, **read_conf_kwargs) 
    233   read_conf = as_java_object(ctx._gateway, self.read_conf.settings()) 
    234 
--> 235   self.crdd = self._helper \ 
    236    .cassandraTable(
    237     ctx._jsc, 

/home/idf/anaconda2/lib/python2.7/site-packages/pyspark_cassandra-0.3.5-py2.7.egg/pyspark_cassandra/rdd.pyc in _helper(self) 
    130  @property 
    131  def _helper(self): 
--> 132   return helper(self.ctx) 
    133 
    134 

/home/idf/anaconda2/lib/python2.7/site-packages/pyspark_cassandra-0.3.5-py2.7.egg/pyspark_cassandra/util.pyc in helper(ctx) 
    91 
    92  if not _helper: 
---> 93   _helper = load_class(ctx, "pyspark_cassandra.PythonHelper").newInstance() 
    94 
    95  return _helper 

/home/idf/anaconda2/lib/python2.7/site-packages/pyspark_cassandra-0.3.5-py2.7.egg/pyspark_cassandra/util.pyc in load_class(ctx, name) 
    83 def load_class(ctx, name): 
    84  return ctx._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ 
---> 85    .loadClass(name) 
    86 
    87 _helper = None 

/opt/spark-latest/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    811   answer = self.gateway_client.send_command(command) 
    812   return_value = get_return_value(
--> 813    answer, self.gateway_client, self.target_id, self.name) 
    814 
    815   for temp_arg in temp_args: 

/opt/spark-latest/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    43  def deco(*a, **kw): 
    44   try: 
---> 45    return f(*a, **kw) 
    46   except py4j.protocol.Py4JJavaError as e: 
    47    s = e.java_exception.toString() 

/opt/spark-latest/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    306     raise Py4JJavaError(
    307      "An error occurred while calling {0}{1}{2}.\n". 
--> 308      format(target_id, ".", name), value) 
    309    else: 
    310     raise Py4JError(

Py4JJavaError: An error occurred while calling o20.loadClass. 
: java.lang.ClassNotFoundException: pyspark_cassandra.PythonHelper 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 


In [3]: 

Edit 1

Doing the following gets me further:

[[email protected] python]$ pyspark --packages TargetHolding/pyspark-cassandra:0.3.5 --conf spark.cassandra.connection.host=10.0.0.60 
Python 2.7.11 |Anaconda custom (64-bit)| (default, Dec 6 2015, 18:08:32) 
Type "copyright", "credits" or "license" for more information. 

IPython 4.1.2 -- An enhanced Interactive Python. 
?   -> Introduction and overview of IPython's features. 
%quickref -> Quick reference. 
help  -> Python's own help system. 
object? -> Details about 'object', use 'object??' for extra details. 
Ivy Default Cache set to: /home/idf/.ivy2/cache 
The jars for the packages stored in: /home/idf/.ivy2/jars 
:: loading settings :: url = jar:file:/opt/spark-1.6.1-bin-hadoop2.6/lib/spark-assembly-1.6.1-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml 
TargetHolding#pyspark-cassandra added as a dependency 
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0 
    confs: [default] 
    found TargetHolding#pyspark-cassandra;0.3.5 in spark-packages 
    found com.datastax.spark#spark-cassandra-connector-java_2.10;1.6.0-M1 in list 
    found com.datastax.spark#spark-cassandra-connector_2.10;1.6.0-M1 in list 
    found org.apache.cassandra#cassandra-clientutil;3.0.2 in list 
    found com.datastax.cassandra#cassandra-driver-core;3.0.0 in list 
    found io.netty#netty-handler;4.0.33.Final in central 
    found io.netty#netty-buffer;4.0.33.Final in central 
    found io.netty#netty-common;4.0.33.Final in central 
    found io.netty#netty-transport;4.0.33.Final in central 
    found io.netty#netty-codec;4.0.33.Final in central 
    found io.dropwizard.metrics#metrics-core;3.1.2 in list 
    found org.slf4j#slf4j-api;1.7.7 in list 
    found org.apache.commons#commons-lang3;3.3.2 in list 
    found com.google.guava#guava;16.0.1 in list 
    found org.joda#joda-convert;1.2 in list 
    found joda-time#joda-time;2.3 in list 
    found com.twitter#jsr166e;1.1.0 in list 
    found org.scala-lang#scala-reflect;2.10.5 in list 
:: resolution report :: resolve 902ms :: artifacts dl 18ms 
    :: modules in use: 
    TargetHolding#pyspark-cassandra;0.3.5 from spark-packages in [default] 
    com.datastax.cassandra#cassandra-driver-core;3.0.0 from list in [default] 
    com.datastax.spark#spark-cassandra-connector-java_2.10;1.6.0-M1 from list in [default] 
    com.datastax.spark#spark-cassandra-connector_2.10;1.6.0-M1 from list in [default] 
    com.google.guava#guava;16.0.1 from list in [default] 
    com.twitter#jsr166e;1.1.0 from list in [default] 
    io.dropwizard.metrics#metrics-core;3.1.2 from list in [default] 
    io.netty#netty-buffer;4.0.33.Final from central in [default] 
    io.netty#netty-codec;4.0.33.Final from central in [default] 
    io.netty#netty-common;4.0.33.Final from central in [default] 
    io.netty#netty-handler;4.0.33.Final from central in [default] 
    io.netty#netty-transport;4.0.33.Final from central in [default] 
    joda-time#joda-time;2.3 from list in [default] 
    org.apache.cassandra#cassandra-clientutil;3.0.2 from list in [default] 
    org.apache.commons#commons-lang3;3.3.2 from list in [default] 
    org.joda#joda-convert;1.2 from list in [default] 
    org.scala-lang#scala-reflect;2.10.5 from list in [default] 
    org.slf4j#slf4j-api;1.7.7 from list in [default] 
    --------------------------------------------------------------------- 
    |     |   modules   || artifacts | 
    |  conf  | number| search|dwnlded|evicted|| number|dwnlded| 
    --------------------------------------------------------------------- 
    |  default  | 18 | 0 | 0 | 0 || 18 | 0 | 
    --------------------------------------------------------------------- 
:: retrieving :: org.apache.spark#spark-submit-parent 
    confs: [default] 
    0 artifacts copied, 18 already retrieved (0kB/22ms) 
16/05/18 12:06:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
Welcome to 
     ____    __ 
    /__/__ ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/ '_/ 
    /__/.__/\_,_/_/ /_/\_\ version 1.6.1 
     /_/ 

Using Python version 2.7.11 (default, Dec 6 2015 18:08:32) 
SparkContext available as sc, HiveContext available as sqlContext. 

In [1]: import pyspark_cassandra 

In [2]: rdd = sc.cassandraTable("tickdata", "timeseries") 
16/05/18 12:08:36 WARN ClosureCleaner: Expected a closure; got pyspark_cassandra.ToRow$ 
16/05/18 12:08:36 WARN ClosureCleaner: Expected a closure; got pyspark_util.BatchPickler 

In [3]: 
+0

How do you include pyspark-cassandra? – zero323

+0

I'm not sure I follow. As shown above, I do `import pyspark_cassandra`. – Ivan

+0

Do I need to add something to the command line? – Ivan

Answer

1

pyspark-cassandra needs both its Python and its Scala code to work. Since it isn't clear from your question how you include the package, I assume you only added the Python code to your PYTHONPATH.

If you are using Spark built with Scala 2.10 (the default build for Spark <= 1.6), you can pull in pyspark-cassandra with --packages:

pyspark --packages TargetHolding:pyspark-cassandra:0.3.5 \ 
     --conf spark.cassandra.connection.host=10.0.0.60 

Otherwise you have to build it yourself and pass it with the --jars, --driver-class-path, and --py-files arguments (all three are required).
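For reference, the manual route might look roughly like this. This is a sketch only: it assumes the TargetHolding/pyspark-cassandra GitHub repository and its build tooling, and the exact artifact paths and names (assembly jar, Python egg/zip) will differ depending on the version and how you build it.

```shell
# Build the Scala helper jar plus the Python code from source
# (assumes the TargetHolding/pyspark-cassandra repo; build command
# and output paths are illustrative and may differ per version).
git clone https://github.com/TargetHolding/pyspark-cassandra.git
cd pyspark-cassandra
sbt assembly

# Ship the jar to the executors (--jars), put it on the driver's own
# classpath (--driver-class-path), and ship the Python code separately
# (--py-files) -- all three flags are needed.
pyspark \
  --jars target/scala-2.10/pyspark-cassandra-assembly-0.3.5.jar \
  --driver-class-path target/scala-2.10/pyspark-cassandra-assembly-0.3.5.jar \
  --py-files target/pyspark_cassandra-0.3.5-py2.7.egg \
  --conf spark.cassandra.connection.host=10.0.0.60
```

The --packages route above is simpler because Ivy resolves and distributes both halves for you; the manual route is mainly useful when you need a locally patched build.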

+0

Thank you. Please see Edit 1. – Ivan

+0

I now get these warnings: In [2]: rdd = sc.cassandraTable("tickdata", "timeseries") 16/05/18 12:08:36 WARN ClosureCleaner: Expected a closure; got pyspark_cassandra.ToRow$ 16/05/18 12:08:36 WARN ClosureCleaner: Expected a closure; got pyspark_util.BatchPickler – Ivan

+0

Yes, that looks fine as far as I can tell. As for the warnings, I don't think they are anything you should worry about. – zero323
