2
Hbaseに一括読み込みを行うhfilesを作成しようとしています。 私は次のコードを使用しています:hfilesを作成する際にスパークの問題がありました.-前のセルよりも字句的に大きいキーを追加しました。
val df = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("data.csv")
import sqlContext.implicits._
val DF2 = df.filter($"company".isNotNull)
.dropDuplicates(Array("company"))
.sortWithinPartitions("company").sort("company")
val rdd = DF2.flatMap(x => {
val rowKey = Bytes.toBytes(x(0).toString)
for (i <- 0 to cols.length - 1) yield {
val index = x.fieldIndex(new String(cols(i)))
val value = if (x.isNullAt(index)) "".getBytes else x(index).toString.getBytes
(new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, COLUMN_FAMILY, cols(i), value))
}
})
rdd.saveAsNewAPIHadoopFile("HDFS LOcation", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], fconf)
を、私は、次のデータに
company,date,open,high,low,close,volume
ABG,01-Jan-2010,11.53,11.53,11.53,11.53,0
ABM,01-Jan-2010,20.66,20.66,20.66,20.66,0
ABR,01-Jan-2010,1.99,1.99,1.99,1.99,0
ABT,01-Jan-2010,53.99,53.99,53.99,53.99,0
ABX,01-Jan-2010,39.38,39.38,39.38,39.38,0
ACC,01-Jan-2010,28.1,28.1,28.1,28.1,0
ACE,01-Jan-2010,50.4,50.4,50.4,50.4,0
ACG,01-Jan-2010,8.25,8.25,8.25,8.25,0
ADC,01-Jan-2010,27.25,27.25,27.25,27.25,0
を使用していますそれは、私もデータを並べ替えてみましたが、それでも
java.io.IOException: Added a key not lexically larger than previous. Current cell = ADC/data:high/1505862570671/Put/vlen=5/seqid=0, lastCell = ADC/data:open/1505862570671/Put/vlen=5/seqid=0
at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)
at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:265)
at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:992)
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2$1.write(HFileOutputFormat2.java:199)
としてエラーがスローされますエラーがスローされます。
使用しているHBaseのjarファイルのバージョンは何ですか? –
@squid私はhbase 1.2.3 jarsを使用しています – showstealer