次のコードは、Spark Submitを使用してKafkaからのメッセージを読み込むためのコードです。 コードはエラーなしで実行され、終了しますが、メッセージは読み取られません(出力ファイルは空であり、rdd.foreachPartition内のログは印刷されません)。Spark Streaming Kafkaを使用してカフカトピックからメッセージを読み取ることができません
package hive;
import java.net.URI;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
public class SparkKafka1 {
private static final Logger logger = LoggerFactory.getLogger(SparkKafka1.class);
public static void main(String[] args) {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "http://192.168.1.214:9092,http://192.168.1.214:9093");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//kafkaParams.put("group.id", "StreamingGroup");
kafkaParams.put("auto.offset.reset", "smallest");
kafkaParams.put("enable.auto.commit", false);
String user = "ankit";
String password = "[email protected]";
Collection<String> topics = Arrays.asList("StreamingTopic");
SparkConf conf = new SparkConf().setMaster("spark://192.168.1.214:7077")
.set("spark.deploy.mode", "cluster").set("user",user)
.set("password",password).set("spark.driver.memory", "1g").set("fs.defaultFS", "hdfs://192.168.1.214:9000")
.setAppName("NetworkWordCount");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf,new Duration(500));
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
stream.foreachRDD(rdd ->{
rdd.foreachPartition(item ->{
while (item.hasNext()) {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>"+item.next());
logger.info("next item="+item.next());
}
});
});
logger.info("demo log="+stream.count());
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
rdd.saveAsTextFile("/home/ankit/work/warehouse/Manish.txt");
logger.info("tokenizing inside processElement method");
});
});
}
}
次はのpom.xmlです:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>SparkTest</groupId>
<artifactId>SparkTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>SparkTest</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
<scope>provided </scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
<scope>provided </scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.0</version>
<scope>provided </scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
<scope>provided </scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.1.0</version>
<scope>provided </scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<!-- or whatever version you use -->
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
<filter>
<artifact>org.apache.spark:spark-streaming-kafka-0-10_2.11</artifact>
<includes> <include>org/apache/spark/streaming/kafka010/**</include>
</includes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
次のコマンドは、ジョブを送信します
./spark-submit --class hive.SparkKafka1 --master spark://192.168.1.214:6066 --deploy-mode cluster --supervise --executor-memory 2G --total-executor-cores 4 hdfs://192.168.1.214:9000/input/SparkTest-0.0.1-SNAPSHOT.jar