2016-07-21 4 views
1

Apache Sparkを使用してApache Kafkaクラスタからデータを取得し、そのデータをHadoopファイルに格納するプログラムを実行しています。私のプログラムは以下の通りです:Apache Sparkカフカストリーム実行中にHadoop OutputFormat RunTimeExceptionを取得する

public final class SparkKafkaConsumer { 
    public static void main(String[] args) { 
     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
     Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
     String[] topics = "Topic1, Topic2, Topic3".split(","); 
     for (String topic: topics) { 
      topicMap.put(topic, 3); 
     } 
     JavaPairReceiverInputDStream<String, String> messages = 
       KafkaUtils.createStream(jssc, "kafka.test.com:2181", "NameConsumer", topicMap); 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      public String call(Tuple2<String, String> tuple2) { 
       return tuple2._2(); 
      } 
     }); 
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 
      public Iterable<String> call(String x) { 
       return Lists.newArrayList(",".split(x)); 
      } 
     }); 
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
       new PairFunction<String, String, Integer>() { 
        public Tuple2<String, Integer> call(String s) { 
         return new Tuple2<String, Integer>(s, 1); 
        } 
       }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
        public Integer call(Integer i1, Integer i2) { 
         return i1 + i2; 
        } 
       }); 
     wordCounts.print(); 
     wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt"); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 
} 

私が申請書を提出するには、このコマンドを使用しています:C:\spark-1.6.2-bin-hadoop2.6\bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.2 --class "SparkKafkaConsumer" --master local[4] target\simple-project-1.0.jar

私はこのエラーを取得しています:このエラーの原因とどのように私はそれを解決するのですされて何java.lang.RuntimeException: class scala.runtime.Nothing$ not org.apache.hadoop.mapred.OutputFormat at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:2148)

を?

+0

これはスパークで問題http://stackoverflow.com/questions/29007085/saveasnewapihadoopfile-giving-error-when-used-のように見えます出力形式として.. –

+0

代わりに 'saveAsHadoopFiles(" hdfs:// localhost:8020/user/spark/stream/"、" txt "、Text.class、IntWritable.class、TextOutputFormat.class)'を試してみてください。 –

+0

@Hawknight 'Text.class'と' TextOutputFormat.class'の完全なパッケージは何ですか? – khateeb

答えて

2

私はこのエラーが本当に喚起的ではないことに同意しますが、saveAsHadoopFileのいずれかの方法で出力するデータの形式を指定する方が、この種の例外から身を守る方が良いでしょう。あなたの例では

saveAsHadoopFiles(java.lang.String prefix, java.lang.String suffix, java.lang.Class<?> keyClass, java.lang.Class<?> valueClass, java.lang.Class<F> outputFormatClass) 

、に対応するであろう:ここ

は、ドキュメントのあなたの特定のメソッドのプロトタイプだあなたwordCounts PairDStreamのフォーマットに基づいて

wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class) 

、私はTextを選びましたキーのタイプはStringであり、キーに関連付けられた値はIntegerであるため、IntWritableです。

基本的なプレーンテキストファイルだけが必要な場合はTextOutputFormatを使用しますが、出力オプションの詳細についてはFileOutputFormatのサブクラスを参照できます。

Textクラスはorg.apache.hadoop.ioパッケージから、TextOutputFormatorg.apache.hadoop.mapredパッケージから提供されます。ただ、完全性について

1

(@ジョナサンは、正しい答えを与えた)

import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.TextOutputFormat; 

... 
wordCounts.saveAsHadoopFiles("hdfs://localhost:8020/user/spark/stream/", "txt", Text.class, IntWritable.class, TextOutputFormat.class) 
関連する問題