2016-06-01 19 views
1

Spark Streamingを使用してKafkaから読み込み、トピックから単語が読み込まれた回数を永久にカウントする簡単なアプリケーションを作成しようとしています。これまでに非常に重要なupdateStateByKeyメソッドを呼び出す際に問題が発生しました。ジェネリックスに問題があるようですが、何が間違っているのかはわかりません。Spark StreamingでupdateStateByKeyを呼び出す例外が発生しました

エラー:

The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>) 
in the type JavaPairDStream<String,Integer> is not applicable for the arguments 
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>) 

マイコード:私はスカラ座でのジェネリック医薬品との相互作用に問題がある可能性がありだと思い

import java.util.HashMap; 
import java.util.Map; 
import java.util.concurrent.atomic.AtomicLong; 
import java.util.regex.Pattern; 
import java.util.Arrays; 
import scala.Tuple2; 
import scala.collection.immutable.List; 
import org.apache.spark.SparkConf; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import com.google.common.base.Optional; 


public class SimpleSparkApp { 
    static String appName = "Streaming"; 
    static String master = "local[*]"; 
    static String zk = "localhost:2181"; 
    static String consumerGroupId = "sparkStreaming"; 
    static String[] topics = {"testTopic", }; 
    static Integer numThreads = new Integer(1); 
    static final Pattern SPACE = Pattern.compile(" "); 
    static String checkpointDir = "/tmp"; 

    public static void main(String[] args) { 


    SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); 
    JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000)); 
    jsc.checkpoint(checkpointDir); 

    Map<String, Integer> topicMap = new HashMap<String, Integer>(); 
    for (String topic: topics) { 
     topicMap.put(topic, numThreads); 
    } 

    JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jsc, zk, consumerGroupId, topicMap); 


    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { 
      @Override 
      public String call(Tuple2<String, String> tuple2) { 
      return tuple2._2(); 
      } 
     }); 

     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 
      @Override 
      public Iterable<String> call(String x) { 
      return Arrays.asList(SPACE.split(x)); 
      } 
     }); 

     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() { 
      @Override 
      public Tuple2<String, Integer> call(String s) { 
       return new Tuple2<>(s, 1); 
      } 
      }).reduceByKey(new Function2<Integer, Integer, Integer>() { 
      @Override 
      public Integer call(Integer i1, Integer i2) { 
       return i1 + i2; 
      } 
      }); 



     Function2<List<Integer>,Optional<Integer>,Optional<Integer>> UPDATE_FUNCTION = 
      new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { 
      @Override 
      public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { 
       Integer newSum = state.get(); 
       scala.collection.Iterator<Integer> i = values.iterator(); 
       while(i.hasNext()){ 
       newSum += i.next(); 
       } 
       runningCount.addAndGet(newSum); 
       System.out.print("Total number of words: " + String.valueOf(runningCount.get())); 
       return Optional.of(newSum); 
      } 
     }; 


     //ERROR is here 
     JavaPairDStream<String, Integer> runningCounts = 
       wordCounts.updateStateByKey(UPDATE_FUNCTION); 

     runningCounts.print(); 
     jsc.start(); 
     jsc.awaitTermination(); 
    } 
} 

?私はupdateStateByKeyに足を踏み入れるとき、私は、適切な関数宣言を参照してくださいので、私は、私はここに欠けているのかわからないんだけど:固定

/** 
    * Return a new "state" DStream where the state for each key is updated by applying 
    * the given function on the previous state of the key and the new values of each key. 
    * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. 
    * @param updateFunc State update function. If `this` function returns None, then 
    *     corresponding state key-value pair will be eliminated. 
    * @tparam S State type 
    */ 
    def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) 
    : JavaPairDStream[K, S] = { 
    implicit val cm: ClassTag[S] = fakeClassTag 
    dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) 
    } 

答えて

1

号 - 結局のところ、私が間違っているリストとイテレータクラスをインポートしました(私は)Eclipseを非難:

コメントアウト:

//import scala.collection.immutable.List; 
//import scala.collection.Iterator; 

追加で:

import java.util.Iterator; 
import java.util.List; 

若干変更された更新機能:

Function2<List<Integer>,Optional<Integer>,Optional<Integer>> UPDATE_FUNCTION = 
       new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() { 
       @Override 
       public Optional<Integer> call(List<Integer> values, Optional<Integer> state) { 
        Integer newSum = state.get(); 
        Iterator<Integer> i = values.iterator(); 
        while(i.hasNext()){ 
        newSum += i.next(); 
        } 
        runningCount.addAndGet(newSum); 
        System.out.print("Total number of words: " + String.valueOf(runningCount.get())); 
        return Optional.of(newSum); 
       } 
      }; 
関連する問題