2016-10-14 28 views
3

Spark Streaming Kafka Consumer

I am trying to set up a simple Spark Streaming app that reads messages from a Kafka topic.

After a lot of work I have got this far, but the code throws the following exception:

[WARNING] 
java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute 
    at scala.Predef$.require(Predef.scala:233) 
    at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161) 
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624) 
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66) 

The code:

public static void main(String[] args) throws Exception { 

    String brokers = "my.kafka.broker" + ":" + "6667"; 
    String topics = "MyKafkaTopic"; 

    // Create context with a 2 seconds batch interval 
    SparkConf sparkConf = new SparkConf().setAppName("StreamingE") 
      .setMaster("local[1]") 
      ; 
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(","))); 
    Map<String, String> kafkaParams = new HashMap<>(); 
    kafkaParams.put("metadata.broker.list", brokers); 
    System.out.println("Brokers: " + brokers); 

    // Create direct kafka stream with brokers and topics 
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
      jssc, 
      String.class, 
      String.class, 
      StringDecoder.class, 
      StringDecoder.class, 
      kafkaParams, 
      topicsSet 
    ); 

    System.out.println("Message received: " + messages); 

    // Start the computation 
    jssc.start(); 
    jssc.awaitTermination(); 

} 
Out of desperation, I tried connecting to ZooKeeper instead:

String brokers = "my.kafka.zookeeper" + ":" + "2181"; 
String topics = "MyKafkaTopic"; 

But that throws:

[WARNING] 
java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366) 
    at scala.util.Either.fold(Either.scala:97) 
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484) 
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607) 
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala) 
    at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53) 

The relevant dependencies are as follows:

<properties> 
    <spark.version>1.6.2</spark.version> 
    <kafka.version>0.8.2.1</kafka.version> 
</properties> 

<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>${kafka.version}</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming_2.10</artifactId> 
    <version>${spark.version}</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>${spark.version}</version> 
</dependency> 

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka_2.10</artifactId> 
    <version>${spark.version}</version> 
</dependency> 

I would like to ask:

Should I connect to the Kafka brokers or to the ZooKeeper servers?

What is wrong with my code that prevents it from receiving incoming messages?

Answers

4

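Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute

This happens because most Spark transformations are lazy. For the graph to execute, you need to register an output operation. On a DStream these include foreachRDD, print and saveAsTextFiles (among others); RDD actions such as collect or count only trigger execution when called inside foreachRDD.

Instead of using println, call DStream.print():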

// Create direct kafka stream with brokers and topics 
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
     jssc, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     kafkaParams, 
     topicsSet 
); 

messages.print(); 

// Start the computation 
jssc.start(); 
jssc.awaitTermination(); 
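
The laziness described above can be tried out without a Spark cluster. The following is only an analogy, written against java.util.stream rather than Spark: a stream pipeline, like a DStream graph, does no work until a terminal (output) operation is attached. The class name LazyPipelineDemo and the helper buildPipeline are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Analogy only: java.util.stream is not Spark, but its pipelines are lazy too.
final class LazyPipelineDemo {

    // Builds a pipeline with a single transformation. Nothing is executed here;
    // the counter only increases once a terminal operation consumes the stream.
    static Stream<String> buildPipeline(List<String> messages, AtomicInteger processed) {
        return messages.stream().map(m -> {
            processed.incrementAndGet();
            return m.toUpperCase();
        });
    }

    public static void main(String[] args) {
        AtomicInteger processed = new AtomicInteger();
        Stream<String> pipeline = buildPipeline(Arrays.asList("a", "b", "c"), processed);

        // Comparable to println("..." + messages) in the question: this prints
        // the pipeline object itself, and the map step has not run yet.
        System.out.println("Pipeline object: " + pipeline);
        System.out.println("Processed before terminal operation: " + processed.get());

        // Comparable to messages.print(): a terminal operation runs the pipeline.
        pipeline.forEach(System.out::println);
        System.out.println("Processed after terminal operation: " + processed.get());
    }
}
```

In the Spark job the same role is played by print() or foreachRDD: until one of them is registered, jssc.start() has nothing to run and fails validation.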

Regarding Kafka, metadata.broker.list must be given the addresses of your Kafka broker nodes. There is a separate key, zookeeper.connect, for providing the ZooKeeper address.
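
As a minimal sketch of that distinction, the parameters for the direct stream could be built as below. The helpers requireHostPort and directStreamParams are hypothetical names, not part of Spark or Kafka; the host name and port are the ones from the question. The format check is only an illustration of failing early on a malformed address. It cannot tell a broker from a ZooKeeper node, so pointing the value at port 2181 would still pass it.

```java
import java.util.HashMap;
import java.util.Map;

final class KafkaParamsSketch {

    // Hypothetical helper: checks that every comma-separated entry looks like host:port.
    static void requireHostPort(String list) {
        for (String entry : list.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2 || parts[0].isEmpty()) {
                throw new IllegalArgumentException("Expected host:port but got '" + entry + "'");
            }
            // NumberFormatException (an IllegalArgumentException) on a non-numeric port.
            Integer.parseInt(parts[1]);
        }
    }

    // Hypothetical helper: builds the map passed as kafkaParams to
    // KafkaUtils.createDirectStream. The value must list Kafka brokers,
    // not ZooKeeper nodes.
    static Map<String, String> directStreamParams(String brokers) {
        requireHostPort(brokers);
        Map<String, String> params = new HashMap<>();
        params.put("metadata.broker.list", brokers);
        return params;
    }

    public static void main(String[] args) {
        // Broker address from the question (port 6667), not the ZooKeeper one (2181).
        Map<String, String> kafkaParams = directStreamParams("my.kafka.broker:6667");
        System.out.println(kafkaParams);
    }
}
```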

0
import static org.apache.spark.streaming.kafka.KafkaUtils.createStream; 

import java.util.HashMap; 
import java.util.Map; 

import org.apache.spark.SparkConf; 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.streaming.Seconds; 
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import com.google.common.collect.ImmutableMap; 
import java.io.FileInputStream; 
import java.io.InputStream; 
import java.util.Properties; 

import kafka.serializer.StringDecoder; 
import org.apache.hadoop.security.UserGroupInformation; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import scala.Tuple2; 

public class KafkaKerberosReader { 

    // Spark information 
    private static SparkConf conf; 
    private static String appName = "KafkaKerberosReader"; 
    private static JavaStreamingContext context; 
    private static final Logger logger = LoggerFactory.getLogger(KafkaKerberosReader.class.getSimpleName()); 

    // Kafka information 
    private static String zkQuorum = ""; 
    private static String kfkQuorum = ""; 
    private static String group = ""; 
    private static Integer threads = 1; 
    private static Map<String, String> kafkaParams = new HashMap<String, String>(); 

    public static void loadProps() { 
     Properties prop = new Properties(); 
     try { 
      logger.info("------------------------------loadProps"); 
      InputStream input = new FileInputStream("config.properties"); 
      prop.load(input); 
      System.out.println("loadProps loaded:" + prop); 

      appName = prop.getProperty("app.name"); 
      autoOffsetReset = prop.getProperty("auto.offset.reset"); 
      secProtocol = prop.getProperty("security.protocol"); 
      kfkQuorum = bServers = prop.getProperty("bootstrap.servers"); 
      zkQuorum = zServers = prop.getProperty("zookeeper.connect"); 
      group = kGroupId = prop.getProperty("group.id"); 
      kKeyTabFile = prop.getProperty("kerberos.keytabfile"); 
      kJaas = prop.getProperty("kerberos.jaas"); 
      kTopic = prop.getProperty("kafka.topic"); 
      kPrincipal = prop.getProperty("kerberos.principal"); 
      logger.info("loadProps:Props:zk:" + zServers + ",issecure:" + secProtocol + ",autoOffsetReset:" 
        + autoOffsetReset + ",bServers:" + bServers + ",kJaas:" + kJaas + ",keytab:" + kKeyTabFile 
        + ", kTopic:" + kTopic + ", kPrincipal" + kPrincipal); 

      if (kPrincipal != null && kKeyTabFile != null) { 
       logger.info("---------------------Logging into Kerberos"); 
       org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); 
       conf.set("hadoop.security.authentication", "Kerberos"); 
       UserGroupInformation.setConfiguration(conf); 
       UserGroupInformation.loginUserFromKeytabAndReturnUGI(kPrincipal, kKeyTabFile); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

    public static void main(String[] args) { 
     logger.info("------------------------------main:START"); 
     loadProps(); 
     // Configure the application 
     configureSpark(); 

     // Create the context 
     context = createContext(kTopic); 

     // Start the application and block until it terminates
     context.start(); 
     context.awaitTermination(); 
     logger.info("main:END"); 
    } 

    /** 
    * ----------------------------------------------- | This is the kernel of 
    * the spark application | ----------------------------------------------- 
    * 
    */ 
    private static JavaStreamingContext createContext(String topic) { 

     logger.info("-------------------------------------------------------"); 
     logger.info("|   Starting: {}    |", appName); 
     logger.info("|   kafkaParams: {}    |", kafkaParams); 
     logger.info("-------------------------------------------------------"); 

     // Create the spark streaming context 
     context = new JavaStreamingContext(conf, Seconds.apply(5)); 

     // Read from a Kerberized Kafka 
     JavaPairReceiverInputDStream<String, String> kafkaStream = createStream(context, zkQuorum, "Default", 
       ImmutableMap.of(topic, threads), StorageLevel.MEMORY_AND_DISK_SER()); 

     kafkaStream.print(); 
     JavaDStream<String> lines = kafkaStream.map(new Function<Tuple2<String, String>, String>() { 
      private static final long serialVersionUID = 1L; 

      @Override 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 
     lines.print(); 

     // kafkaStream.map(message -> message._2.toLowerCase()).print(); 
     logger.info("-------------------------------------------------------"); 
     logger.info("|   Finished: {}    |", appName); 
     logger.info("-------------------------------------------------------"); 

     return context; 
    } 

    /** 
    * Create a SparkConf and configure it. 
    * 
    */ 
    private static void configureSpark() { 
     logger.info("------------------------------Initializing '{}'.", appName); 
     conf = new SparkConf().setAppName(appName); 

     if (group != null && group.trim().length() != 0) { 
      kafkaParams.put("group.id", group); 
     } 
     kafkaParams.put("auto.offset.reset", autoOffsetReset); 
     kafkaParams.put("security.protocol", secProtocol); 
     kafkaParams.put("bootstrap.servers", kfkQuorum); 
     kafkaParams.put("zookeeper.connect", zkQuorum); 

     logger.info(">- Configuration done with the follow properties:"); 
     logger.info(conf.toDebugString()); 
    } 

    static String autoOffsetReset, secProtocol, bServers, zServers, kGroupId, kKeyTabFile, kJaas, kTopic, kPrincipal; 

} 

Properties:

app.name=KafkaKerberosReader 

auto.offset.reset=smallest 

security.protocol=PLAINTEXTSASL 

bootstrap.servers=sandbox.hortonworks.com:6667 

zookeeper.connect=sandbox.hortonworks.com:2181 

group.id=Default 

kafka.topic=ifinboundprecint 

//#kerberos.keytabfile=/etc/hello.keytab 

//#kerberos.jaas=/etc/kafka/conf/kafka_client_jaas.conf 

//#[email protected] 
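
A file like the one above can be read with java.util.Properties. The following is a minimal sketch, not the code from this answer: the class name ConfigSketch and the helper require are made up for illustration, and it reads from any Reader so that it can be tried without a file on disk. It fails fast on a missing key instead of passing null on to the Kafka parameters, and it trims values because java.util.Properties keeps trailing whitespace.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class ConfigSketch {

    // Loads properties from any Reader (a FileReader in a real job).
    static Properties load(Reader reader) {
        Properties props = new Properties();
        try {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Hypothetical helper: returns the trimmed value or fails with a clear message.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) throws IOException {
        // Inline copy of a few keys from the properties listing above.
        String text = "bootstrap.servers=sandbox.hortonworks.com:6667\n"
                + "zookeeper.connect=sandbox.hortonworks.com:2181\n"
                + "kafka.topic=ifinboundprecint\n";
        // try-with-resources closes the reader even if loading fails.
        try (Reader reader = new StringReader(text)) {
            Properties props = load(reader);
            System.out.println("brokers   = " + require(props, "bootstrap.servers"));
            System.out.println("zookeeper = " + require(props, "zookeeper.connect"));
            System.out.println("topic     = " + require(props, "kafka.topic"));
        }
    }
}
```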

Invocation:

spark-submit --master yarn --deploy-mode client --num-executors 3 --executor-memory 500M --executor-cores 3 --class com.my.spark.KafkaKerberosReader ~/SparkStreamKafkaTest-1.0-SNAPSHOT.jar
