Issue
RxJS 4:
I am trying to store some values and emit them after x seconds from the first value stored time (in other words, the first value received from websocket will toggle the timer to store the incoming values and then emit after x seconds). After values are emitted, the timer will stop and no values will be emitted. Until I received a new value from websocket, the cycle begins again.
The reason is that currently in my app, it will likely to emit a value every nanosecond (the websocket data) and it's kinda affecting the performance. I am thinking to store as many values I can receive for x second, then emitting those values together to do some batch calculations all at once.
I have tried this but it seems not to work as expected.
public testObs = new Observable<any>();
public bufferStarted = false;
private subject = new Subject<any>();
webSocket.onmessage = ((event: any) => {
this.subject.next(event.data);
if(!bufferStarted) {
bufferStarted = true;
//start the buffer now
const startInterval = Observable.timer();
//emit value after 1s and close buffer
const closingInterval = val => {
console.log(`Buffer is open! Emitting value after 1s`)
bufferStarted = false;
return Observable.interval(1000);
}
this.testObs = this.subject.bufferToggle(startInterval, closingInterval);
}
}
In component, I subscribe to testObs.subscribe((e) => ... )
. Example: A value is sent through the websocket, and it started the timer that opens the buffer for 1 second. Within 1 second, 50 more values are received from websocket. I was expecting 51 values (array length of 51) received in the component. However I received that the observable is undefined. Help is appreciated.
Solution
bufferTime
seems to be a good choice for you.
It will group the emitted items for a given period of time. However it will also emit an empty array when no items emits from the source, you can use filter
to omit the empty array though.
Here's an example:
// assume a web socket stream
ws$;
const notEmpty = arr => Boolean(arr.length);
const grouped$ = ws$.pipe(bufferTime(1000), filter(notEmpty));
grouped$.subscribe(group => {
// group is an array of item from ws$
// do something here
});
Answered By - t7yang
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.