大きなJSONファイルをダウンロードして解析する必要があります。 メモリ関連の問題を避けるために、私は1000個のjsonオブジェクトのバッチでカスタムコンバータの応答オブジェクトから入力ストリームを解析しています。カスタムレトロフィットコンバータから複数のアイテムをRxJavaCallAdapterFactoryに送信する方法
解析されたオブジェクトを呼び出し側の観測可能なものに戻すまで、すべてうまくいきます。
私のAPIメソッドは、このように呼ばれている:
Observable<MyResponseStream> typesObs = api.getTypes(request.method, request.options);
応答がカスタムコンバータによって処理され
public class MyResponseConverterFactory extends Converter.Factory {
public MyResponseConverterFactory() {
}
@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
if (MyResponseStream.class.equals(type)) {
// We will process only response where the client wait for I2ctResponseStream
return MyResponseConverter.INSTANCE;
}
return null;
}
public static MyResponseConverterFactory create() {
return new MyResponseConverterFactory();
}
final static class MyResponseConverter implements Converter<ResponseBody, MyResponseStream> {
static final MyResponseConverter INSTANCE = new MyResponseConverter();
@Override
public MyResponseStream convert(ResponseBody responseBody) throws IOException {
return new MyResponseStream(responseBody.byteStream());
}
}
}
MyResponseStream
は、解析されたオブジェクトを取得するための
public class MyResponseStream extends MyResponse<ArrayList<JSONObject>> {
private final static int BATCH_SIZE = 1000;
public interface ObjectsStreamListener {
void onObjectsParsed(String parentKey, ArrayList<ObjectNode> items);
}
private ArrayList<ObjectNode> mItems;
private ObjectMapper mMapper;
private ObjectsStreamListener mListener;
private InputStream mInputStream;
public MyResponseStream(InputStream inputStream) {
super();
mInputStream = inputStream;
mItems = new ArrayList<>();
}
public void start(ObjectsStreamListener listener) {
mListener = listener;
if (mInputStream != null) {
parse();
}
}
private void parse() {
try {
mMapper = new ObjectMapper();
JsonParser parser = mMapper.getFactory().createParser(mInputStream);
String key;
JsonToken currentToken = parser.nextToken();
while (currentToken != null) {
parser.nextFieldName();
key = parser.getCurrentName();
if ("method".equals(key)) {
method = parser.nextTextValue();
} else if ("success".equals(key)) {
isSuccess = parser.nextIntValue(0) == 1;
Cs.e("isSuccess " + isSuccess);
} else if ("data".equals(key)) {
currentToken = parser.nextToken();
parseData(parser);
} else {
currentToken = parser.nextToken();
}
}
parser.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void parseData(JsonParser parser) throws IOException {
String currentKey;
ObjectNode node;
while (parser.nextToken() != null) {
// Consume FIELD_NAME token
parser.nextFieldName();
// Get parent key (ex groups)
currentKey = parser.getCurrentName();
while (parser.getCurrentToken() == JsonToken.START_ARRAY) {
while (parser.nextToken() == JsonToken.START_OBJECT) {
node = mMapper.readTree(parser);
mItems.add(node);
if (mItems.size() == BATCH_SIZE) {
if (mListener != null) {
mListener.onObjectsParsed(currentKey, mItems);
mItems.clear();
}
}
}
}
if (!mItems.isEmpty()) {
if (mListener != null) {
mListener.onObjectsParsed(currentKey, mItems);
mItems.clear();
}
}
}
}
}
のように見えます私はリスナーを登録しています
typesObs.map(responseStream -> {
responseStream.start(new MyResponseStream.ObjectsStreamListener() {
@Override public void onObjectsParsed(String parentKey, ArrayList<ObjectNode> items) {
Cs.e("parentKey " + parentKey + " items " + items);
}
});
return responseStream;
})
このアプローチはうまくいきますが、私はRxJavaを何らかの方法で観測できるわけではないので、良い解決策のようには見えません。
私の質問:コンバーターから観測可能な結果をonNext()
と呼び出す方法はありますか? 私はあなたが取ることができる
retrofit.getRxCallAdapterFactory().getCallerObservable().onNext(items)