2017-03-21 3 views
0

大きなJSONファイルをダウンロードして解析する必要があります。 メモリ関連の問題を避けるために、私は1000個のjsonオブジェクトのバッチでカスタムコンバータの応答オブジェクトから入力ストリームを解析しています。カスタムレトロフィットコンバータから複数のアイテムをRxJavaCallAdapterFactoryに送信する方法

解析されたオブジェクトを呼び出し側の観測可能なものに戻すまで、すべてうまくいきます。

私のAPIメソッドは、このように呼ばれている:

Observable<MyResponseStream> typesObs = api.getTypes(request.method, request.options); 

応答がカスタムコンバータによって処理され

public class MyResponseConverterFactory extends Converter.Factory { 

    public MyResponseConverterFactory() { 
    } 


    @Override 
    public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) { 
     if (MyResponseStream.class.equals(type)) { 
      // We will process only response where the client wait for I2ctResponseStream 
      return MyResponseConverter.INSTANCE; 
     } 

     return null; 
    } 

    public static MyResponseConverterFactory create() { 
     return new MyResponseConverterFactory(); 
    } 

    final static class MyResponseConverter implements Converter<ResponseBody, MyResponseStream> { 
     static final MyResponseConverter INSTANCE = new MyResponseConverter(); 

     @Override 
     public MyResponseStream convert(ResponseBody responseBody) throws IOException { 
      return new MyResponseStream(responseBody.byteStream()); 
     } 
    } 
} 

MyResponseStreamは、解析されたオブジェクトを取得するための

public class MyResponseStream extends MyResponse<ArrayList<JSONObject>> { 

    private final static int BATCH_SIZE = 1000; 

    public interface ObjectsStreamListener { 
     void onObjectsParsed(String parentKey, ArrayList<ObjectNode> items); 
    } 

    private ArrayList<ObjectNode> mItems; 
    private ObjectMapper mMapper; 
    private ObjectsStreamListener mListener; 
    private InputStream mInputStream;     

    public MyResponseStream(InputStream inputStream) { 
     super(); 
     mInputStream = inputStream; 
     mItems = new ArrayList<>(); 
    } 

    public void start(ObjectsStreamListener listener) { 
     mListener = listener; 
     if (mInputStream != null) { 
      parse(); 
     } 
    } 

    private void parse() { 
     try { 
      mMapper = new ObjectMapper(); 
      JsonParser parser = mMapper.getFactory().createParser(mInputStream); 
      String key; 
      JsonToken currentToken = parser.nextToken(); 

      while (currentToken != null) { 
       parser.nextFieldName(); 
       key = parser.getCurrentName(); 

       if ("method".equals(key)) { 
        method = parser.nextTextValue(); 
       } else if ("success".equals(key)) { 
        isSuccess = parser.nextIntValue(0) == 1; 
        Cs.e("isSuccess " + isSuccess); 
       } else if ("data".equals(key)) { 
        currentToken = parser.nextToken(); 
        parseData(parser); 
       } else { 
        currentToken = parser.nextToken(); 
       } 
      } 
      parser.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 

    private void parseData(JsonParser parser) throws IOException { 
     String currentKey; 

     ObjectNode node; 
     while (parser.nextToken() != null) { 
      // Consume FIELD_NAME token 
      parser.nextFieldName(); 

      // Get parent key (ex groups) 
      currentKey = parser.getCurrentName(); 

      while (parser.getCurrentToken() == JsonToken.START_ARRAY) { 
       while (parser.nextToken() == JsonToken.START_OBJECT) { 
        node = mMapper.readTree(parser); 
        mItems.add(node); 
        if (mItems.size() == BATCH_SIZE) { 
         if (mListener != null) { 
          mListener.onObjectsParsed(currentKey, mItems); 
          mItems.clear(); 
         } 
        } 
       } 
      } 

      if (!mItems.isEmpty()) { 
       if (mListener != null) { 
        mListener.onObjectsParsed(currentKey, mItems); 
        mItems.clear(); 
       } 
      } 
     } 
    } 
} 

のように見えます私はリスナーを登録しています

typesObs.map(responseStream -> { 
      responseStream.start(new MyResponseStream.ObjectsStreamListener() { 
       @Override public void onObjectsParsed(String parentKey, ArrayList<ObjectNode> items) { 
        Cs.e("parentKey " + parentKey + " items " + items); 
       } 
      }); 
      return responseStream; 
     }) 

このアプローチはうまくいきますが、私はRxJavaを何らかの方法で観測できるわけではないので、良い解決策のようには見えません。

私の質問:コンバーターから観測可能な結果を​​onNext()と呼び出す方法はありますか? 私はあなたが取ることができる

retrofit.getRxCallAdapterFactory().getCallerObservable().onNext(items) 

答えて

1

一つのアプローチのようなもので

mListener.onObjectsParsed(currentKey, mItems); 

を交換したいの代わりに、コンバータのアダプタを作成することです。

このアプローチは、次の順序でやや機能する:

  1. チェックタイプは、デリゲートの作成カスタムアダプタを作成しますObservable<Response>
  2. 用のアダプタを取得するために使用retrofit.callAdapter(...)Observable<MyResponseStream>
  3. ある場合以前に取り出されたアダプタで観察可能な状態にして、操作を適用して戻り値のobservableにObservable<MyResponseStream>を作成します。
関連する問題