Sparkストリーミング用のカスタムforeachライターがあります。各行について、JDBCソースに書き込みます。また、JDBCオペレーションを実行してJDBCオペレーションを実行した後に、次のサンプルコードで「ステップ-1」や「ステップ-3」のような値を更新する前に、いくつかの高速検索を実行したいと考えています。Apache Sparkのメモリ内に存在するデータベース
ドンREDIS、MongoDBのような外部データベースを使用したくない。私はRocksDB、Derbyなどの足跡の少ないものがほしいと思っています...
チェックポイントのようにアプリケーションごとに1つのファイルを保存しても大丈夫です。内部データベースフォルダを作成します...
私はあなたが一人として、「スパークとあなたのインメモリDBを探しています何
def main(args: Array[String]): Unit = {
val brokers = "quickstart:9092"
val topic = "safe_message_landing_app_4"
val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate();
val sparkContext = sparkSession.sparkContext;
sparkContext.setLogLevel("ERROR")
val sqlContext = sparkSession.sqlContext;
val kafkaDataframe = sparkSession.readStream.format("kafka")
.options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic,
"startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader"))
.load()
kafkaDataframe.printSchema()
kafkaDataframe.createOrReplaceTempView("kafka_view")
val sqlDataframe = sqlContext.sql("select concat (topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view")
val customForEachWriter = new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long) = {
println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version)
true
}
override def process(value: Row) = {
// Step 1 ==> Lookup a key in persistent KEY-VALUE store
// JDBC operations
// Step 3 ==> Update the value in persistent KEY-VALUE store
}
override def close(errorOrNull: Throwable) = {
println(" ************** Closed ****************** ")
}
}
val yy = sqlDataframe
.writeStream
.queryName("foreachquery")
.foreach(customForEachWriter)
.start()
yy.awaitTermination()
sparkSession.close();
}
あなたはHTTPSについて尋ねています://db.apacheを。 org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html? NVMのようなハードウェアの使用について話していない限り、「永続的なメモリ内のデータベース」が何であるかは分かりません。特別なハードウェアがなければ、Derbyのメモリー内データベースは耐久性がありません。 –
私が意味することはメモリ内のことを意味します.. mysql、redisは別のプロセスとして実行されます...私は望ましくありません... derbyはドライバプログラムをsparkにロードし、エグゼキュータからはderbyに接続したい... coz my spark糸で運営されている仕事は5台のマシンになりますので、私はderbyを使うことができます...そして、それは私の必要なステップ1と3のために働くでしょう...しかしMVCCをサポートしていないので、H2データベース...だから私はダービーとH2を使って経験したいですが、火花は – Manjesh
でもOKです。 Derbyの「インプロセス」データベースエンジンの用語は「組み込み」であり、他の(Java)アプリケーションにDerbyを組み込む場合はうまく機能します。 DerbyがMVCCデータベースエンジンでないことは間違いありません。 Derbyを使い始めるには、以下のチュートリアルをお勧めします:https://db.apache.org/derby/docs/10.13/getstart/ –