タイトルがあまりにも曖昧であるが、正しくフレーズするのに問題があった場合には謝罪します。Apache Sparkでストリーミングデータに結合する
基本的に私は、Apache SparkとApache Kafkaが、リレーショナルデータベースからElasticsearchにデータを同期できるかどうかを判断しようとしています。
私の計画は、カフカのコネクタの1つを使用して、RDBMSからデータを読み取り、それをカフカのトピックにプッシュすることです。これは、モデルとDDLのERDになります。 ReportProduct
テーブルに存在する多対多の関係持っている非常に基本的な、Report
とProduct
テーブル:
CREATE TABLE dbo.Report (
ReportID INT NOT NULL PRIMARY KEY,
Title NVARCHAR(500) NOT NULL,
PublishedOn DATETIME2 NOT NULL);
CREATE TABLE dbo.Product (
ProductID INT NOT NULL PRIMARY KEY,
ProductName NVARCHAR(100) NOT NULL);
CREATE TABLE dbo.ReportProduct (
ReportID INT NOT NULL,
ProductID INT NOT NULL,
PRIMARY KEY (ReportID, ProductID),
FOREIGN KEY (ReportID) REFERENCES dbo.Report (ReportID),
FOREIGN KEY (ProductID) REFERENCES dbo.Product (ProductID));
INSERT INTO dbo.Report (ReportID, Title, PublishedOn)
VALUES (1, N'Yet Another Apache Spark StackOverflow question', '2017-09-12T19:15:28');
INSERT INTO dbo.Product (ProductID, ProductName)
VALUES (1, N'Apache'), (2, N'Spark'), (3, N'StackOverflow'), (4, N'Random product');
INSERT INTO dbo.ReportProduct (ReportID, ProductID)
VALUES (1, 1), (1, 2), (1, 3), (1, 4);
SELECT *
FROM dbo.Report AS R
INNER JOIN dbo.ReportProduct AS RP
ON RP.ReportID = R.ReportID
INNER JOIN dbo.Product AS P
ON P.ProductID = RP.ProductID;
を私の目標は、次の構造を持つ文書にこれを変換することです:
{
"ReportID":1,
"Title":"Yet Another Apache Spark StackOverflow question",
"PublishedOn":"2017-09-12T19:15:28+00:00",
"Product":[
{
"ProductID":1,
"ProductName":"Apache"
},
{
"ProductID":2,
"ProductName":"Spark"
},
{
"ProductID":3,
"ProductName":"StackOverflow"
},
{
"ProductID":4,
"ProductName":"Random product"
}
]
}
私はローカルでモックアップした静的なデータを使って、この種の構造を作ることができました:
report.join(
report_product.join(product, "product_id")
.groupBy("report_id")
.agg(
collect_list(struct("product_id", "product_name")).alias("product")
), "report_id").show
しかし、これはあまりにも基本的なものであり、ストリームはもっと複雑になると思います。
データが不規則に変化しているため、レポートとその製品が常に変更されており、製品はしばらくの間(主に週単位で)変更されています。
これらの表のいずれかで発生したElasticsearchへの変更を複製したいと思います。あなたがConfluent Platform(またはseparately)の一部として提供されていますJDBC Sourceを使用することができ、また、「あなた一度kafka-connect-cdc-mssql
調査することをお勧めします -
を使用してElasticsearchに上からカフカトピックをストリーミングします。私が前に作った研究から、カフカはあなたが私のための非パーティションキーに参加させることはできません。 KSQLはそれを解決しますか? –
KSQLを使用して簡単に再パーティション化できます。これは、この問題を回避する方法です。私はそれを試していない。 –