2017-01-26 6 views
3

今日は面白い問題に遭遇しました。私はファイルをアップロードしているアプリケーションに取り組んでおり、プログレスバーを実装したいと思っています。このアプリは、React/Redux/Redux-Observableを使って書かれています。アップロードの進捗状況に応じてアクションをディスパッチしたい私はそれを実装するために何をしたのですか:rxjsのfromSubscriberの実装

withProgress(method, url, body = {}, headers = {}) { 
    const progressSubscriber = Subscriber.create(); 

    return { 
     Subscriber: progressSubscriber, 
     Request: this.ajax({ url, method, body, headers, progressSubscriber }), 
    }; 
} 

私はすべての私のAjax要求を行うために使用するクラスを持っています。 this.ajaxは、パラメータが渡されたObservable.ajaxを呼び出します。

export const blobStorageUploadEpic = (action$) => { 
    return action$.ofType(a.BLOB_STORAGE_UPLOAD) 
    .mergeMap(({ payload }) => { 
     const { url, valetKey, blobId, blobData, contentType } = payload; 

     const { Subscriber, Request } = RxAjax.withProgress('PUT', `${url}?${valetKey}`, blobData, { 
      'x-ms-blob-type': 'BlockBlob', 
      'Content-Type': contentType, 
     }); 

     const requestObservable = Request 
     .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } })) 
     .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err })); 

     return Observable.fromSubscriber(Subscriber) 
     .map((e) => ({ percentage: (e.loaded/e.total) * 100 })) 
     .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} })) 
     .merge(requestObservable); 
    }); 
}; 

これは私の叙事詩です。私は加入者を取り戻すと、私はObservableのカスタム静的メソッドを書いて加入者を受け取りました。私はRequest(それはObservableです)とそれをマージします。

Observable.fromSubscriber = function fromSubscriber(externalSubscriber) { 
    return Observable.create((subscriber) => { 
     externalSubscriber.next =  (val) => subscriber.next(val); 
     externalSubscriber.error = (err) => subscriber.error(err); 
     externalSubscriber.complete =() => subscriber.complete(); 
    }); 
}; 

最後に、ここで私はObservableに書いたカスタムの静的メソッドです。私は2つの理由でこれを書いた。 1.同様の問題を扱っている他の誰かのための例として(私はSubscriberからObservableを作る方法を見つけ出そうと多くの時間を費やしました)そして、これが達成する最善の方法であるかどうか尋ねるこの目標。 rxjsは深く、私はこれを行う既存の方法があると思いますが、私はそれを見つけることができませんでした。

答えて

6

Subjectは、以下も同様に動作するはずです、何のためにあるのか、本質的である:

ここ
export const blobStorageUploadEpic = (action$) => { 
    return action$.ofType(a.BLOB_STORAGE_UPLOAD) 
    .mergeMap(({ payload }) => { 
     const { url, valetKey, blobId, blobData, contentType } = payload; 

     const progressSubscriber = new Rx.Subject(); 
     const request = Rx.Observable.ajax({ 
      method: 'PUT', 
      url: `${url}?${valetKey}`, 
      body: blobData, 
      headers: { 
       'x-ms-blob-type': 'BlockBlob', 
       'Content-Type': contentType, 
      }, 
      progressSubscriber 
     }); 

     const requestObservable = request 
      .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } })) 
      .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err })); 

     return progressSubscriber 
      .map((e) => ({ percentage: (e.loaded/e.total) * 100 })) 
      .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} })) 
      .merge(requestObservable); 
    }); 
}; 

は(@jsfiddleを生きる)より一般的な例です。

let data = ""; 
for (let c = 0; c < 100000; ++c) { 
    data += "" + Math.random(); 
} 

const progressSubscriber = new Rx.Subject(); 
const request = Rx.Observable.ajax({ 
    method: 'POST', 
    url: "/echo/json/", 
    body: JSON.stringify({ data }), 
    progressSubscriber 
}); 

progressSubscriber 
    .merge(request) 
    .subscribe(console.log);