Apache Spark: reduceByKey function stops the Java application

After a long time of searching without success, I have to ask you! I want to do a simple word count of hashtags from tweets with Apache Spark. The application gets the hashtags from Kafka, and everything works fine up to the reduceByKey function. Without that function, the result looks like this:
-------------------------------------------
Time: 1483986210000 ms
-------------------------------------------
(Presse,1)
(Trump,1)
(TheResistanceGQ,1)
(MerylStreep,1)
(theresistance,1)
(Theranos,1)
(Russian,1)
(Trump,1)
(trump,1)
(Üstakıl,1)
...
What I need is for matching hashtags, such as the two (Trump,1) entries above, to be counted together and displayed, which is exactly what reduceByKey should do (I know there is also a direct connection between Twitter and Spark). But with the reduceByKey function I get the error shown further below. Here is my code:
package org.apache.spark.examples.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;

import scala.Tuple2;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;
import org.apache.log4j.Logger;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 */
public final class JavaDirectKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        String brokers = "XXX.XXX.XXX.XXX:9092";
        String topics = "topicMontag";

        // Create context with a 2 seconds batch interval
        SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers);
        kafkaParams.put("group.id", "1");
        kafkaParams.put("auto.offset.reset", "smallest");

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class,
                String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

        messages.foreachRDD(rdd -> {
            System.out.println(
                    "--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");
            // rdd.foreach(record -> System.out.println(record._2));
        });

        // Extract the message value from each Kafka (key, value) pair
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String x) {
                return Arrays.asList(SPACE.split(x)).iterator();
            }
        });

        // Map each word to a (word, 1) pair
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<>(s, 1);
            }
        });

        // Sum the counts per word -- this is the step where the application fails
        JavaPairDStream<String, Integer> result = wordCounts.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // wordCounts.print();
        result.print();

        // PrintStream out = new PrintStream(new FileOutputStream("output.txt"));
        // System.setOut(out);

        // Start the computation
        jssc.start();
        jssc.awaitTermination();
    }
}
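(By the way, since the foreachRDD call already uses a Java 8 lambda, the anonymous classes could be written the same way. A minimal sketch of the same pipeline in lambda form, behavior unchanged, just shorter:

JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> result = wordCounts.reduceByKey((i1, i2) -> i1 + i2);
result.print();
)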
And this is the error I get as soon as the reduceByKey stage runs:
17/01/09 19:28:54 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at JavaDirectKafkaWordCount.java:106) finished in 0,377 s
17/01/09 19:28:54 INFO DAGScheduler: looking for newly runnable stages
17/01/09 19:28:54 INFO DAGScheduler: running: Set()
17/01/09 19:28:54 INFO DAGScheduler: waiting: Set(ResultStage 1)
17/01/09 19:28:54 INFO DAGScheduler: failed: Set()
17/01/09 19:28:54 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at JavaDirectKafkaWordCount.java:113), which has no missing parents
17/01/09 19:28:54 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.2 KB, free 899.7 MB)
17/01/09 19:28:54 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1948.0 B, free 899.7 MB)
17/01/09 19:28:54 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on XXX.XXX.XXX.XXX:56435 (size: 1948.0 B, free: 899.7 MB)
17/01/09 19:28:54 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1012
17/01/09 19:28:54 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at JavaDirectKafkaWordCount.java:113)
17/01/09 19:28:54 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
17/01/09 19:28:54 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, partition 0, ANY, 5800 bytes)
17/01/09 19:28:54 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
17/01/09 19:28:54 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
17/01/09 19:28:54 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 5 ms
17/01/09 19:28:54 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.lang.NoClassDefFoundError: net/jpountz/util/SafeUtils
at org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:124)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2338)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2351)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:2822)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
...
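From what I can tell, the missing class net/jpountz/util/SafeUtils comes from the lz4-java library, which Spark uses for its default shuffle compression, and reduceByKey is the first operation in the pipeline that shuffles, which would explain why everything before it works. A quick diagnostic sketch (my own addition, not part of the app) to check whether the class is visible on the driver classpath:

try {
    // Attempt to load the lz4 class that the executor failed to find
    Class.forName("net.jpountz.util.SafeUtils");
    System.out.println("lz4 is on the classpath");
} catch (ClassNotFoundException e) {
    System.out.println("lz4 is missing from the classpath");
}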
And here is my pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.twitter4j</groupId>
        <artifactId>twitter4j-stream</artifactId>
        <version>4.0.4</version>
    </dependency>
    <dependency>
        <groupId>com.twitter</groupId>
        <artifactId>hbc-core</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-xml</artifactId>
        <version>2.11.0-M4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-examples_2.10</artifactId>
        <version>1.0.0</version>
    </dependency>
</dependencies>
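If the lz4 jar really is missing, I assume declaring it explicitly would look like this (1.3.0 is the version Spark 2.0.x itself builds against, so treat that as an assumption to verify):

<dependency>
    <!-- lz4-java, which provides net.jpountz.util.SafeUtils; version assumed from Spark 2.0.x -->
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.3.0</version>
</dependency>

I also notice that my Spark artifacts are mixed across versions (spark-streaming 2.0.1, spark-streaming-kafka 1.6.1, spark-examples 1.0.0), which might cause similar classpath problems.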
Maybe someone has an idea? Thanks...
'NoClassDefFoundError' has a very specific meaning and cause. What does the rest of the log output say? – nitind