0

pySparkを使用してhbaseテーブルに書き込もうとしています。これまでのところ、hbaseからデータを読み取ることができました。 hbaseテーブルに書き込むときに例外が発生します。PySparkを使用してHBaseテーブルに書き込み中にエラーが発生しました

次の形式でコマンドを実行
from pyspark.sql import SparkSession 
from pyspark import SparkContext 
from pyspark.sql.types import * 

properties = { 
    "instanceId" : "hbase", 
    "zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181", 
    "hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b", 
    "hbase.use.hbase.context" : False, 
    "hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml", 
    "hbase.table" : "t" 
} 
spark = SparkSession\ 
     .builder\ 
     .appName("hbaseWrite")\ 
     .getOrCreate() 

sc = spark.sparkContext 

#I am able to read the data successfully. 
#df = spark.read.format("org.apache.hadoop.hbase.spark")\ 
# .options(**properties)\ 
# .load() 

data = [("3","DATA 3 A", "DATA 3 B")] 
columns = ['KEY_FIELD','A','B'] 
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns]) 
df = spark.createDataFrame(data, schema=cSchema) 
df.write\ 
     .options(**properties)\ 
     .mode('overwrite').format("org.apache.hadoop.hbase.spark").save() 

spark2-submit --master local[*] write_to_hbase.py 

スパークバージョン:2.2.0.cloudera1(私は私の火花のバージョンを変更することはできません) HBaseのバージョン:1.2.0-cdh5.12.0を(しかし、私は私のHBaseのバージョンを変更することができます)

注:私はspark2ジャーフォルダへのHBaseのjarファイルを追加したと私はspark2ジャーフォルダに依存するjarを次のように追加されまし。

  1. 火花core_2.11-1.6.1.jar
  2. -3.1.0-incubating.jar HTRACEコア
  3. スカラライブラリ-2.9.1.jar

エラー:

py4j.protocol.Py4JJavaError: An error occurred while calling o70.save. 
: java.lang.RuntimeException: org.apache.hadoop.hbase.spark.DefaultSource does not allow create table as select. 
     at scala.sys.package$.error(package.scala:27) 
     at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:476) 

私は複数の提案を試みましたが、何も機能しませんでした。それは重複した質問かもしれませんが、私は答えを見つけるための他の選択肢がありません。

+1

https://stackoverflow.com/questions/38470114/how-to-connect-hbase-and-spark -using-python/38575095 –

+0

@AniketKulkarni:データソースと書式を変更してコードを実行すると、コードは次のようにスタックします。** ::依存関係の解決:: org.apache.spark#spark-submit-parent; 1.0 ** –

+0

@AniketKulkarni:レポは利用できません。 https://repo1.maven.org/maven2/com/hortonworks/shc/1.0.0-1.6-s_2.10/ –

答えて

0

Gitのレポhttps://github.com/hortonworks-spark/shcをコンパイルして、それを解決し、火花ジャーフォルダにSHCジャーを置きます。そして、その後@Aniket Kulkarniさんによってsuggessted link

最終的なコードは次のようになり、

from pyspark.sql import SparkSession 
from pyspark import SparkContext 
from pyspark.sql.types import * 

properties = { 
    "instanceId" : "hbase", 
    "zookeepers" : "10-x-x-x.local:2181,10-x-x-x.local:2181,10-x-x-x.local:2181", 
    "hbase.columns.mapping" : "KEY_FIELD STRING :key, A STRING c:a, B STRING c:b", 
    "hbase.use.hbase.context" : False, 
    "hbase.config.resources" : "file:///etc/hbase/conf/hbase-site.xml", 
    "hbase.table" : "test_table" 
} 
spark = SparkSession.builder\ 
     .appName("hbaseWrite")\ 
     .getOrCreate() 

sc = spark.sparkContext 
catalog = ''.join("""{ 
    "table":{"namespace":"default", "name":"test_table"} 
    "rowkey":"key", 
    "columns":{ 
     "KEY_FIELD":{"cf":"rowkey", "col":"key", "type":"string"}, 
     "A":{"cf":"c", "col":"a", "type":"string"}, 
     "B":{"cf":"c", "col":"b", "type":"string"} 
    } 
}""".split()) 


data = [("3","DATA 3 A", "DATA 3 B")] 
columns = ['KEY_FIELD','A','B'] 
cSchema = StructType([StructField(columnName, StringType()) for columnName in columns]) 
df = spark.createDataFrame(data, schema=cSchema) 
df.write\ 
     .options(catalog=catalog)\ 
     .options(**properties)\ 
     .mode('overwrite').format("org.apache.spark.sql.execution.datasources.hbase").save() 
1

Cloudera distributionを使用している場合は、に書き込む正式な方法はありません。PYSAPRKを使用してください。これはCloudera support Teamによって確認されています。

Hortonworksを使用していて、spark 2.0がある場合は、次のリンクをクリックしてください。

Pyspark to Hbase write

+0

Clouderaサポートチームの確認リンクを提供できますか? –

+0

@AdaPongaya申し訳ありませんが、私はそれが内部の文書であるので、私はそれをすることができません。私のチームが職場で提起したサポート事例のケースノートに掲載されました –

関連する問題