Apache Sparkを使用してApache Kafkaクラスタからデータを取得し、そのデータをHadoopファイルに格納するプログラムを実行しています。私のプログラムは以下の通りです:Apache Sparkカフカストリーム実行中にHadoop OutputFormat RunTimeExceptionを取得する
public final class SparkKafkaConsumer {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = "Topic1, Topic2, Topic3".split(",");
for (String topic: topics) {
topicMap.put(topic, 3);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, "kafka.test.com:2181", "NameConsumer", topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) {
return Lists.newArrayList(",".split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt");
jssc.start();
jssc.awaitTermination();
}
}
私が申請書を提出するには、このコマンドを使用しています:C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --class "SparkKafkaConsumer" --master local[4] target\simple-project-1.0.jar
私はこのエラーを取得しています:このエラーの原因とどのように私はそれを解決するのですされて何java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2148)
を?
これはスパークで問題http://stackoverflow.com/questions/29007085/saveasnewapihadoopfile-giving-error-when-used-のように見えます出力形式として.. –
代わりに 'saveAsHadoopFiles(" hdfs:// localhost:8020/user/spark/stream/"、" txt "、Text.class、IntWritable.class、TextOutputFormat.class)'を試してみてください。 –
@Hawknight 'Text.class'と' TextOutputFormat.class'の完全なパッケージは何ですか? – khateeb