
Cannot query Hive after adding the SQL Server jar to pyspark's classpath

Hive is configured correctly: after starting the REPL with plain pyspark I can query it with spark.sql. I want to read a table from SQL Server and save it into Hive. When I launch the REPL with the JDBC jar, e.g. pyspark --driver-class-path sqljdbc4.jar --jars sqljdbc4.jar, I can read from SQL Server, but now Spark can no longer access Hive: querying an existing Hive table fails with the LzoCodec error shown below.

I would like to know how I can query/pull down an external SQL Server table and save it into an existing Hive table.
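
For reference, here is a minimal sketch of the intended flow once the classpath problem is sorted out, assuming Hive support is enabled in the session; the server, credentials, and table names below are placeholders, not values from the question:

    # Hypothetical connection details -- replace with your own server/database/credentials.
    jdbc_url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=somedb"

    # Read the SQL Server table over JDBC (the driver class ships in sqljdbc4.jar).
    sql_df = (spark.read.format("jdbc")
              .option("url", jdbc_url)
              .option("dbtable", "dbo.source_table")
              .option("user", "someuser")
              .option("password", "somepassword")
              .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
              .load())

    # Insert the rows into an existing Hive table (name is a placeholder); appends by default.
    sql_df.write.insertInto("db.existing_hive_table")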

spark.sql("select max(product_id) from table").show() 
    Traceback (most recent call last): 
     File "<stdin>", line 1, in <module> 
     File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 287, in show 
     print(self._jdf.showString(n, truncate)) 
     File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
     File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco 
     return f(*a, **kw) 
     File "/usr/lib/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value 
    py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString. 
    : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree: 
    Exchange SinglePartition 
    +- *HashAggregate(keys=[], functions=[partial_max(product_id#35)], output=[max#45]) 
     +- HiveTableScan [product_id#35], MetastoreRelation db, table 

     at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) 
     at org.apache.spark.sql.execution.exchange.ShuffleExchange.doExecute(ShuffleExchange.scala:114) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
     at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:233) 
     at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:138) 
     at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:361) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115) 
     at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
     at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) 
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) 
     at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240) 
     at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323) 
     at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) 
     at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193) 
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
     at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546) 
     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192) 
     at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199) 
     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935) 
     at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934) 
     at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576) 
     at org.apache.spark.sql.Dataset.head(Dataset.scala:1934) 
     at org.apache.spark.sql.Dataset.take(Dataset.scala:2149) 
     at org.apache.spark.sql.Dataset.showString(Dataset.scala:239) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) 
     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
     at py4j.Gateway.invoke(Gateway.java:280) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:214) 
     at java.lang.Thread.run(Thread.java:745) 
    Caused by: java.lang.RuntimeException: Error in configuring object 
     at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112) 
     at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78) 
     at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136) 
     at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:185) 
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
     at scala.Option.getOrElse(Option.scala:121) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
     at scala.Option.getOrElse(Option.scala:121) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
     at scala.Option.getOrElse(Option.scala:121) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
     at scala.Option.getOrElse(Option.scala:121) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
     at scala.Option.getOrElse(Option.scala:121) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
     at scala.Option.getOrElse(Option.scala:121) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
     at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91) 
     at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:263) 
     at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:86) 
     at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:123) 
     at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:114) 
     at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) 
     ... 41 more 
    Caused by: java.lang.reflect.InvocationTargetException 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109) 
     ... 80 more 
    Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found. 
     at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:139) 
     at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:179) 
     at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) 
     ... 85 more 
    Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found 
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101) 
     at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:132) 
     ... 87 more 

Answer


The --driver-class-path flag replaces the value set in your default configuration; it does not append to it. Most likely your spark-defaults.conf file adds the LZO jar to the driver classpath, and passing the flag overrides that entry, so the jar is ignored. You should either:

1) include your entire driver classpath (LZO jar included) in the --driver-class-path flag,

or

2) add the SQL JDBC jar to the classpath via the spark.driver.extraClassPath setting in your spark-defaults.conf file.

Also, as @Tim points out in the comments, you still need to pass the jar on the command line with the --jars flag.
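
For illustration, the two options might look like this; the hadoop-lzo.jar path is a guess, so substitute the actual location on your cluster:

    # Option 1: repeat the full driver classpath, LZO jar included, on the command line.
    pyspark --driver-class-path /path/to/hadoop-lzo.jar:sqljdbc4.jar --jars sqljdbc4.jar

    # Option 2: append the JDBC jar to spark.driver.extraClassPath in spark-defaults.conf,
    #   e.g.  spark.driver.extraClassPath  /path/to/hadoop-lzo.jar:/path/to/sqljdbc4.jar
    # then launch without --driver-class-path but keep --jars so the executors also get the jar.
    pyspark --jars sqljdbc4.jar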


Thanks @RyanW. I think you should mention that the jars still have to be passed; then I will mark it as the correct answer. – Tim
