2017-02-01 7 views
2

ファイルからデータを集めてHDFSに集めます。 hbaseの特定のテーブルの値を持つデータからいくつかの詳細を追加する必要があります。地図上のスパーク行を呼び出す

が、私は例外を持っている:

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) 
    at org.apache.spark.rdd.RDD.map(RDD.scala:286) 
    at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46) 
    at ...... 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation 
Serialization stack: 

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 

我々はmap関数の間のHBaseにアクセスしようとしたとき、私は、問題が発生したことを知っています。

私の質問は、hbaseテーブルに含まれている値で自分のRDDを完成させる方法です。例えば

:HDFS内 ファイルはCSVです:HBaseの中

Name;Number1;Number2 
toto;1;2 

は、我々は、名前TOTOへのデータ仲間を持っています。

私はNumber1とNumber2(もっとも簡単な部分)の合計を から取得し、テーブルのデータと集計する必要があります。 例:

還元剤のキーはtataで、hbaseテーブルのrowkey totoを取得することで取得できます。

提案がありますか?

+0

私の答えを確認してください[類似の問題](http://stackoverflow.com/a/41759525/647053)のように見えます。 htableなどのようなものを移動する...閉鎖にその場合に修正されます –

+0

マッピングの数は、基本的にデータのHbaseのキーの数はどのくらいですか? –

+0

あなたの答えをありがとう、私はこのRam Ghadiyaramを試してみます。 テーブルには何百万ものキーがあり、各キーには何千ものカラムがあります – okitas

答えて

1

が最後に同僚がそれをやった、あなたのアドバイスのおかげで:

ので、これはHBaseのテーブルから件のデータを使用してファイルを集約する可能マップのコードです。

private final Logger LOGGER = LoggerFactory.getLogger(AbtractGetSDMapFunction.class); 




/** 
* Namespace name 
*/ 
public static final String NAMESPACE = "NameSpace"; 
private static final String ID = "id"; 
private Connection connection = null; 
private static final String LINEID = "l"; 
private static final String CHANGE_LINE_ID = "clid"; 
private static final String CHANGE_LINE_DATE = "cld"; 
private String constClientPortHBase; 
private String constQuorumHBase; 
private int constTimeOutHBase; 
private String constZnodeHBase; 
public void initConnection() { 
    Configuration conf = HBaseConfiguration.create(); 
    conf.setInt("timeout", constTimeOutHBase); 
    conf.set("hbase.zookeeper.quorum", constQuorumHBase); 
    conf.set("hbase.zookeeper.property.clientPort", constClientPortHBase); 
    conf.set("zookeeper.znode.parent", constZnodeHBase); 
    try { 
     connection = HConnectionManager.createConnection(conf); 
    } catch (Exception e) { 
     LOGGER.error("Error in the configuration of the connection with HBase.", e); 
    } 
} 

public Tuple2<String, myInput> call(String row) throws Exception { 
//this is where you need to init the connection for hbase to avoid serialization problem 
    initConnection(); 

....do your work 
State state = getCurrentState(myInput.getKey()); 
....do your work 
} 

public AbtractGetSDMapFunction(String constClientPortHBase, String constQuorumHBase, String constZnodeHBase, int constTimeOutHBase) { 
    this.constClientPortHBase = constClientPortHBase; 
    this.constQuorumHBase = constQuorumHBase; 
    this.constZnodeHBase = constZnodeHBase; 
    this.constTimeOutHBase = constTimeOutHBase; 
} 

/***************************************************************************/ 
/** 
* Table Name 
*/ 
public static final String TABLE_NAME = "Table"; 

public state getCurrentState(String key) throws TechnicalException { 
    LOGGER.debug("start key {}", key); 
    String buildRowKey = buildRowKey(key); 
    State currentState = new State(); 
    String columnFamily = State.getColumnFamily(); 
    if (!StringUtils.isEmpty(buildRowKey) && null != columnFamily) { 
     try { 
      Get scan = new Get(Bytes.toBytes(buildRowKey)); 
      scan.addFamily(Bytes.toBytes(columnFamily)); 
      addColumnsToScan(scan, columnFamily, ID);     
      Result result = getTable().get(scan); 
      currentState.setCurrentId(getLong(result, columnFamily, ID));    
     } catch (IOException ex) { 
      throw new TechnicalException(ex); 
     } 
     LOGGER.debug("end "); 
    } 
    return currentState; 
} 

/***********************************************************/ 

private Table getTable() throws IOException, TechnicalException { 
    Connection connection = getConnection(); 
    // Table retrieve 
    if (connection != null) { 
     Table table = connection.getTable(TableName.valueOf(NAMESPACE, TABLE_NAME)); 


     return table; 
    } else { 
     throw new TechnicalException("Connection to Hbase not available"); 
    } 
} 

/****************************************************************/ 




private Long getLong(Result result, String columnFamily, String qualifier) { 
    Long toLong = null; 
    if (null != columnFamily && null != qualifier) { 
     byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); 
     toLong = (value != null ? Bytes.toLong(value) : null); 
    } 
    return toLong; 
} 

private String getString(Result result, String columnFamily, String qualifier) { 
    String toString = null; 
    if (null != columnFamily && null != qualifier) { 
     byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier)); 
     toString = (value != null ? Bytes.toString(value) : null); 
    } 
    return toString; 
} 


public Connection getConnection() { 
    return connection; 
} 

public void setConnection(Connection connection) { 
    this.connection = connection; 
} 



private void addColumnsToScan(Get scan, String family, String qualifier) { 
    if (org.apache.commons.lang.StringUtils.isNotEmpty(family) && org.apache.commons.lang.StringUtils.isNotEmpty(qualifier)) { 
     scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier)); 
    } 
} 

private String buildRowKey(String key) throws TechnicalException { 
    StringBuilder rowKeyBuilder = new StringBuilder(); 
    rowKeyBuilder.append(HashFunction.makeSHA1Hash(key)); 
    return rowKeyBuilder.toString(); 
} 
関連する問題