Issue
I'm currently trying to implement a simple offline/online sync mechanism using observables.
Basically, I have two observables:
- Connection observable: The first observable tells me whether or not there is an internet connection. It emits when the network state changes.
- Data observable: The second observable has the data that needs to be synced. It emits when there is new data to be synced.
What I want to achieve is to combine the above observables so that:
- As long as the connection state is false, the combined observable shouldn't emit. In this case, the data observable should retain its state.
- As long as the connection state is true, the combined observable should emit every time there is data in the data observable.
- If the connection state switches from false to true, it should emit for every value on the data observable.
A small example that currently uses filter and combineLatest can be found here: https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js
Unfortunately, this doesn't behave as intended at all.
Is there an operator that achieves the required behavior? As an alternative, I could poll the connection status and emit every X seconds, but ideally I'd like a clean combination of observables. I'm a bit lost as to which operator makes the most sense.
To clarify: I need to sync all data, not just the latest value. So the data observable should buffer the data while there is no connection.
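The intended behavior can be sketched without RxJS, just to pin down the semantics: items are queued while offline and handed over in arrival order once online. This is only a minimal illustration under my own assumptions; GatedQueue, setOnline and push are hypothetical names, not part of RxJS or of the sandbox.

```typescript
// Hypothetical helper (not part of RxJS): holds items while the gate is
// closed and passes them to `sync` in arrival order once it is open.
class GatedQueue<T> {
  private pending: T[] = [];
  private open = false;
  private sync: (item: T) => void;

  constructor(sync: (item: T) => void) {
    this.sync = sync;
  }

  // Call whenever the connection state changes.
  setOnline(online: boolean): void {
    this.open = online;
    this.flush();
  }

  // Call whenever there is new data to sync.
  push(item: T): void {
    this.pending.push(item);
    this.flush();
  }

  // Drains the queue, but only while the gate is open.
  private flush(): void {
    while (this.open && this.pending.length > 0) {
      this.sync(this.pending.shift() as T);
    }
  }
}

const synced: number[] = [];
const queue = new GatedQueue<number>((n) => {
  synced.push(n);
});

queue.push(1);          // offline: buffered
queue.push(2);          // offline: buffered
queue.setOnline(true);  // gate opens: 1 and 2 are synced
queue.push(3);          // online: synced immediately
queue.setOnline(false);
queue.push(4);          // offline again: buffered
console.log(synced);
```

What I'm looking for is an operator, or a combination of operators, that gives the same behavior on observables.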
Solution
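After also searching for the term "gate", I found the following Stack Overflow question and answer: Conditional emission delays with rxjs
Basically, the answer uses delayWhen to achieve the desired result: each value is held back until the observable returned for it emits, and here that observable is the connection state filtered down to true.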
I've updated an example here: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357
The crucial part is:
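import { BehaviorSubject, Subject } from "rxjs";
import { delayWhen, filter } from "rxjs/operators";

// Connection state; starts offline.
const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();

// Emits whenever the connection state is (or becomes) true.
const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));

// Each value is held back until the observable from triggerFn emits.
// syncedIndicator is assumed to be a DOM element defined elsewhere.
dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
  console.log("Syncing data", { counter });
  syncedIndicator.innerHTML += `<li>${counter}</li>`;
});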
Wrapped in a custom TypeScript operator:
import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

// Holds back each source value until gateTrigger emits true.
export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  const gateTriggerFn = () => gateTrigger.pipe(
    filter((v) => v)
  );

  // The source is typed as Observable<T> so that the returned function
  // matches MonoTypeOperatorFunction<T> under strict null checks.
  return (source: Observable<T>) => source.pipe(
    delayWhen(gateTriggerFn)
  );
}
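A short usage sketch of the operator, assuming RxJS 6 or 7 is installed. The gate function is repeated inside the block so that it runs on its own; online$, data$ and synced are names I made up for this illustration.

```typescript
import { BehaviorSubject, MonoTypeOperatorFunction, Observable, Subject } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

// The gate operator, repeated here so the sketch is self-contained.
function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) =>
    source.pipe(delayWhen(() => gateTrigger.pipe(filter((open) => open))));
}

// Hypothetical names, for illustration only.
const online$ = new BehaviorSubject<boolean>(false); // starts offline
const data$ = new Subject<number>();
const synced: number[] = [];

data$.pipe(gate<number>(online$)).subscribe((value) => {
  synced.push(value);
});

data$.next(1);       // offline: held back
data$.next(2);       // offline: held back
online$.next(true);  // gate opens: 1 and 2 are released in order
data$.next(3);       // online: passes through immediately
online$.next(false);
data$.next(4);       // offline again: held back until the next true
console.log(synced);
```

Note that this relies on the trigger replaying its current state to new subscribers, as a BehaviorSubject does. With a plain Subject as the trigger, values arriving while online would wait for the next true instead of passing through immediately.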
So far, this solution seems to do what I intend it to do.
Answered By - Gernot R. Bauer