2017-03-06 7 views
3

私はflinkを使用してcassandraにデータを読み書きするためにapache flinkを使用します。私はflink-connector-cassandraを使用したいと考えていましたが、私はコネクタの良いドキュメンテーション/例が見つかりません。apache flinkを使用してcassandraにデータを読み書きするJava API

Apache Flinkを使用してcassandraからデータを読み書きする方法を教えてください。私は純粋に書き込みのためのシンクの例だけを参照してください? apache flinkはapache sparkと同様にcassandraからのデータの読み取りも意味しますか?

+0

[このドキュメントとコード例](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/cassandra.html)をご覧になりましたか? – user909481

+0

この例ではWRITE(挿入)についてのみ説明していますが、READ操作も探しています。 –

+0

リンクされたドキュメントは、Flinkがシンクしか提供していないストリーミングAPIを指します。バッチ(DataSet)APIには、潜在的に再利用できるCassandra Input-/Outputformatsがあります。 –

答えて

3

私は同じ質問がありました。これが私が探していたものです。私はそれがあなたが必要とするもののために単純化されているかどうかはわかりませんが、私はそれを誰にも見せてはいけないと考えました。私はこのアウトを考え出し

ClusterBuilder cb = new ClusterBuilder() { 
     @Override 
     public Cluster buildCluster(Cluster.Builder builder) { 
      return builder.addContactPoint("urlToUse.com").withPort(9042).build(); 
     } 
    }; 

    CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat = new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb); 

    cassandraInputFormat.configure(null); 
    cassandraInputFormat.open(null); 

    Tuple2<String, String> testOutputTuple = new Tuple2<>(); 
    cassandraInputFormat.nextRecord(testOutputTuple); 

    System.out.println("column1: " + testOutputTuple.f0); 
    System.out.println("column2: " + testOutputTuple.f1); 

の方法は、「CassandraInputFormat」クラスのコードを見つけ、それが(http://www.javatips.net/api/flink-master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java)働いていた方法を見てのおかげでした。私は正直言ってそれがちょうどフォーマットであり、名前に基づいてカサンドラからの読書のフルクラスではないと期待していました。私は他の人が同じことを考えているかもしれないという気持ちがあります。

+0

Tuple2 フォーマットを持たないcassandraにPOJOを書き込んで、代わりに独自のイベントタイプを返すことができます –

+0

はい、カスタムコードが必要です。 CassandraOutputFormatコードを少し変更したタプルの代わりにPOJOを使用するCassandra出力フォーマッタのバージョンを作成しました。 – Jicaar

0

を使用できRichFlatMapFunctionクラスを拡張するために

class MongoMapper extends RichFlatMapFunction[JsonNode,JsonNode]{ 
    var userCollection: MongoCollection[Document] = _ 
    override def open(parameters: Configuration): Unit = { 
// do something here like opening connection 
    val client: MongoClient = MongoClient("mongodb://localhost:10000") 

    userCollection = client.getDatabase("gp_stage").getCollection("users").withReadPreference(ReadPreference.secondaryPreferred()) 
    super.open(parameters) 
    } 
    override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = { 

// Do something here per record and this function can make use of objects initialized via open 
     userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
     (result: Document) => { 
//   println(result) 
         }, 
     (t: Throwable) =>{ 
     println(t) 
     }, 
     ()=>{ 
      out.collect(event) 
     } 
    ) 
    } 


    } 

} 

基本的にopen機能は、作業者ごとに一度だけ実行されるとflatmapは、レコードごとにそれを実行します。例はMongoのためのものですが、私はあなたのパイプラインの最初のステップではなく、あなたが同様にあなた自身のRichSourceFunction

を書くべきRichFlatMapFunctionを書くよりも、カサンドラからデータを読み取って理解するのと同様にあなたのケースではカサンドラ

+0

ありがとう@Gauravあなたはスカラの代わりにjavaで同様の例を教えてもらえますか? –

+0

http://stackoverflow.com/questions/34224423/apache-flink-executing-a-program-which-extends-the-richflatmapfunction-on-the-r –

+0

ありがとうございます@Gaurav –

0

のために使用することができますあなたはWikipediaEditsSourceの簡単な実装を見ることができます。

関連する問題