私は、カフカのトピックからメッセージを読むSpark Streamingのシンプルなアプリを設定しようとしています。Spark Streaming Kafka Consumer
多くの作業の後、私はこの段階に入っていますが、下記の例外があります。
コード:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66)
私が飼育係に接続しようとした絶望のうち:
public static void main(String[] args) throws Exception {
String brokers = "my.kafka.broker" + ":" + "6667";
String topics = "MyKafkaTopic";
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
.setMaster("local[1]")
;
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
System.out.println("Brokers: " + brokers);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
System.out.println("Message received: " + messages);
// Start the computation
jssc.start();
jssc.awaitTermination();
}
スロー
String brokers = "my.kafka.zookeeper" + ":" + "2181";
String topics = "MyKafkaTopic";
しかし、それはスロー:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53)
関連の依存関係は、以下のとおりです。
<properties>
<spark.version>1.6.2</spark.version>
<kafka.version>0.8.2.1</kafka.version>
</properties>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
私はお願いしたいと思います:私は、カフカのブローカーや飼育係サーバーへ
を接続する必要がありますか。
着信メッセージに接続できないようにするには、私のコードで何が間違っていますか? java.lang.IllegalArgumentExceptionが:によって引き起こさ