
Spark Hive: can't get a column of a DataFrame

I'm experimenting with Spark on Hive. In my code I create a new DataFrame and fill it with custom data using the HiveContext.createDataFrame method:

JavaSparkContext sc = ...; 
HiveContext hiveCtx = new HiveContext(sc); 

StructField f1 = new StructField("columnA", DataTypes.StringType, false, null); 
StructField f2 = new StructField("columnB", DataTypes.StringType, false, null); 

StructType st = new StructType(new StructField[] {f1, f2}); 

Row r1 = RowFactory.create("A", "B"); 
Row r2 = RowFactory.create("C", "D"); 

List<Row> allRows = new ArrayList<Row>(); 
allRows.add(r1); 
allRows.add(r2); 

DataFrame testDF = hiveCtx.createDataFrame(allRows, st); 

testDF.explain();       // print the physical plan 

for(String col : testDF.columns()) {  // list the columns, all seems to be ok here?! 
    System.out.println(col); 
} 

Column columnA = testDF.col("columnA"); // get the column --> exception!!! 

... 

When I run the above code with the spark-submit command, I get the following output:

=== APP RUNNING === 
17/03/13 12:20:29 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 
17/03/13 12:20:29 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored 
17/03/13 12:20:29 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 
17/03/13 12:20:29 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies) 
17/03/13 12:20:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
17/03/13 12:20:31 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
17/03/13 12:20:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
17/03/13 12:20:32 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
17/03/13 12:20:33 INFO metastore: Trying to connect to metastore with URI thrift://my-server-url:9083 
17/03/13 12:20:33 INFO metastore: Connected to metastore. 
== Physical Plan == 
LocalTableScan [columnA#0,columnB#1], [[A,B],[C,D]] 
columnA 
columnB 
Exception in thread "main" java.lang.NullPointerException 
     at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:218) 
     at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210) 
     at scala.util.hashing.MurmurHash3.productHash(MurmurHash3.scala:63) 
     at scala.util.hashing.MurmurHash3$.productHash(MurmurHash3.scala:210) 
     at scala.runtime.ScalaRunTime$._hashCode(ScalaRunTime.scala:172) 
     at scala.Tuple2.hashCode(Tuple2.scala:19) 
     at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:391) 
     at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41) 
     at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123) 
     at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119) 
     at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41) 
     at scala.collection.mutable.HashSet.contains(HashSet.scala:58) 
     at scala.collection.GenSetLike$class.apply(GenSetLike.scala:43) 
     at scala.collection.mutable.AbstractSet.apply(Set.scala:45) 
     at scala.collection.SeqLike$$anonfun$distinct$1.apply(SeqLike.scala:494) 
     at scala.collection.immutable.List.foreach(List.scala:318) 
     at scala.collection.SeqLike$class.distinct(SeqLike.scala:493) 
     at scala.collection.AbstractSeq.distinct(Seq.scala:40) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264) 
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:191) 
     at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151) 
     at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664) 
     at temp.HiveTest.main(HiveTest.java:57) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Here is my spark-submit call:

spark-submit --class temp.HiveTest --master yarn --deploy-mode client /home/daniel/application.jar 

Why does the call to DataFrame.col(...) throw a NullPointerException?


Did you import 'Column'? E.g. 'import static org.apache.spark.sql.functions.*;' – Yaron


Yes, I have 'import org.apache.spark.sql.Column'. The IDE shows no errors, so it's not an import/syntax problem. –


What happens when you run it with '--master local[1]'? – Yaron

Answer


Try changing the null to Metadata.empty():

StructField f1 = new StructField("columnA", DataTypes.StringType, false, Metadata.empty()); 
StructField f2 = new StructField("columnB", DataTypes.StringType, false, Metadata.empty()); 
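
For reference, a minimal self-contained version of the fixed program (a sketch only, assuming Spark 1.x and its Java DataFrame API; the app name and class name are placeholders):

import java.util.ArrayList; 
import java.util.List; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.sql.Column; 
import org.apache.spark.sql.DataFrame; 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.RowFactory; 
import org.apache.spark.sql.hive.HiveContext; 
import org.apache.spark.sql.types.DataTypes; 
import org.apache.spark.sql.types.Metadata; 
import org.apache.spark.sql.types.StructField; 
import org.apache.spark.sql.types.StructType; 

public class HiveTest { 
    public static void main(String[] args) { 
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("HiveTest")); 
        HiveContext hiveCtx = new HiveContext(sc); 

        // Metadata.empty() instead of null: the stack trace shows the NPE in 
        // AttributeReference.hashCode(), which hashes the field metadata. 
        StructField f1 = new StructField("columnA", DataTypes.StringType, false, Metadata.empty()); 
        StructField f2 = new StructField("columnB", DataTypes.StringType, false, Metadata.empty()); 
        StructType st = new StructType(new StructField[] {f1, f2}); 

        List<Row> allRows = new ArrayList<Row>(); 
        allRows.add(RowFactory.create("A", "B")); 
        allRows.add(RowFactory.create("C", "D")); 

        DataFrame testDF = hiveCtx.createDataFrame(allRows, st); 
        Column columnA = testDF.col("columnA"); // no longer throws 
        testDF.select(columnA).show(); 

        sc.stop(); 
    } 
} 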

That did it for me, thanks! An alternative way to create the 'DataFrame' is to use a bean class: build a 'List<MyBeanClass> allRows = ...' and later call 'DataFrame testDF = hiveCtx.createDataFrame(allRows, MyBeanClass.class);' –
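
A sketch of that bean-based alternative; MyBeanClass and its two properties are illustrative names taken from the comment, not code from the original post:

import java.io.Serializable; 
import java.util.Arrays; 
import java.util.List; 

import org.apache.spark.sql.DataFrame; 

// Plain JavaBean; Spark infers the schema (columnA, columnB) from the getters. 
public class MyBeanClass implements Serializable { 
    private String columnA; 
    private String columnB; 

    public MyBeanClass() {} 
    public MyBeanClass(String a, String b) { this.columnA = a; this.columnB = b; } 

    public String getColumnA() { return columnA; } 
    public void setColumnA(String v) { this.columnA = v; } 
    public String getColumnB() { return columnB; } 
    public void setColumnB(String v) { this.columnB = v; } 
} 

// Usage with the hiveCtx from the question: 
// List<MyBeanClass> allRows = Arrays.asList(new MyBeanClass("A", "B"), 
//                                           new MyBeanClass("C", "D")); 
// DataFrame testDF = hiveCtx.createDataFrame(allRows, MyBeanClass.class); 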


Same problem here; the approach above fixed it. Thanks – Abhishyam