2017-05-24 14 views
0

Kerberos対応のHadoopクラスタの問題に直面しています。spark、kerberos、yarn-cluster - > hbaseへの接続

私たちは、Kafka(ダイレクトストリーム)、hbaseとやり取りする糸クラスター上でストリーミングジョブを実行しようとしています。

どういうわけか、クラスタモードでhbaseに接続できません。 keytabを使用してhbaseにログインします。

これは、我々が何をすべきかです:

spark-submit --master yarn-cluster --keytab "dev.keytab" --principal "[email protected]" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j_executor_conf.properties -XX:+UseG1GC" --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j_driver_conf.properties -XX:+UseG1GC" --conf spark.yarn.stagingDir=hdfs:///tmp/spark/ --files "job.properties,log4j_driver_conf.properties,log4j_executor_conf.properties" service-0.0.1-SNAPSHOT.jar job.properties 

のHBaseに接続するには:

def getHbaseConnection(properties: SerializedProperties): (Connection, UserGroupInformation) = { 


    val config = HBaseConfiguration.create(); 
    config.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM_VALUE); 
    config.set("hbase.zookeeper.property.clientPort", 2181); 
    config.set("hadoop.security.authentication", "kerberos"); 
    config.set("hbase.security.authentication", "kerberos"); 
    config.set("hbase.cluster.distributed", "true"); 
    config.set("hbase.rpc.protection", "privacy"); 
    config.set("hbase.regionserver.kerberos.principal", “hbase/[email protected]”); 
    config.set("hbase.master.kerberos.principal", “hbase/[email protected]”); 

    UserGroupInformation.setConfiguration(config); 

    var ugi: UserGroupInformation = null; 
     if (SparkFiles.get(properties.keytab) != null 
     && (new java.io.File(SparkFiles.get(properties.keytab)).exists)) { 
     ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(properties.kerberosPrincipal, 
      SparkFiles.get(properties.keytab)); 
     } else { 
     ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(properties.kerberosPrincipal, 
      properties.keytab); 
     } 


    val connection = ConnectionFactory.createConnection(config); 
    return (connection, ugi); 
    } 

、我々はHBaseのに接続します。 ...。

foreachRDD { rdd => 
     if (!rdd.isEmpty()) { 
     //var ugi: UserGroupInformation = Utils.getHbaseConnection(properties)._2 
     rdd.foreachPartition { partition => 
      val connection = Utils.getHbaseConnection(propsObj)._1 
      val table = … 
      partition.foreach { json => 

      } 
      table.put(puts) 
      table.close() 
      connection.close() 
     } 
     } 
    } 

keytabファイルが糸ステージング/ tempディレクトリにコピー取得されていない、我々はそれが中にありますので... SparkFiles.getに、我々は--filesでキータブ渡した場合、スパーク提出が失敗であることを取得されていません - -keytabは既にあります。

+0

Sparkは内部的に '--principal' /' --keytab'(または一致する 'spark.yarn.principal' /' .keytab')を使用しているので、コード内のUGIを気にする必要はありません。そして、Spark 1.4以降、Launcherはドライバが起動する前に "HBase token" *を取得してexecutorにブロードキャストしなければなりません - Clouderaが提供する 'hbase-spark'ライブラリは、実行者側で自動的に実行されるので、試してみてください。 –

+0

現実のエラーメッセージ**はあなたのソースコードよりもはるかに役立ちます - 実際のSparkバージョン。 –

+0

バージョン:scala:2.10.5 spark:1.6.0 hbase-client:1.2.0 – SudhirJ

答えて

0

エラーがある:

このサーバが失敗し、サーバリストである:myserver.test.com/120.111.25.45:60020 RpcRetryingCaller {globalStartTime = 1497943263013、100 =一時停止、= 5を再試行}、org.apache .hadoop.hbase.ipc.FailedServerException:このサーバは、失敗したサーバリストにあります:myserver.test.com/120.111.25.45:60020 RpcRetryingCaller {globalStartTime = 1497943263013、pause = 100、retries = 5}、org.apache.hadoop .hbase.ipc.FailedServerException:このサーバは、失敗したサーバリストにあります:myserver.test.com/120.111.25.45:60020 at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:147) at org.apache.hadoop.hbase.client.HTable.get(HTable.java:935)