2017-07-10 2 views
0

project reactorと協力しており、現在大きな問題があります。これは、(私たちのデータを公開する)私たちが生産する方法である:加入者のonnextには完全な商品が含まれていません

public Flux<String> getAllFlux() { 
    return Flux.<String>create(sink -> { 
     new Thread(){ 
      public void run(){ 
       Iterator<Cache.Entry<String, MyObject>> iterator = getAllIterator(); 
       ObjectMapper mapper = new ObjectMapper(); 

       while(iterator.hasNext()) { 
        try { 
         sink.next(mapper.writeValueAsString(iterator.next().getValue())); 
        } catch (IOException e) { 
         e.printStackTrace(); 
        } 
       } 

       sink.complete(); 
      } 
     } .start(); 
    }); 
} 

あなたは私たちがイテレータからデータを取っているし、JSON文字列としてそのイテレータの各項目を公開している見ることができるように。当社の加入者は次のことを行います。

flux.subscribe(new Subscriber<String>() { 
    private Subscription s; 

    int amount = 1; // the amount of received flux payload at a time 
    int onNextAmount; 

    String completeItem=""; 

    ObjectMapper mapper = new ObjectMapper(); 

    @Override 
    public void onSubscribe(Subscription s) { 
     System.out.println("subscribe"); 

     this.s = s; 
     this.s.request(amount); 
    } 

    @Override 
    public void onNext(String item) { 
     MyObject myObject = null; 

     try { 
      System.out.println(item); 

      myObject = mapper.readValue(completeItem, MyObject.class); 

      System.out.println(myObject.toString()); 
     } catch (IOException e) { 
      System.out.println(item); 
      System.out.println("failed: " + e.getLocalizedMessage()); 
     } 

     onNextAmount++; 

     if (onNextAmount % amount == 0) { 
      this.s.request(amount); 
     } 
    } 

    @Override 
    public void onError(Throwable t) { 
     System.out.println(t.getLocalizedMessage()) 
    } 

    @Override 
    public void onComplete() { 
     System.out.println("completed"); 
    }); 
} 

あなたは、我々は単に私たちが受け取る文字列の項目を印刷し、ジャクソンのラッパーを使用してオブジェクトにそれを解析している見ることができるように。

{"itemId": "someId", "itemDesc", "some description"} 

が、文字列は、例えば次のように切断されたいくつかの項目について:

{"itemId": "some" 

し、次の項目の後にそれを私たちが今だ問題は、私達の項目のほとんどのために、すべてが正常に動作していることです

"Id", "itemDesc", "some description"} 

これらの切れ目のパターンはありません。それは完全にランダムであり、私たちがそのコードを実行するたびに異なっています。もちろん、私たちのjacksonは、その動作のエラーUnexpected end of Inputになっています。

このような動作を引き起こしているのは何ですか?どのように解決できますか?

答えて

1

ソリューション:

ではなく、文字列のフラックス内のオブジェクトを送信:

public Flux<ItemIgnite> getAllFlux() { 
    return Flux.create(sink -> { 
     new Thread(){ 
      public void run(){ 
       Iterator<Cache.Entry<String, ItemIgnite>> iterator = getAllIterator(); 

       while(iterator.hasNext()) { 
        sink.next(iterator.next().getValue()); 
       } 
      } 
     } .start(); 
    }); 
} 

と、次のproducesタイプを使用します。

@RequestMapping(value="/allFlux", method=RequestMethod.GET, produces="application/stream+json") 

ここで重要なのはstream+jsonを使用することで、だけでなく、json

関連する問題