2017-09-20 11 views
2

Hbaseに一括読み込みを行うhfilesを作成しようとしています。 私は次のコードを使用しています:hfilesを作成する際にスパークの問題がありました.-前のセルよりも字句的に大きいキーを追加しました。

val df = sqlContext.read.format("com.databricks.spark.csv") 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .load("data.csv") 

import sqlContext.implicits._ 

val DF2 = df.filter($"company".isNotNull) 
    .dropDuplicates(Array("company")) 
    .sortWithinPartitions("company").sort("company") 

val rdd = DF2.flatMap(x => { 
    val rowKey = Bytes.toBytes(x(0).toString) 
    for (i <- 0 to cols.length - 1) yield { 
    val index = x.fieldIndex(new String(cols(i))) 
    val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes 
     (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, COLUMN_FAMILY, cols(i), value)) 
    } 
}) 

rdd.saveAsNewAPIHadoopFile("HDFS LOcation", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], fconf) 

を、私は、次のデータに

company,date,open,high,low,close,volume 
ABG,01-Jan-2010,11.53,11.53,11.53,11.53,0 
ABM,01-Jan-2010,20.66,20.66,20.66,20.66,0 
ABR,01-Jan-2010,1.99,1.99,1.99,1.99,0 
ABT,01-Jan-2010,53.99,53.99,53.99,53.99,0 
ABX,01-Jan-2010,39.38,39.38,39.38,39.38,0 
ACC,01-Jan-2010,28.1,28.1,28.1,28.1,0 
ACE,01-Jan-2010,50.4,50.4,50.4,50.4,0 
ACG,01-Jan-2010,8.25,8.25,8.25,8.25,0 
ADC,01-Jan-2010,27.25,27.25,27.25,27.25,0 

を使用していますそれは、私もデータを並べ替えてみましたが、それでも

java.io.IOException: Added a key not lexically larger than previous. Current cell = ADC/data:high/1505862570671/Put/vlen=5/seqid=0, lastCell = ADC/data:open/1505862570671/Put/vlen=5/seqid=0 
    at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204) 
    at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:265) 
    at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:992) 
    at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:199) 

としてエラーがスローされますエラーがスローされます。

+0

使用しているHBaseのjarファイルのバージョンは何ですか? –

+0

@squid私はhbase 1.2.3 jarsを使用しています – showstealer

答えて

0

時間を費やした後、解決策が見つかりました。根本原因は列がソートされていないことです。 HFileOutputFormat2->AbstractHFileWriterを書きながらHFILEは辞書順ソート順にし、あなたのケースでキー値を必要とするので

Added a key not lexically larger than previous. Current cellを見つけました。一度列をソートすると、すでに行レベルで並べ替えが適用されています。

質問はここで良い説明why-hbase-keyvaluesortreducer-need-to-sort-all-keyvalueです。

ソリューション:

//sort columns 
val cols = companyDs.columns.sorted 

//Rest of the code is same 

val output = companyDs.rdd.flatMap(x => { 
    val rowKey = Bytes.toBytes(x(0).toString) 
val hkey = new ImmutableBytesWritable(rowKey) 
    for (i <- 0 to cols.length - 1) yield { 
    val index = x.fieldIndex(new String(cols(i))) 
    val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes 
    val kv = new KeyValue(rowKey,COLUMN_FAMILY, cols(i).getBytes(),System.currentTimeMillis()+i ,x(i).toString.getBytes()) 
    (hkey,kv) 
    } 
}) 
output.saveAsNewAPIHadoopFile("<path>" 
    , classOf[ImmutableBytesWritable], classOf[KeyValue], 
    classOf[HFileOutputFormat2], config) 
関連する問題