2017-08-31 17 views
0

私はScala 2.11.7とFlink 1.3.2でデータを処理しています。今度は結果のorg.apache.flink.api.scala.DataSetをneo4jグラフデータベースに保存したいと思います。neo4j with Flink and Scala

の互換性のためのGitHubのプロジェクトがあります。

  • のNeo4jとFLINK:_HTTPS://github.com/FaKod/neo4j-scala
  • FLINKのグラフライブラリ「ゼリーhttps://github.com/s1ck/flink-neo4j
  • スカラ座のNeo4jとは"とneo4j:_https://github.com/albertodelazzari/gelly-neo4j

最も有望な方法は何ですか?または、neo4jのREST APIを直接使用する方がよいでしょうか?

(ところで:なぜstackoverflowの...リンクpostetの数を制限しない)

私はFLINK-のNeo4jを試してみましたが、JavaとScalaのクラスを混合しながら、いくつかの問題があるようです:

package dummy.neo4j 

import org.apache.flink.api.common.io.OutputFormat 
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat 
import org.apache.flink.api.java.tuple.{Tuple, Tuple2} 
import org.apache.flink.api.scala._ 

object Neo4jDummyWriter { 

    def main(args: Array[String]) { 
    val env = ExecutionEnvironment.getExecutionEnvironment 

    val outputFormat: OutputFormat[_ <: Tuple] = Neo4jOutputFormat.buildNeo4jOutputFormat.setRestURI("http://localhost:7474/db/data/") 
    .setConnectTimeout(1000).setReadTimeout(1000).setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})") 
    .addParameterKey(0, "name").addParameterKey(1, "born").setTaskBatchSize(1000).finish 

    val tuple1: Tuple = new Tuple2("abc", 1) 
    val tuple2: Tuple = new Tuple2("def", 2) 

    val test = env.fromElements[Tuple](tuple1, tuple2) 
    println("test: " + test.getClass) 
    test.output(outputFormat) 
    } 

} 
は、

スレッド "main"の例外java.lang.ClassCastException:[Ljava.lang.Object; [Lorg.apache.flink.api.common.typeinfo.TypeInformation;にキャストすることはできません。 dummy.neo4j.Neo4jDummyWriter.mainで(Neo4jDummyWriter.scala)

型の不一致、予想:OUTPUTFORMAT:dummy.neo4j.Neo4jDummyWriter $ .main(20 Neo4jDummyWriter.scala)で [タプル]、実:OUTPUTFORMAT [_ <:タプル]

答えて

0

が溶液タプルにTuple2オブジェクトを変更することはない。

package dummy.neo4j 

import org.apache.flink.api.common.io._ 
import org.apache.flink.api.java.io.neo4j.Neo4jOutputFormat 
import org.apache.flink.api.java.tuple.Tuple2 
import org.apache.flink.api.scala._ 

object Neo4jDummyWriter { 

    def main(args: Array[String]) { 
    val env = ExecutionEnvironment.getExecutionEnvironment 

    val tuple1 = ("user9", 1978) 
    val tuple2 = ("user10", 1996) 
    val datasetWithScalaTuples = env.fromElements(tuple1, tuple2) 
    val dataset: DataSet[Tuple2[String, Int]] = datasetWithScalaTuples.map(tuple => new Tuple2(tuple._1, tuple._2)) 

    val outputFormat = Neo4jOutputFormat.buildNeo4jOutputFormat.setRestURI("http://localhost:7474/db/data/").setUsername("neo4j").setPassword("...") 
    .setConnectTimeout(1000).setReadTimeout(1000).setCypherQuery("UNWIND {inserts} AS i CREATE (a:User {name:i.name, born:i.born})") 
    .addParameterKey(0, "name").addParameterKey(1, "born").setTaskBatchSize(1000).finish.asInstanceOf[OutputFormat[Tuple2[String, Int]]] 

    dataset.output(outputFormat) 
    env.execute 
    } 

}