2017-08-02 11 views
0

次のコードは、Spark Submitを使用してKafkaからのメッセージを読み込むためのコードです。 コードはエラーなしで実行され、終了しますが、メッセージは読み取られません(出力ファイルは空であり、rdd.foreachPartition内のログは印刷されません)。Spark Streaming Kafkaを使用してカフカトピックからメッセージを読み取ることができません

package hive; 
import java.net.URI; 
import java.util.*; 
import org.apache.spark.SparkConf; 
import org.apache.spark.TaskContext; 
import org.apache.spark.api.java.*; 
import org.apache.spark.api.java.function.*; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.StreamingContext; 
import org.apache.spark.streaming.api.java.*; 
import org.apache.spark.streaming.kafka010.*; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.common.TopicPartition; 
import org.apache.kafka.common.serialization.StringDeserializer; 
import scala.Tuple2; 
public class SparkKafka1 { 
    private static final Logger logger = LoggerFactory.getLogger(SparkKafka1.class); 
public static void main(String[] args) { 
    Map<String, Object> kafkaParams = new HashMap<>(); 
    kafkaParams.put("bootstrap.servers", "http://192.168.1.214:9092,http://192.168.1.214:9093"); 
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
    //kafkaParams.put("group.id", "StreamingGroup"); 
    kafkaParams.put("auto.offset.reset", "smallest"); 
    kafkaParams.put("enable.auto.commit", false); 

    String user = "ankit"; 
    String password = "[email protected]"; 
    Collection<String> topics = Arrays.asList("StreamingTopic"); 
    SparkConf conf = new SparkConf().setMaster("spark://192.168.1.214:7077") 
    .set("spark.deploy.mode", "cluster").set("user",user) 
    .set("password",password).set("spark.driver.memory", "1g").set("fs.defaultFS", "hdfs://192.168.1.214:9000") 
    .setAppName("NetworkWordCount"); 
    JavaStreamingContext streamingContext = new JavaStreamingContext(conf,new Duration(500)); 
    JavaInputDStream<ConsumerRecord<String, String>> stream = 
     KafkaUtils.createDirectStream(
     streamingContext, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams) 
    ); 

    stream.mapToPair(record -> new Tuple2<>(record.key(), record.value())); 
    stream.foreachRDD(rdd ->{    
     rdd.foreachPartition(item ->{ 
      while (item.hasNext()) {  
       System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>"+item.next()); 
       logger.info("next item="+item.next()); 
} 
}); 
}); 
    logger.info("demo log="+stream.count()); 
    stream.foreachRDD(rdd -> { 
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
      rdd.foreachPartition(consumerRecords -> { 
      OffsetRange o = offsetRanges[TaskContext.get().partitionId()]; 
      System.out.println(
       o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()); 
      rdd.saveAsTextFile("/home/ankit/work/warehouse/Manish.txt"); 
      logger.info("tokenizing inside processElement method"); 
      }); 
     }); 
} 
} 

次はのpom.xmlです:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 

    <groupId>SparkTest</groupId> 
    <artifactId>SparkTest</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 
    <packaging>jar</packaging> 


    <name>SparkTest</name> 
    <url>http://maven.apache.org</url> 

    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    </properties> 

    <dependencies> 
     <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library --> 
     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.11.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>2.1.0</version> 
      <scope>provided </scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.11</artifactId> 
      <version>2.1.0</version> 
      <scope>provided </scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-hive_2.11</artifactId> 
      <version>2.1.0</version> 
      <scope>provided </scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>2.1.0</version> 
      <scope>provided </scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-flume_2.11</artifactId> 
      <version>2.1.0</version> 
      <scope>provided </scope> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
      <version>2.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hive</groupId> 
      <artifactId>hive-jdbc</artifactId> 
      <version>1.1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-hdfs</artifactId> 
      <version>2.6.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-auth</artifactId> 
      <version>2.6.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>2.6.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-client</artifactId> 
      <version>2.6.0</version> 
     </dependency> 
     <dependency> 
      <groupId>junit</groupId> 
      <artifactId>junit</artifactId> 
      <version>3.8.1</version> 
      <scope>test</scope> 
     </dependency> 
    </dependencies> 
    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.1</version> 
       <configuration> 
        <!-- or whatever version you use --> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
      <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-shade-plugin</artifactId> 
      <version>3.0.0</version> 
      <executions> 
       <execution> 
        <phase>package</phase> 
        <goals> 
         <goal>shade</goal> 
        </goals> 
        <configuration> 
         <filters> 
          <filter> 
           <artifact>*:*</artifact> 
           <excludes> 
            <exclude>META-INF/LICENSE</exclude> 
            <exclude>META-INF/*.SF</exclude> 
            <exclude>META-INF/*.DSA</exclude> 
            <exclude>META-INF/*.RSA</exclude> 
           </excludes> 
          </filter> 
          <filter> 
          <artifact>org.apache.spark:spark-streaming-kafka-0-10_2.11</artifact> 
          <includes>        <include>org/apache/spark/streaming/kafka010/**</include> 
          </includes> 
          </filter> 
         </filters> 
         <transformers> 
          <transformer 
            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> 
         </transformers> 
        </configuration> 
       </execution> 
      </executions> 
     </plugin> 
     </plugins> 
    </build> 
</project> 

次のコマンドは、ジョブを送信します

./spark-submit --class hive.SparkKafka1 --master spark://192.168.1.214:6066 --deploy-mode cluster --supervise --executor-memory 2G --total-executor-cores 4 hdfs://192.168.1.214:9000/input/SparkTest-0.0.1-SNAPSHOT.jar 

答えて

0

私が見るには、このプログラムを実行していないが、あなたがしているようですkafka 0.10.2を使用し、最も小さいものは推奨されていません。

0

この2つのコマンドを追加する必要があります。

  1. streamingContext.start(); //このアプリケーションを起動します。
  2. streamingContext.awaitTermination(); //このアプリケーションを閉じないようにします。

そして、私はあなたがbootstrap.serversためのHTTP *値を使用して参照してください。 httpプレフィックスを削除します。ところで、コードにspark confを設定している場合は、 です。コマンドラインで同じ値を設定するのは無駄です。 ちょうどそれをチェックしてください。前と同じエラーが存在する場合私にお知らせください。

関連する問題