2017-06-12 11 views
0

私はakaを作成しています。カフカプロデューサーはスカラーです。私はscala kafkaクライアントからkafka brokerにメッセージを送信しようとしています。コマンドラインからkafkaのコンシューマーを開始する。カフカのプロデューサーと消費者はコマンドプロンプトからうまく機能しています。 KafkaはKerberosで、SASL_PlainTextセキュリティが有効です。Kerberosへのデータ送信scalaクライアントからのKafkaクラスタ

下記の私のconfファイル、クライアントコード、アプリケーションログをご覧ください。私は、コードからKerberosに接続する際に問題があるはずだと思います。

Scalaのクライアント:

KafkaClient { 
com.sun.security.auth.module.Krb5LoginModule required 
doNotPrompt=true 
useTicketCache=false 
useKeyTab=true 
serviceName="kafka" 
principal="[email protected]" 
keyTab="/home/pqr/.pqr.headless.keytab" 
debug=true 
client=true; 
}; 
Client { 
    com.sun.security.auth.module.Krb5LoginModule required 
    doNotPrompt=true 
    useKeyTab=true 
    useTicketCache=false 
    serviceName="zookeeper" 
    principal="[email protected]" 
    keyTab="/home/pqr/.pqr.headless.keytab" 
    debug=true; 
}; 

これは私がクラスタ上で私のjarファイルを実行すると、私は取得していますアプリケーションログです: をこれはKafka_Clientのconfファイルは、私がKerberos認証に使用しています

package com.ABC.adds.producer 

import akka.actor.ActorSystem 
import akka.kafka.ProducerSettings 
import akka.kafka.scaladsl.Producer 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.Source 
import com.ABC.adds.models.Models.GMMOfaq 
import com.ABC.adds.producer.serializer.ModelSerializer 
import com.thoughtworks.xstream.XStream 
import com.thoughtworks.xstream.io.xml.DomDriver 
import com.typesafe.config.ConfigFactory 
import com.typesafe.scalalogging.LazyLogging 
import org.apache.kafka.clients.CommonClientConfigs 
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata} 
import org.apache.kafka.common.serialization.ByteArraySerializer 

import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 

object faqProducer extends App with LazyLogging{ 

    val config = ConfigFactory.load() 
    implicit val system = ActorSystem.create("adds-faq-producer", config) 
    implicit val mat = ActorMaterializer() 

    val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq])) 
    .withBootstrapServers("jbt12324.systems.pfk.ABC:3382") 
    .withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT") 
     .withProperty("zookeeper.connect","jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234") 
     .withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1") 

    val xstream = new XStream(new DomDriver) 
    val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString 
    xstream.alias("faq", classOf[PPOfaq]) 
    val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq] 

    logger.info(s"Producer Configuration is : {} ", producerSettings.toString) 
    logger.info(s"Sending message : {}", ppofaq) 

    logger.info("KafkaProducer Send first fetching Partitions for topics") 
    val kafkaProducer = producerSettings.createKafkaProducer() 
    kafkaProducer.partitionsFor("asp.adds.ppo.pems") 
    val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)) 
    val recordMetaData : RecordMetadata = done1.get() 

    logger.info("Topic is : " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset()) 

    logger.info("KafkaProdcuer Send first fetching Partitions for topics end") 

    val done = Source.single(ppofaq) 
    .map { elem => 
     new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq) 
    } 
    .runWith(Producer.plainSink(producerSettings)) 

    done onComplete { 
    case Success(s) => { 
    logger.info(s"The producer has sent a message to the topic: asp.adds.ppo.pems!!") 
    } 
    case Failure(e) => { 
    logger.error("Erorr occured while producing Topic", e) 
    e.printStackTrace() 
    e.fillInStackTrace() 
    e.getCause 
    e.getMessage 
    } 
} 
} 

ですアプリケーションログ:

[[email protected] ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/* 
      [Policy Parser]: creating policy entry for expanded java.ext.dirs path: 
        file:/usr/java/packages/lib/ext/* 
    14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : [email protected] 
    14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS)) 
    14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics 
    configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf 
    configparser: Reading next config entry: KafkaClient 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab 
    configparser:     client=true 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=kafka 
    configparser: Reading next config entry: Client 
    configparser:   com.sun.security.auth.module.Krb5LoginModule, REQUIRED 
    configparser:     [email protected] 
    configparser:     debug=true 
    configparser:     doNotPrompt=true 
    configparser:     keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab 
    configparser:     useKeyTab=true 
    configparser:     useTicketCache=false 
    configparser:     serviceName=zookeeper 
    Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false 
    principal is [email protected] 
    Will use keytab 
      [LoginContext]: login success 
    Commit Succeeded 

      [LoginContext]: commit success 
    14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config. 
    adds in thread "main" org.apache.kafka.common.errors.Timeoutadds: Failed to update metadata after 60000 ms. 

ia何か悪いことをしている。 おかげで、 マヘンドラTonape

答えて

0

私たちは、クラスタで消費者側からのメッセージを消費することができませんでしたが、我々は、我々はカフカ0.10を使用して、当社のアプリケーションAPIのを書いて、私たちのクラスタはカフカを持っているためである私たちのローカルマシンにメッセージを消費することができますこれらの2つのバージョンAPIの間に違いがあることがわかります。

また、Kerberosデバッグログを有効にして、ユーザーがKerberos対応クラスタで認証されているかどうかを確認してください。

関連する問題