2017-07-17 6 views
0

出力NiFiポートからScalaコードを使用してFlinkにデータを転送するには、いくつかの助けが必要です。Flink to NiFi connector

私は.addSource()に固執しています。追加のパラメータ([OUT])を要求しますが、私がそれらを提供するときにはエラーが発生します。 Scalaコードとエラーメッセージは次のとおりです。

package flinkTest 

import java.nio.charset.{Charset, StandardCharsets} 

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
import org.apache.flink.streaming.api.scala.DataStream 
import org.apache.flink.streaming.connectors.nifi.NiFiSource 
import org.apache.flink.streaming.api.functions.source.SourceFunction 
import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket 

import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig} 

object NifiFlow { 
    def main(): Unit = { 

    // get the execution environment 
    val env: StreamExecutionEnvironment = 
    StreamExecutionEnvironment.getExecutionEnvironment 

    // get input data by connecting to NiFi 
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() 
     .url("http://localhost:8080/nifi") 
     .portName("Data to flink") 
     .requestBatchCount(2) 
     .buildConfig() 

    val nifiSource: SourceFunction[NiFiDataPacket] = new NiFiSource(clientConfig) 

は、ここで[OUT]

なし[OUT]

Error:(28, 76) value nifiSource of type org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket] does not take type parameters. 
    val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource[NiFiDataPacket]).setParallelism(2) 

2)で作品

val streamSource: DataStream[NiFiDataPacket] = 
    env.addSource(nifiSource).setParallelism(2) 

といくつかのより多くのコード

val dataStream = streamSource.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8)) 

    dataStream.print() 

    env.execute() 
    } 
} 

1)です

Error:(28, 66) type mismatch; 
found : org.apache.flink.streaming.api.functions.source.SourceFunction[org.apache.flink.streaming.connectors.nifi.NiFiDataPacket] 
required: org.apache.flink.streaming.api.function.source.SourceFunction[?] 
    val streamSource: DataStream[NiFiDataPacket] = env.addSource(nifiSource).setParallelism(2) 

例はhereとし、Scalaに書き直しました。

私は何かアドバイスをいただきありがとうございます。

UPD2

package flinkTest 

import org.apache.nifi.remote.client.{SiteToSiteClient, SiteToSiteClientConfig} 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.nifi._ 

object NifiFlow { 
    def main(): Unit = { 

    // get the execution environment 
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 

    // get input data by connecting to NiFi 
    val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() 
     .url("http://localhost:8080/nifi") 
     .portName("Data to flink") 
     .requestBatchCount(2) 
     .buildConfig() 

    val nifiSource = new NiFiSource(clientConfig) 

    val streamSource: DataStream[String] = env 
     .addSource(nifiSource) 
     .map(x => x.getAttributes().toString) 

    env.execute() 
    } 
} 

ERROR

Connected to the target VM, address: '127.0.0.1:41218', transport: 'socket' 
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: interface org.apache.flink.streaming.connectors.nifi.NiFiDataPacket 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:871) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:863) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:406) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:197) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:184) 
    at flinkTest.NifiFlow$.main(NiFiFlow.scala:23) 

答えて

1

Scalaの

org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

の実行環境の特殊な実装がちょうどorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment

の代わりに使用があります
+0

役に立たなかったそれは何とか型クラスを変換することはできません。 エラー:(37、38)タイプの不一致。 が見つかりました:org.apache.flink.streaming.api.functions.source.SourceFunction [** org.apache.flink.streaming.connectors.nifi.NiFiDataPacket **] 必須:org.apache.flink.streaming.api。 function.source.SourceFunction [**?**] val streamSource = env.addSource(nifiSource) –

+0

変更されたコードと完全なエラーでフルファイルを添付できますか? – daggett

+0

は最初の投稿を更新しました。コードが長すぎてコメントに貼り付けることができません。 –

1

env.addSource(nifiSource)env.getJavaEnv.getConfig.disableClosureCleaner()おそらく

の設定でのみ動作し、このオープンソースプロジェクトでScalaのソースは、(FLINK-scala_2.11 ...瓶にあります)ビットを更新する必要があります。