project reactor
と協力しており、現在大きな問題があります。これは、(私たちのデータを公開する)私たちが生産する方法である:加入者のonnextには完全な商品が含まれていません
public Flux<String> getAllFlux() {
return Flux.<String>create(sink -> {
new Thread(){
public void run(){
Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator();
ObjectMapper mapper = new ObjectMapper();
while(iterator.hasNext()) {
try {
sink.next(mapper.writeValueAsString(iterator.next().getValue()));
} catch (IOException e) {
e.printStackTrace();
}
}
sink.complete();
}
} .start();
});
}
あなたは私たちがイテレータからデータを取っているし、JSON文字列としてそのイテレータの各項目を公開している見ることができるように。当社の加入者は次のことを行います。
flux.subscribe(new Subscriber<String>() {
private Subscription s;
int amount = 1; // the amount of received flux payload at a time
int onNextAmount;
String completeItem="";
ObjectMapper mapper = new ObjectMapper();
@Override
public void onSubscribe(Subscription s) {
System.out.println("subscribe");
this.s = s;
this.s.request(amount);
}
@Override
public void onNext(String item) {
MyObject myObject = null;
try {
System.out.println(item);
myObject = mapper.readValue(completeItem, MyObject.class);
System.out.println(myObject.toString());
} catch (IOException e) {
System.out.println(item);
System.out.println("failed: " + e.getLocalizedMessage());
}
onNextAmount++;
if (onNextAmount % amount == 0) {
this.s.request(amount);
}
}
@Override
public void onError(Throwable t) {
System.out.println(t.getLocalizedMessage())
}
@Override
public void onComplete() {
System.out.println("completed");
});
}
あなたは、我々は単に私たちが受け取る文字列の項目を印刷し、ジャクソンのラッパーを使用してオブジェクトにそれを解析している見ることができるように。
{"itemId": "someId", "itemDesc", "some description"}
が、文字列は、例えば次のように切断されたいくつかの項目について:
{"itemId": "some"
し、次の項目の後にそれを私たちが今だ問題は、私達の項目のほとんどのために、すべてが正常に動作していることです
"Id", "itemDesc", "some description"}
これらの切れ目のパターンはありません。それは完全にランダムであり、私たちがそのコードを実行するたびに異なっています。もちろん、私たちのjacksonは、その動作のエラーUnexpected end of Input
になっています。
このような動作を引き起こしているのは何ですか?どのように解決できますか?