I am building an application with Akka. The Kafka producer is written in Scala. I am trying to send messages from the Scala Kafka client to a Kafka broker, with a Kafka console consumer started from the command line. The console producer and consumer work fine from the command prompt. Kafka has Kerberos with SASL_PLAINTEXT security enabled. The problem is sending data from the Scala client to the Kerberized Kafka cluster.
Please find below my conf file, client code, and application log. I suspect the problem is in how the code connects to Kerberos.
This is the kafka_client_jaas.conf file I am using for Kerberos authentication:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=false
useKeyTab=true
serviceName="kafka"
principal="[email protected]"
keyTab="/home/pqr/.pqr.headless.keytab"
debug=true
client=true;
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
useTicketCache=false
serviceName="zookeeper"
principal="[email protected]"
keyTab="/home/pqr/.pqr.headless.keytab"
debug=true;
};
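As a cross-check (assuming the Kafka client library is 0.10.2 or newer, which added this property), the KafkaClient entry above can also be passed inline through the producer property `sasl.jaas.config`, which rules out the external JAAS file not being picked up. A sketch only, reusing the principal and keytab path from the entry above:

```scala
// Sketch: the KafkaClient JAAS entry above expressed as an inline
// 'sasl.jaas.config' value (a single line, terminated by a semicolon).
object InlineJaasSketch {
  val saslJaasConfig: String =
    "com.sun.security.auth.module.Krb5LoginModule required " +
      "doNotPrompt=true useTicketCache=false useKeyTab=true " +
      "serviceName=\"kafka\" " +
      "principal=\"[email protected]\" " +
      "keyTab=\"/home/pqr/.pqr.headless.keytab\" " +
      "debug=true;"
}
```

This would be supplied with `.withProperty("sasl.jaas.config", InlineJaasSketch.saslJaasConfig)` instead of the `-Djava.security.auth.login.config` JVM flag.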
Scala client:
package com.ABC.adds.producer
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.ABC.adds.models.Models.PPOfaq
import com.ABC.adds.producer.serializer.ModelSerializer
import com.thoughtworks.xstream.XStream
import com.thoughtworks.xstream.io.xml.DomDriver
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.ByteArraySerializer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
object faqProducer extends App with LazyLogging{
val config = ConfigFactory.load()
implicit val system = ActorSystem.create("adds-faq-producer", config)
implicit val mat = ActorMaterializer()
val producerSettings = ProducerSettings(system, new ByteArraySerializer, new ModelSerializer[PPOfaq](classOf[PPOfaq]))
.withBootstrapServers("jbt12324.systems.pfk.ABC:3382")
.withProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
.withProperty("zookeeper.connect","jbt234234.systems.pfk.ABC:2341,jbt234.systems.pfk.ABC:234,jbt1234324.systems.pfk.ABC:2234")
.withProperty(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, "1")
val xstream = new XStream(new DomDriver)
val personString: String = scala.io.Source.fromInputStream(getClass().getClassLoader().getResourceAsStream("PPOfaq.xml")).mkString
xstream.alias("faq", classOf[PPOfaq])
val ppofaq: PPOfaq = xstream.fromXML(personString).asInstanceOf[PPOfaq]
logger.info(s"Producer Configuration is : {} ", producerSettings.toString)
logger.info(s"Sending message : {}", ppofaq)
logger.info("KafkaProducer Send first fetching Partitions for topics")
val kafkaProducer = producerSettings.createKafkaProducer()
kafkaProducer.partitionsFor("asp.adds.ppo.pems")
val done1 = kafkaProducer.send(new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq))
val recordMetaData : RecordMetadata = done1.get()
logger.info("Topic is : " + recordMetaData.topic() +" partition is : "+ recordMetaData.partition() +" offset is : "+ recordMetaData.offset())
logger.info("KafkaProducer Send first fetching Partitions for topics end")
val done = Source.single(ppofaq)
.map { elem =>
new ProducerRecord[Array[Byte], PPOfaq]("asp.adds.ppo.pems", ppofaq)
}
.runWith(Producer.plainSink(producerSettings))
done onComplete {
  case Success(_) =>
    logger.info("The producer has sent a message to the topic: asp.adds.ppo.pems!!")
  case Failure(e) =>
    logger.error("Error occurred while producing to topic", e)
}
}
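Two settings in the code above stand out against the log below: the producer warns that 'zookeeper.connect' is not a known config (producers talk only to brokers, not ZooKeeper), and `REQUEST_TIMEOUT_MS_CONFIG` is set to "1" (1 ms), far below the 30000 ms default. For comparison, here is a sketch (not a verified fix; the broker address is the one from the question) of the same configuration as plain Kafka properties without those two entries:

```scala
import java.util.Properties

// Sketch only: the producer configuration from the question, minus the
// unrecognized 'zookeeper.connect' key and with the request timeout left
// at the Kafka default instead of 1 ms.
object ProducerPropsSketch {
  val props: Properties = {
    val p = new Properties()
    p.put("bootstrap.servers", "jbt12324.systems.pfk.ABC:3382") // broker from the question
    p.put("security.protocol", "SASL_PLAINTEXT")
    p.put("sasl.kerberos.service.name", "kafka") // matches serviceName in the JAAS entry
    p.put("request.timeout.ms", "30000") // Kafka default; the original code set "1"
    p
  }
}
```

The same keys can be applied to `ProducerSettings` via `.withProperty(...)`, as in the code above.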
Application log:
[[email protected] ResourceBundle]$ java -Djava.security.auth.login.config=kafka_client_jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -Djava.security.debug=logincontext,gssloginconfig,configfile,configparser, -jar adds-producer.jar
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.51-0.b16.el6_6.x86_64/jre/lib/ext/*
[Policy Parser]: creating policy entry for expanded java.ext.dirs path:
file:/usr/java/packages/lib/ext/*
14:44:56.520 [main] INFO c.h.adds.producer.addsProducer$ - Producer Configuration is : [email protected]
14:44:56.523 [main] INFO c.h.adds.producer.addsProducer$ - Sending message : PPOadds(PIC_EVENT,01/06/2016,26/10/2016,ASSIGNED,asd_asdasd_ERRORED,asd,asdMSR,High,CASE-3,CASE-4,CASE-1,CASE-2,,CustomAttributes(GCAL,PTS Only,,DTF_INT_WORKFLOWS_COMPLETE,16065763,2495921,12,CreditDefaultSwap,CreditDefaultSwap,VERIFIED_asdasd,ABCUSA,asdCDS))
14:44:56.524 [main] INFO c.h.adds.producer.addsProducer$ - KafkaProducer Send first fetching Partitions for topics
configfile: reading file:/home/asdasd-asdasdds-adds-asda/adasd-erer/hfgh/kafka_client_jaas.conf
configparser: Reading next config entry: KafkaClient
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: [email protected]
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/asdasd-asdad-adds-rrewr/.sdfsf-sdfsd-adds-sdfsf.headless.keytab
configparser: client=true
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=kafka
configparser: Reading next config entry: Client
configparser: com.sun.security.auth.module.Krb5LoginModule, REQUIRED
configparser: [email protected]
configparser: debug=true
configparser: doNotPrompt=true
configparser: keyTab=/home/sdfdsf-sfds-adds-sdf/.sdff.sdsfs-adds-usdfs.headless.keytab
configparser: useKeyTab=true
configparser: useTicketCache=false
configparser: serviceName=zookeeper
Debug is true storeKey false useTicketCache false useKeyTab true doNotPrompt true ticketCache is null isInitiator true KeyTab is /home/dasda-sasd-adds-asdad/.asdad-asd-adds-adsasd.headless.keytab refreshKrb5Config is false principal is [email protected] tryFirstPass is false useFirstPass is false storePass is false clearPass is false
principal is [email protected]
Will use keytab
[LoginContext]: login success
Commit Succeeded
[LoginContext]: commit success
14:44:56.748 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
Exception in thread "main" org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Am I doing something wrong? Thanks, Mahendra Tonape