2017-08-09 6 views
0

Flinkトレーニングを進めようとする/ exersizes。 issueに続いていた。
Elasticsearchを再インストールしました。バージョンは2.4.6です。
弾性探査が機能しているようです。 http://localhost:9200でアクセスできます。
cURLコマンドを使用して、練習の指示に従ってindexおよびtype mappingを作成しました。
Kibana:4.6.5をインストールしました.KibanaはElasticsearchに接続し、私が作成したindexを解決することができます。 、再びFlink 1.3.1、Elasticsearch 2.4.6、RuntimeException:ElasticsearchクライアントがElasticsearchノードに接続されていません。

Caused by: java.lang.RuntimeException: Elasticsearch client is not connected to any Elasticsearch nodes! 
    at org.apache.flink.streaming.connectors.elasticsearch2.Elasticsearch2ApiCallBridge.createClient(Elasticsearch2ApiCallBridge.java:72) 
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:272) 
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) 
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
    at java.lang.Thread.run(Thread.java:748) 

今、私はカフカのトピックからデータを読み出し、Elasticsearchにデータを取り込もうとし、そして、それが次のエラーでSinkの作成に失敗したFlink仕事を、実行してみてください

libraryDependencies ++= Seq(
    "org.apache.flink" %% "flink-scala" % "1.3.1" % "provided", 
    "org.apache.flink" %% "flink-streaming-scala" % "1.3.1" % "provided", 
    "org.apache.flink" %% "flink-clients" % "1.3.1" % "provided", 
    "joda-time" % "joda-time" % "2.9.9", 
    "com.google.guava" % "guava" % "22.0", 
    "com.typesafe" % "config" % "1.3.0", 
    "org.apache.flink" % "flink-connector-kafka-0.10_2.10" % "1.3.1", 
    "org.apache.flink" % "flink-connector-elasticsearch2_2.10" % "1.3.1" 
) 

輸入:

import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer} 
import org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink 
0ここに私の built.sbt設定があります

ビルド機能とSinkを返します。
合格パラメーター("localhost", 9300, "elasticsearch")。ホストには"127.0.0.1"を渡そうとしましたが、同じエラーがありました。

... 
... 
[2017-08-08 20:12:24,203][INFO ][node      ] [Beautiful Dreamer] stopping ... 
[2017-08-08 20:12:27,063][INFO ][node      ] [Beautiful Dreamer] stopped 
[2017-08-08 20:12:27,066][INFO ][node      ] [Beautiful Dreamer] closing ... 
[2017-08-08 20:12:28,104][INFO ][node      ] [Beautiful Dreamer] closed 
[2017-08-08 20:21:58,212][INFO ][node      ] [Aries] version[2.4.6], pid[1502], build[5376dca/2017-07-18T12:17:44Z] 
[2017-08-08 20:21:58,264][INFO ][node      ] [Aries] initializing ... 
[2017-08-08 20:22:00,569][INFO ][plugins     ] [Aries] modules [reindex, lang-expression, lang-groovy], plugins [], sites [] 
[2017-08-08 20:22:00,857][INFO ][env      ] [Aries] using [1] data paths, mounts [[/ (rootfs)]], net usable_space [1.6gb], net total_space [45gb], spins? [unknown], types [rootfs] 
[2017-08-08 20:22:00,857][INFO ][env      ] [Aries] heap size [1007.3mb], compressed ordinary object pointers [true] 
[2017-08-08 20:22:07,759][INFO ][node      ] [Aries] initialized 
[2017-08-08 20:22:07,759][INFO ][node      ] [Aries] starting ... 
[2017-08-08 20:22:08,179][INFO ][transport    ] [Aries] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300} 
[2017-08-08 20:22:08,187][INFO ][discovery    ] [Aries] elasticsearch/_BFBmx-XR5aHFhbN9nvX2g 
[2017-08-08 20:22:11,796][INFO ][cluster.service   ] [Aries] new_master {Aries}{_BFBmx-XR5aHFhbN9nvX2g}{127.0.0.1}{127.0.0.1:9300}, reason: zen-disco-join(elected_as_master, [0] joins received) 
[2017-08-08 20:22:12,415][INFO ][http      ] [Aries] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200} 
[2017-08-08 20:22:12,415][INFO ][node      ] [Aries] started 
[2017-08-08 20:22:12,654][INFO ][gateway     ] [Aries] recovered [2] indices into cluster_state 
[2017-08-08 20:22:14,526][INFO ][cluster.routing.allocation] [Aries] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[nyc-places][1], [nyc-places][0]] ...]). .. 
... 
... 

が使用されているソフトウェアのバージョンに戻って、それはトレーニングサイトで使用されるバージョンと正確に一致しない場合があります。

private def getSink(host: String, port: Int, cluster: String) = { 

    val config = Map(
     // This instructs the sink to emit after every element, otherwise they would be buffered 
     "bulk.flush.max.actions" -> "1", 
     // default cluster name 
     "cluster.name" -> cluster 
    ) 

    val jConfig: java.util.Map[String, String] = new java.util.HashMap() 
    jConfig.putAll(config.asJava) 

    println(s"jConfig: ${jConfig.toString()}") 

    val transports = List(new InetSocketAddress(InetAddress.getByName(host), port)) 
    val jTransports = new util.ArrayList(transports.asJava) 

    println(s"jTransports: ${jTransports.toString()}") 

    val esSink = new ElasticsearchSink(jConfig, jTransports, 
     new MyElasticsearchInserter("nyc-idx", "popular-locations")) 

    esSink 
    } 

は、ここでそれは状態だ示すためにElasticsearchログからの追加情報です。
鉱山はFlink:1.3.1、およびElasticsearch:2.4.6です。

答えて

0

Elasticsearchインストールを2.3.5バージョンにダウングレードすることで対応しました。
それは正確にコネクタ次Elasticsearchversion依存関係を一致させるために行われた があります

"org.apache.flink" % "flink-connector-elasticsearch2_2.10" % "1.3.1" 
関連する問題