2017-12-15 12 views
0

オブジェクト私カフカJSONにJSON配列の変換のためのストリームは、以下の...私はJSONオブジェクトにJSON配列を変換するための通常のJavaのコードを持っていると私はカフカに、この通常のJavaストリームを変換する必要があり

import java.io.*; 
import java.util.*; 
import java.lang.*; 
import org.json.simple.JSONArray; 
import org.json.simple.JSONObject; 
import org.json.simple.parser.JSONParser; 
import org.json.simple.parser.ParseException; 
public class JsonParseTest { 
    public static void main(String[] args) { 
     try { 
      JSONParser parser = new JSONParser(); 
      JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json")); 
      for (Object o : jsonArray) { 
//to get the Json object 
       JSONObject snap = (JSONObject) o; 
       System.out.println(snap); 
      } 
} 
catch (FileNotFoundException ex) { 
      ex.printStackTrace(); 
     } catch (IOException ex) { 
      ex.printStackTrace(); 
     } catch (ParseException ex) { 
      ex.printStackTrace(); 
     } catch (NullPointerException ex) { 
      ex.printStackTrace(); 
     } 
    } 
} 
です誰かが私はそれとconinueことができる唯一のロジック部分のコードを書くことで私を助けている場合

は、以下の私のロジック部である私はカフカの同じコードストリームを書くことができますどのようにこの

public class JsonParseTest { 
    public static void main(String[] args) { 
     try { 
      JSONParser parser = new JSONParser(); 
      JSONArray jsonArray = (JSONArray) parser.parse(new FileReader("/root/jsonTestFile.json")); 
      for (Object o : jsonArray) { 
//to get the Json object 
       JSONObject snap = (JSONObject) o; 
       System.out.println(snap); 
      } 
} 

のために助けを必要と少なくとも?誰かがこれを行うのを助けることができますか?

+0

シリアライザとデシリアライザを実装する必要があります。この[リンク](https://docs.confluent.io/3.0.0/streams/developer-guide.html#data-types-and-serialization)が役立ちます。 – Onkar

答えて

0

私は2つのオプションをお勧めします:あなたは(文字列、文字列)のストリームとしてあなたのストリームを処理し、解析するflatMapを使用することができます(Here is the javadoc)あなたがあなた自身のシリアライザ/デシリアライザを実装

    • JSONParser parser = new JSONParser(); 
      stringStream.flatMap((k,v) -> { 
          List<KeyValue<String,JSONOBject>> tmp = new ArrayList<KeyValue<String,JSONOBject>>(); 
          JSONArray jsonArray = (JSONArray) parser.parse(v); 
          for (Object o : jsonArray) { 
           JSONObject snap = (JSONObject) o; 
           tmp.add(new KeyValue(k, snap)); 
          }  
          return tmp; 
      }); 
      

    彼女:あなたのストリームの各要素と(文字列、JSONObject)のストリームに変換しますeラムダのコードをtry/catchにラップしなければならないという例外を除いて、私は全く扱っていませんでした。ヘルプ ため

  • +0

    ねえ、あなたは私の答えを投稿して私の疑問をクリアして上記のコードをチェックすることができます –

    0

    おかげで今のコードは、それが投げているエラーがJSONParserの引数であるので、多少の誤差

    KStreamBuilder builder = new KStreamBuilder(); 
         KStream<String, String> mstream = builder.stream(Serdes.String(), Serdes.String(), "intopic"); 
         KStream<String, JSONPObject>msstream = mstream.flatMap((k,v) -> { 
          List<KeyValue<String, JSONPObject>> tmp = new ArrayList<KeyValue<String, JSONPObject>>(); 
          JSONParser parser= new JSONParser() ; 
          JsonNode jsonNode = (JsonNode) parser.parse(v); 
          for (Object o : jsonNode){ 
           JSONPObject snap = (JSONPObject) o; 
           tmp.add(new KeyValue(k,snap)); 
          } 
          return tmp; 
         }); 
    

    を示している、それ以下のようになります。それは、ソース、グローバル、ブール値を含むように私に求めています...そしてもう一つのエラーはparser.parse(v)です。ここにそれはvが何らかのエラーを持っていることを示しています.... nは正しいコードですか?

    関連する問題