2017-09-15 9 views
0

Sparkストリーミング用のカスタムforeachライターがあります。各行について、JDBCソースに書き込みます。また、JDBCオペレーションを実行してJDBCオペレーションを実行した後に、次のサンプルコードで「ステップ-1」や「ステップ-3」のような値を更新する前に、いくつかの高速検索を実行したいと考えています。Apache Sparkのメモリ内に存在するデータベース

ドンREDIS、MongoDBのような外部データベースを使用したくない。私はRocksDB、Derbyなどの足跡の少ないものがほしいと思っています...

チェックポイントのようにアプリケーションごとに1つのファイルを保存しても大丈夫です。内部データベースフォルダを作成します...

私はあなたが一人として、「スパークとあなたのインメモリDBを探しています何

def main(args: Array[String]): Unit = { 

val brokers = "quickstart:9092" 
val topic = "safe_message_landing_app_4" 

val sparkSession = SparkSession.builder().master("local[*]").appName("Ganesh-Kafka-JDBC-Streaming").getOrCreate(); 

val sparkContext = sparkSession.sparkContext; 
sparkContext.setLogLevel("ERROR") 
val sqlContext = sparkSession.sqlContext; 

val kafkaDataframe = sparkSession.readStream.format("kafka") 
    .options(Map("kafka.bootstrap.servers" -> brokers, "subscribe" -> topic, 
    "startingOffsets" -> "latest", "group.id" -> " Jai Ganesh", "checkpoint" -> "cp/kafka_reader")) 
    .load() 

kafkaDataframe.printSchema() 
kafkaDataframe.createOrReplaceTempView("kafka_view") 
val sqlDataframe = sqlContext.sql("select concat (topic, '-' , partition, '-' , offset) as KEY, string(value) as VALUE from kafka_view") 

val customForEachWriter = new ForeachWriter[Row] { 
    override def open(partitionId: Long, version: Long) = { 
    println("Open Started ==> partitionId ==> " + partitionId + " ==> version ==> " + version) 
    true 
    } 

    override def process(value: Row) = { 
    // Step 1 ==> Lookup a key in persistent KEY-VALUE store 

    // JDBC operations 

    // Step 3 ==> Update the value in persistent KEY-VALUE store 
    } 

    override def close(errorOrNull: Throwable) = { 
    println(" ************** Closed ****************** ") 
    } 
} 

val yy = sqlDataframe 
    .writeStream 
    .queryName("foreachquery") 
    .foreach(customForEachWriter) 
    .start() 

yy.awaitTermination() 

sparkSession.close(); 

}

+1

あなたはHTTPSについて尋ねています://db.apacheを。 org/derby/docs/10.13/devguide/cdevdvlpinmemdb.html? NVMのようなハードウェアの使用について話していない限り、「永続的なメモリ内のデータベース」が何であるかは分かりません。特別なハードウェアがなければ、Derbyのメモリー内データベースは耐久性がありません。 –

+1

私が意味することはメモリ内のことを意味します.. mysql、redisは別のプロセスとして実行されます...私は望ましくありません... derbyはドライバプログラムをsparkにロードし、エグゼキュータからはderbyに接続したい... coz my spark糸で運営されている仕事は5台のマシンになりますので、私はderbyを使うことができます...そして、それは私の必要なステップ1と3のために働くでしょう...しかしMVCCをサポートしていないので、H2データベース...だから私はダービーとH2を使って経験したいですが、火花は – Manjesh

+2

でもOKです。 Derbyの「インプロセス」データベースエンジンの用語は「組み込み」であり、他の(Java)アプリケーションにDerbyを組み込む場合はうまく機能します。 DerbyがMVCCデータベースエンジンでないことは間違いありません。 Derbyを使い始めるには、以下のチュートリアルをお勧めします:https://db.apache.org/derby/docs/10.13/getstart/ –

答えて

1

Manjesh、

..スパークする任意のインメモリDBを見ることができませんでしたシームレスなクラスタ、単一のプロセス空間を共有 "、MVCCのサポートはSnappyDataが提供するものです。 SnappyDataでは、高速ルックアップを実行するテーブルは、Sparkストリーミングジョブを実行しているプロセスと同じプロセス内にあります。それを確認してくださいhere

SnappyDataには、コア製品のApache V2ライセンスがあり、参照している特定の用途はOSSダウンロードで入手できます。

(情報開示:私はSnappyDataの従業員だと製品が質問に答えているので、それはこの質問への製品の特定の答えを提供することは理にかなって)

関連する問題