私はMySQLで、次のテーブル構造を持っている:ユーザ定義オブジェクトをデータフレームに変換し、RDBMSに書き込む - データベースとのマッピングを維持する方法?
テーブルのユーザー(NOT NULLと
のid INT、
名VARCHAR(20)NOT NULL、NULL、NOT
年齢INT、
アドレスVARCHARを(作成します100)NOT NULL);
今、私はKafkaからデータを読み込み、処理し、フィルタリングし、RDBMSにテーブル 'User'で書き込むスパークストリーミングジョブを書きたいと思っています。このため
が、私はまず、テーブルのPOJO表現作成している - 一度、今
JavaDStream<User> userStream = ... // created this stream with some processing
userStream.foreachRDD(rdd -> {
DataFrame df = sqlContext.createDataFrame(rdd,User.class);
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties());
});
- 以下
@Data
class User implements Serializable {
private int id;
private String name;
private int age;
private String address;
}
を、私は、データフレームにRDDを変換スパークジョブを書かれていますこのコードを実行するのは、データフレームがハッピーハザードの形で形成され、データベーススキーマと同期されないためです。したがって、 'id'カラムに 'address'を挿入しようとし、sql例外で終了します。
データフレームをデータベースのスキーマに理解させ、それに従ってUserオブジェクトからデータをロードする方法を理解できません。それを行う方法はありますか?私はJavaRDDはJavaRDDにマップすることができますが、私はさらに何をすべきか理解できません。
また、私はこのリフレクションを使用しているAPIプロセスがパフォーマンスに影響を与えることも考えています。 POJOとリレーショナルデータベースの間のマッピングを維持し、データを挿入する方法があるかどうか教えてください。