Spark Streaming JobでSpark SQLを使用して、Hiveテーブルを検索しています。 カフカストリーミングは問題なく正常に動作します。 hiveContext.runSqlHive(sqlQuery);
をdirectKafkaStream.foreachRDD
の外に実行すると問題なく正常に動作します。しかし、ストリーミングジョブの中でハイブテーブルルックアップが必要です。 JDBC(jdbc:hive2://
)を使用すると動作しますが、Spark SQLを使いたいと思います。次のようにSpark SQLがSpark Streaming(KafkaStream)で失敗しました
私のソースコードの重要な場所が見えます:
// set context
SparkConf sparkConf = new SparkConf().setAppName(appName).set("spark.driver.allowMultipleContexts", "true");
SparkContext sparkSqlContext = new SparkContext(sparkConf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(batchDuration));
HiveContext hiveContext = new HiveContext(sparkSqlContext);
// Initialize Direct Spark Kafka Stream. Starts from top
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet);
// work on stream
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
rdd.foreachPartition(tuple2Iterator -> {
// get message
Tuple2<String, String> item = tuple2Iterator.next();
// lookup
String sqlQuery = "SELECT something FROM somewhere";
Seq<String> resultSequence = hiveContext.runSqlHive(sqlQuery);
List<String> result = scala.collection.JavaConversions.seqAsJavaList(resultSequence);
});
return null;
});
// Start the computation
streamingContext.start();
streamingContext.awaitTermination();
私はのtry-catchで囲む場合でも、意味のあるエラーを取得していません。
誰かが助けてくれることを祈っています。ありがとうございました。
//編集:あなたはそれを可能にしませんスパークSQLを使用したいという理由だけで
// work on stream
directKafkaStream.foreachRDD((Function<JavaPairRDD<String, String>, Void>) rdd -> {
// driver
Map<String, String> lookupMap = getResult(hiveContext); //something with hiveContext.runSqlHive(sqlQuery);
rdd.foreachPartition(tuple2Iterator -> {
// worker
while (tuple2Iterator != null && tuple2Iterator.hasNext()) {
// get message
Tuple2<String, String> item = tuple2Iterator.next();
// lookup
String result = lookupMap.get(item._2());
}
});
return null;
});
メッセージを処理するためにカフカメッセージから特定の値でテーブルルックアップが必要な場合は、どちらの方が良いでしょうか? – cSteusloff
jdbcを介してjoin vs queryを意味しますか? – zero323
私はあなたを誤解したと思います。私の計画はすでに間違いだと思った。私が悪用されるように。 – cSteusloff