0

私はMySQLで、次のテーブル構造を持っている:ユーザ定義オブジェクトをデータフレームに変換し、RDBMSに書き込む - データベースとのマッピングを維持する方法?

テーブルのユーザー(NOT NULLと
のid INT、
名VARCHAR(20)NOT NULL、NULL、NOT
年齢INT、
アドレスVARCHARを(作成します100)NOT NULL);

今、私はKafkaからデータを読み込み、処理し、フィルタリングし、RDBMSにテーブル 'User'で書き込むスパークストリーミングジョブを書きたいと思っています。このため

が、私はまず、テーブルのPOJO表現作成している - 一度、今

JavaDStream<User> userStream = ... // created this stream with some processing 
userStream.foreachRDD(rdd -> { 
DataFrame df = sqlContext.createDataFrame(rdd,User.class); 
df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties()); 
}); 

- 以下

@Data 
class User implements Serializable { 
private int id; 
private String name; 
private int age; 
private String address; 
} 

を、私は、データフレームにRDDを変換スパークジョブを書かれていますこのコードを実行するのは、データフレームがハッピーハザードの形で形成され、データベーススキーマと同期されないためです。したがって、 'id'カラムに 'address'を挿入しようとし、sql例外で終了します。

データフレームをデータベースのスキーマに理解させ、それに従ってUserオブジェクトからデータをロードする方法を理解できません。それを行う方法はありますか?私はJavaRDDJavaRDDにマップすることができますが、私はさらに何をすべきか理解できません。

また、私はこのリフレクションを使用しているAPIプロセスがパフォーマンスに影響を与えることも考えています。 POJOとリレーショナルデータベースの間のマッピングを維持し、データを挿入する方法があるかどうか教えてください。

答えて

1

このようにすることは、私のために働いています。

@Data 
class User implements Serializable { 
private int id; 
private String name; 
private int age; 
private String address; 
private static StructType structType = DataTypes.createStructType(new StructField[] { 
     DataTypes.createStructField("id", DataTypes.IntegerType, false), 
     DataTypes.createStructField("name", DataTypes.StringType, false), 
     DataTypes.createStructField("age", DataTypes.IntegerType, false), 
     DataTypes.createStructField("address", DataTypes.StringType, false) 
}); 

public static StructType getStructType() { 
    return structType; 
} 

public Object[] getAllValues() { 
    return new Object[]{id, name, age, address}; 
} 

} 

スパークジョブ -

JavaDStream<User> userStream = ... // created this stream with some processing 
userStream.map(e -> { 
      Row row = RowFactory.create(e.getAllValues()); 
      return row; 
     }).foreachRDD(rdd -> { 
      DataFrame df = sqlContext.createDataFrame(rdd,User.getStructType()); 
      df.write().mode(SaveMode.Append).jdbc(MYSQL_CONNECTION_URL, "user", new java.util.Properties()); 
     }); 

私は以前のもので、データフレームは、独自のデータ構造にPOJOをマッピングするためにリフレクションを使用しているため、これは以前より行うには良い方法だと思います。行が点火SQL自体のフォーマットであり、IはすでにgetAllValues()getStructTypeの列マッピング()にデータフレームへのデータの挿入のために言及しています私はすでに午前これは、クリーンな方法であります

私が間違っている場合は、私に修正してください。

関連する問題