2017-08-09 10 views
0

私の目標は、kafkaを使用してjson形式の文字列を読み込み、文字列にフィルターをかけて、の部分を選択してメッセージをシンクしてメッセージを出力します(まだjsonの文字列形式です)。テストの目的のためにjson形式の文字列の一部をFlinkでKafkaから抽出する方法012

、私の入力文字列メッセージは次のようになります。

{"a":1,"b":2,"c":"3"} 

と実装の私のコードです:

def main(args: Array[String]): Unit = { 

val inputProperties = new Properties() 
inputProperties.setProperty("bootstrap.servers", "localhost:9092") 
inputProperties.setProperty("group.id", "myTest2") 
val inputTopic = "test" 

val outputProperties = new Properties() 
outputProperties.setProperty("bootstrap.servers", "localhost:9092") 
val outputTopic = "test2" 


val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.getConfig.disableSysoutLogging 
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)) 
// create a checkpoint every 5 seconds 
env.enableCheckpointing(5000) 

// create a Kafka streaming source consumer for Kafka 0.10.x 
val kafkaConsumer = new FlinkKafkaConsumer010(
    inputTopic, 
    new JSONDeserializationSchema(), 
    inputProperties) 

val messageStream : DataStream[ObjectNode]= env 
    .addSource(kafkaConsumer).rebalance 

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a") 
    .asText.equals("1") && node.get("b").asText.equals("2")) 

// Need help in this part, how to extract for instance a,c and 
// get something like {"a":"1", "c":"3"}? 
val testStream:DataStream[JsonNode] = filteredStream.map(
    node => { 
    node.get("a") 
    } 
) 

testStream.addSink(new FlinkKafkaProducer010[JsonNode](
    outputTopic, 
    new SerializationSchema[JsonNode] { 
    override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes() 
    }, outputProperties 
)) 

env.execute("Kafka 0.10 Example") 
} 

このコードのコメントで示されているように、私はどのように確認していませんメッセージの一部を適切に選択する。私はマップを使用していますが、メッセージ全体をどのように連結するかはわかりません。例えば、私がコードで行ったことは、「1」という結果しか得られないが、私が望むのは{"a": "c": "3"}

この問題を解決する別の方法。私はFlinkで見つけることができませんが、物事は、 "選択" APIがありますスパークストリーミングではありません。

そして、フリンクコミュニティの助けてくれてありがとう!これは私がこの小さなプロジェクトで達成したい最後の機能です。

答えて

1

Flinkストリーミングジョブは、各入力を1回処理して次のタスクに出力するか、外部ストレージに保存します。

1つの方法は、すべての出力をHDFSのような外部ストレージに保存することです。ストリーミングジョブが完了した後、バッチジョブを使用してJSONに結合します。

もう1つの方法は、stateとRichMapFunctionを使用して、すべてのキー値を含むJSONを取得することです。

stream.map(new MapFunction<String, Tuple2<String, String>>() { 
    public Tuple2<String, String> map(String value) throws Exception { 
     return new Tuple2<String, String>("mock", value); 
    } 
}).keyBy(0).map(new RichMapFunction<Tuple2<String,String>, String>() { 
    @Override 
    public String map(Tuple2<String, String> value) throws Exception { 
     ValueState<String> old = getRuntimeContext().getState(new ValueStateDescriptor<String>("test", String.class)); 
     String newVal = old.value(); 
     if (newVal != null) makeJSON(newVal, value.f1); 
     else newVal = value.f1; 
     old.update(newVal); 
     return newVal; 
    } 
}).print(); 

このマップ関数を使用します。filteredStream.map(function);

状態を使用すると、 {"a":1}、{"a":1、 "c":3}のような出力が表示されます。 最後の出力は必要なものでなければなりません。

+0

ありがとうございます! makeJSONはFlinkの組み込み関数ですか?それとも、自分で関数を書いてそこに置く必要があるのですか? – teddy

+0

@teddyいいえ、Flinkにはこのようなメソッドは含まれていませんが、説明のための擬似コードです。あなたはそれを実装することができます。 )多くのコードは必要ありません; – David

+0

Keyed状態は 'keyed stream'、つまりこの行の 'keyBy()'操作の後でのみ使用できるというエラーが発生しました(ValueState state = getRuntimeContext()。 – teddy

関連する問題