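Exception when calling updateStateByKey in Spark Streaming

I am trying to write a simple application that uses Spark Streaming to read from Kafka and keep a perpetual count of how many times each word has been read from the topic. So far I have run into a problem calling the all-important updateStateByKey method. It looks like a generics problem, but I can't tell what is wrong.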
Error:
The method updateStateByKey(Function2<List<Integer>,Optional<S>,Optional<S>>)
in the type JavaPairDStream<String,Integer> is not applicable for the arguments
(Function2<List<Integer>,Optional<Integer>,Optional<Integer>>)
My code (I suspect the problem may lie in how the Java generics interact with Scala):
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.Arrays;

import scala.Tuple2;
import scala.collection.immutable.List;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.google.common.base.Optional;

public class SimpleSparkApp {
    static String appName = "Streaming";
    static String master = "local[*]";
    static String zk = "localhost:2181";
    static String consumerGroupId = "sparkStreaming";
    static String[] topics = {"testTopic", };
    static Integer numThreads = new Integer(1);
    static final Pattern SPACE = Pattern.compile(" ");
    static String checkpointDir = "/tmp";
    // Declaration missing from the posted snippet; implied by the AtomicLong import and the calls below
    static AtomicLong runningCount = new AtomicLong(0);

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(10000));
        jsc.checkpoint(checkpointDir);

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jsc, zk, consumerGroupId, topicMap);

        // Keep only the message payload (the second element of each Kafka tuple)
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(SPACE.split(x));
            }
        });

        // Count the words within each batch
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        Function2<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE_FUNCTION =
            new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                @Override
                public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
                    Integer newSum = state.get();
                    scala.collection.Iterator<Integer> i = values.iterator();
                    while (i.hasNext()) {
                        newSum += i.next();
                    }
                    runningCount.addAndGet(newSum);
                    System.out.print("Total number of words: " + String.valueOf(runningCount.get()));
                    return Optional.of(newSum);
                }
            };

        // ERROR is here
        JavaPairDStream<String, Integer> runningCounts =
            wordCounts.updateStateByKey(UPDATE_FUNCTION);
        runningCounts.print();

        jsc.start();
        jsc.awaitTermination();
    }
}
When I step into updateStateByKey, I see what looks like the appropriate function declaration, so I'm not sure what I'm missing here:
  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of each key.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @tparam S State type
   */
  def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
  : JavaPairDStream[K, S] = {
    implicit val cm: ClassTag[S] = fakeClassTag
    dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
  }
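
One thing that stands out when comparing the two: the Scala signature takes a JList, which as far as I can tell is Spark's alias for java.util.List, while the Java code imports scala.collection.immutable.List. If that is the cause, the List in the update function's type parameters is simply a different type from the one updateStateByKey expects, which would explain the "not applicable for the arguments" error. Below is a minimal sketch of the same update logic written against java.util.List. To keep it runnable without Spark on the classpath, it assumes java.util.function.BiFunction as a stand-in for Spark's Function2 and java.util.Optional as a stand-in for Guava's Optional; the class name UpdateFunctionSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.List;                 // java.util.List, not scala.collection.immutable.List
import java.util.Optional;             // stand-in (assumption) for com.google.common.base.Optional
import java.util.function.BiFunction;  // stand-in (assumption) for Spark's Function2

class UpdateFunctionSketch {
    // Adds the new counts for a key to its previous total.
    // A key seen for the first time has no state yet, so it starts from 0.
    public static final BiFunction<List<Integer>, Optional<Integer>, Optional<Integer>> UPDATE_FUNCTION =
        (values, state) -> {
            int newSum = state.orElse(0);
            for (Integer value : values) {
                newSum += value;
            }
            return Optional.of(newSum);
        };

    public static void main(String[] args) {
        // First batch: no previous state for this key
        Optional<Integer> first = UPDATE_FUNCTION.apply(Arrays.asList(1, 2, 3), Optional.empty());
        // Second batch: previous state is carried forward
        Optional<Integer> second = UPDATE_FUNCTION.apply(Arrays.asList(4), first);
        System.out.println(first.get() + " " + second.get()); // prints "6 10"
    }
}
```

The sketch also reads the previous state with a default of 0, because calling get() on an absent Optional throws; with Guava's Optional the equivalent would be state.or(0). In the real application the body would go back inside a Function2 with Guava's Optional, and iterating with a for-each loop removes the need for scala.collection.Iterator.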