2017-05-17 3 views
0

私はバッチジョブとしてcassandraテーブルにデータを挿入するフリンクプロジェクトを持っています。私はすでに同じカンドンドラテーブルにpojoを書いているflinkストリームプロジェクトを持っていますが、cassandraOutputFormatはタプルとしてデータを必要とします(CassandraSinkのようなpojosを受け入れるように変更されていることがあります)。Flink cassandraOutputFormatタプルにフリーズした値が必要

DataSet<Tuple3<String, List<CustomDataObj>, String>> outputDataSet = listOfAlphaGroupingObject.map(new AlphaGroupingObjectToTuple3Mapper()); 

そして、ここでは、同様の出力をトリガラインです:

@Table(keyspace="mykeyspace", name="mytablename") 
public class AlphaGroupingObject implements Serializable { 

    @Column(name = "jobId") 
    private String jobId; 
    @Column(name = "datalist") 
    @Frozen("list<frozen<dataobj>") 
    private List<CustomDataObj> dataobjs; 
    @Column(name = "userid") 
    private String userid; 

    //Getters and Setters 
} 

そして、私はこのPOJOから作っていたタプルのデータセット:だからここで私が持っているPOJOはということです

outputDataSet.output(new CassandraOutputFormat<>("INSERT INTO mykeyspace.mytablename (jobid, datalist, userid) VALUES (?,?,?);", clusterThatWasBuilt)); 

今私が持っている問題は、私はこれを実行しようとすると、それはカサンドラテーブルへの出力にそれをしようとしたとき、私はこのエラーを取得することです:

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: 
Codec not found for requested operation: [frozen<mykeyspace.dataobj> <-> flink.custom.data.CustomDataObj] 

私はそれがpojoだったときを知っています。フィールドに@Frozenアノテーションを追加するだけでしたが、タプルのためにそれを行う方法はわかりません。これを修正するための最良の/適切な方法は何ですか?または、実際に私が見つけていないcassandraOutputFormatにpojosを送信する方法があるので、私は何か不必要なことをしていますか?

事前にご協力いただきありがとうございます。

EDIT:CustomDataObjはに縛られていることカサンドラでテーブルスキーマを含む

@UDT(name="dataobj", keyspace = "mykeyspace") 
public class CustomDataObj implements Serializable { 


    @Field(name = "userid") 
    private String userId; 

    @Field(name = "groupid") 
    private String groupId; 

    @Field(name = "valuetext") 
    private String valueText; 

    @Field(name = "comments") 
    private String comments; 

    //Getters and setters 
} 

EDIT 2

:ここ

があまりにもCustomDataObjクラスのコードですmytablenameスキーマ。

CREATE TYPE mykeyspace.dataobj (
    userid text, 
    groupid text, 
    valuetext text, 
    comments text 
); 

CREATE TABLE mykeyspace.mytablename (
    jobid text, 
    datalist list<frozen<dataobj>>, 
    userid text, 
    PRIMARY KEY (jobid, userid) 
); 
+0

はそれが正しい 'リスト<フローズン'です??'>'がありません。 –

+0

さて、それはまだ正常に実行されています(これは正直なところ、欠けているとうまくいけば奇妙です)。そして、私は欠けている ">"を追加しました。 – Jicaar

+0

テーブルを追加してスキーマを入力してください –

答えて

0

私はので、私は答えとしてこれをマークしません、私はcassandraOutputFormatにタプルを提供するよりもより良い方法を発見したが、それは技術的にはまだこの質問に答えていないと信じています。私はcassandraのオブジェクトマッパーを使用してしまったので、pojoをテーブルに送ることができます。それでも、データがテーブルに正常に格納されていること、および実装されている方法ですべてが正しく機能していることを検証する必要がありますが、これは同様の問題に直面している人にとっては役に立ちます。ここで

は、ソリューションの概要を説明ドキュメントです:http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/using/

0

CustomDataObjクラス

@UDT(name = "dataobj") 
public class CustomDataObj { 
    //...... 
} 

UDT注釈を追加編集

から@Column(name = "jobid")dataobjs冷凍注釈に変更jobid注釈を

@Table(keyspace="mykeyspace", name="mytablename") 
public class AlphaGroupingObject implements Serializable { 

    @Column(name = "jobid") 
    private String jobId; 

    @Column(name = "datalist") 
    @Frozen 
    private List<CustomDataObj> dataobjs; 
    @Column(name = "userid") 
    private String userid; 

    //Getters and Setters 
} 
+0

私はすでに持っています。私はそれを示すために私の質問を更新します。 – Jicaar

+0

最初の編集でアウトラインを変更しましたが、同じエラーがスローされます。 – Jicaar

+0

これを確認してくださいhttp://docs.datastax.com/ja/developer/java-driver/3.1/manual/custom_codecs/#creating-custom-codecs-for-user-defined-types-ud-ts –

関連する問題