0

タイトルがあまりにも曖昧であるが、正しくフレーズするのに問題があった場合には謝罪します。Apache Sparkでストリーミングデータに結合する

基本的に私は、Apache SparkとApache Kafkaが、リレーショナルデータベースからElasticsearchにデータを同期できるかどうかを判断しようとしています。

私の計画は、カフカのコネクタの1つを使用して、RDBMSからデータを読み取り、それをカフカのトピックにプッシュすることです。これは、モデルとDDLのERDになります。 ReportProductテーブルに存在する多対多の関係持っている非常に基本的な、ReportProductテーブル: ERD

CREATE TABLE dbo.Report (
    ReportID INT NOT NULL PRIMARY KEY, 
    Title NVARCHAR(500) NOT NULL, 
    PublishedOn DATETIME2 NOT NULL); 

CREATE TABLE dbo.Product (
    ProductID INT NOT NULL PRIMARY KEY, 
    ProductName NVARCHAR(100) NOT NULL); 

CREATE TABLE dbo.ReportProduct (
    ReportID INT NOT NULL, 
    ProductID INT NOT NULL, 
    PRIMARY KEY (ReportID, ProductID), 
    FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID), 
    FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID)); 

INSERT INTO dbo.Report (ReportID, Title, PublishedOn) 
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28'); 

INSERT INTO dbo.Product (ProductID, ProductName) 
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product'); 

INSERT INTO dbo.ReportProduct (ReportID, ProductID) 
VALUES (1, 1), (1, 2), (1, 3), (1, 4); 

SELECT * 
FROM dbo.Report AS R 
INNER JOIN dbo.ReportProduct AS RP 
    ON RP.ReportID = R.ReportID 
INNER JOIN dbo.Product AS P 
    ON P.ProductID = RP.ProductID; 

を私の目標は、次の構造を持つ文書にこれを変換することです:

{ 
    "ReportID":1, 
    "Title":"Yet Another Apache Spark StackOverflow question", 
    "PublishedOn":"2017-09-12T19:15:28+00:00", 
    "Product":[ 
    { 
     "ProductID":1, 
     "ProductName":"Apache" 
    }, 
    { 
     "ProductID":2, 
     "ProductName":"Spark" 
    }, 
    { 
     "ProductID":3, 
     "ProductName":"StackOverflow" 
    }, 
    { 
     "ProductID":4, 
     "ProductName":"Random product" 
    } 
    ] 
} 

私はローカルでモックアップした静的なデータを使って、この種の構造を作ることができました:

report.join(
    report_product.join(product, "product_id") 
    .groupBy("report_id") 
    .agg(
     collect_list(struct("product_id", "product_name")).alias("product") 
    ), "report_id").show 

しかし、これはあまりにも基本的なものであり、ストリームはもっと複雑になると思います。

データが不規則に変化しているため、レポートとその製品が常に変更されており、製品はしばらくの間(主に週単位で)変更されています。

これらの表のいずれかで発生したElasticsearchへの変更を複製したいと思います。あなたがConfluent Platform(またはseparately)の一部として提供されていますJDBC Sourceを使用することができ、また、「あなた一度kafka-connect-cdc-mssql

  • 調査することをお勧めします -

  • 答えて

    1
    1. カフカConnectには、ソースDBからデータを取得するにはカフカでデータを取得した場合は、Kafka Streams APIを使用してデータを操作したり、新しくリリースされたKSQLを見てください。あなたが選択するのは、Java(Kafka Streams)でのコーディングやSQLのような環境(KSQLでの)でのデータの操作のようなものです。いずれにせよ、これらの両方の出力は別のカフカの話題になるだろう。

    2. は最後に、(here利用可能、またはConfluent Platformの一部として)Elasticsearchカフカの接続プラグイン本当に良い音

    +0

    を使用してElasticsearchに上からカフカトピックをストリーミングします。私が前に作った研究から、カフカはあなたが私のための非パーティションキーに参加させることはできません。 KSQLはそれを解決しますか? –

    +0

    KSQLを使用して簡単に再パーティション化できます。これは、この問題を回避する方法です。私はそれを試していない。 –

    関連する問題