0
私は現時点でJavaを学習しており、カフカストリームの小さなアプリケーションを構築してデータを取り込み、基本的に新しいストリームを作成しています。javaのjsonデータを操作する... java newbie
public class Main {
public static void main(final String[] args) throws Exception {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put("application.id", "streams-consumer-logs");
streamsConfiguration.put("client.id", "consumer-client");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
streamsConfiguration.put("servers");
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
System.out.println(streamsConfiguration);
final KStreamBuilder builder = new KStreamBuilder();
final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde,"sdp_logs");
//textLines.print();
System.out.println("test");
KStream<String, String> wordCounts = textLines
.map((key, value) -> {
//how do I manipulate this value which is a json object?
Gson gson = new Gson();
String json = gson.toJson(value);
FilterLogs filterlogs = new FilterLogs(json);
String filterlogs = filterlogs.execute();
return KeyValue.pair(key, json);
});
//KStream<String, Long> streamWrite = wordCounts.toStream();
wordCounts.print();
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
私は、このようになりますFilterLogs
クラスを持っている:
value
戻り
KStream<String, String> wordCounts = textLines.map((key, value) -> {}
次のようになります
public class FilterLogs {
private type? jsoonObj;
public FilterLogs (type? jsonObj) {
this.jsonObj = jsonObj;
}
public type? getResult() { return result; }
public void execute() {
//manipulation of jsonObj goes in here?
}
}
}
:
{
"tags": [
"dgtl"
],
"beat": {
"hostname": "host1",
"name": "host`",
"version": "5.4.1"
},
"input_type": "log",
"@timestamp": "2017-09-27T20:01:52.282Z",
"source": "/log/state-change.log",
"offset": 1725163,
"message": "message of the log is here"
"type": "log"
}
私の理解から私は関数型プログラミングのパラダイムから来ているので、私はvalue
を受け取り、私の場合はFilterLogs
クラスに渡してから、そのクラスは新しいvalue
を返さなければならないということです。
私は混乱していますが、クラスにはどのようなタイプを使用するのですか。実際にjsonを解析してどのように処理するのですか?
通常、javascriptではループの束を解析して欲しいことをすることができますが、Javaでこれを行う方法はあまりありません。
jsonオブジェクトをこのように戻したい場合はどうすればよいですか?
{
"tags": [
"dgtl"
],
"beat": {
"hostname": "host1"
},
"offset": 1725163,
"message": "message of the log is here"
}