2017-10-10 12 views
0

私は現時点でJavaを学習しており、カフカストリームの小さなアプリケーションを構築してデータを取り込み、基本的に新しいストリームを作成しています。javaのjsonデータを操作する... java newbie

public class Main { 

    public static void main(final String[] args) throws Exception { 

     final Properties streamsConfiguration = new Properties(); 

     streamsConfiguration.put("application.id", "streams-consumer-logs"); 
     streamsConfiguration.put("client.id", "consumer-client"); 
     streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); 
     streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); 
     streamsConfiguration.put("servers"); 


     final Serde<String> stringSerde = Serdes.String(); 
     final Serde<Long> longSerde = Serdes.Long(); 

     System.out.println(streamsConfiguration); 


     final KStreamBuilder builder = new KStreamBuilder(); 
     final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde,"sdp_logs"); 


     //textLines.print(); 
     System.out.println("test"); 



     KStream<String, String> wordCounts = textLines 
       .map((key, value) -> { 
        //how do I manipulate this value which is a json object? 
        Gson gson = new Gson(); 
        String json = gson.toJson(value); 

        FilterLogs filterlogs = new FilterLogs(json); 

        String filterlogs = filterlogs.execute(); 

        return KeyValue.pair(key, json); 
       }); 

     //KStream<String, Long> streamWrite = wordCounts.toStream(); 

     wordCounts.print(); 


     final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); 

     streams.cleanUp(); 
     streams.start(); 


     Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); 
    } 

} 

私は、このようになりますFilterLogsクラスを持っている:

は、私はこのようなメインクラスが持っている

私のJSONオブジェクトは、その value戻り KStream<String, String> wordCounts = textLines.map((key, value) -> {}次のようになります
public class FilterLogs { 

    private type? jsoonObj; 

    public FilterLogs (type? jsonObj) { 
     this.jsonObj = jsonObj; 
    } 


    public type? getResult() { return result; } 


    public void execute() { 

     //manipulation of jsonObj goes in here? 

     } 
    } 

} 

{ 
    "tags": [ 
    "dgtl" 
    ], 
    "beat": { 
    "hostname": "host1", 
    "name": "host`", 
    "version": "5.4.1" 
    }, 
    "input_type": "log", 
    "@timestamp": "2017-09-27T20:01:52.282Z", 
    "source": "/log/state-change.log", 
    "offset": 1725163, 
    "message": "message of the log is here" 
    "type": "log" 
} 

私の理解から私は関数型プログラミングのパラダイムから来ているので、私はvalueを受け取り、私の場合はFilterLogsクラスに渡してから、そのクラスは新しいvalueを返さなければならないということです。

私は混乱していますが、クラスにはどのようなタイプを使用するのですか。実際にjsonを解析してどのように処理するのですか?

通常、javascriptではループの束を解析して欲しいことをすることができますが、Javaでこれを行う方法はあまりありません。

jsonオブジェクトをこのように戻したい場合はどうすればよいですか?

{ 
    "tags": [ 
    "dgtl" 
    ], 
    "beat": { 
    "hostname": "host1" 
    }, 
    "offset": 1725163, 
    "message": "message of the log is here" 
} 

答えて

0

あなたは新しい

  1. JSON-Pをチェックアウトする必要があります(JSON処理&解析のために) - http://json-b.net/
  2. JSON-BのAPI(JSONバインディング用)、最近リリースされた - https://github.com/javaee/jsonp

あなたの用途に合わせて意図的に作成されました