Issue
I would like to cache and expire the HTTP responses from e.g. a GET request with certain params.
An example of a use case:
Suppose I build a service like this:
@Injectable()
export class ProductService {
constructor(private http: HttpClient) {}
getProduct$(id: string): Observable<Product> {
return this.http.get<Product>(`${API_URL}/${id}`);
}
getProductA$(id: string): Observable<ProductA> {
return this.getProduct$(id).pipe(
// bunch of complicated operations here
};
}
getProductB$(id: string): Observable<ProductB> {
return this.getProduct$(id).pipe(
// bunch of other complicated operations here
};
}
}
Now for whatever reason, function A is called in component A and function B is called in component B. I know this could be done in another way (such as a top-level smart component getting the HTTP data and passing it through input params), but for whatever reason, the two components are "smart" and they each call a service function.
Both components are loaded on the same page, so two subscriptions happen = two HTTP requests to the same endpoint - even though we know that the result will most likely be the same.
I want to simply cache the response of getProduct$
, BUT I also want this cache to expire pretty quickly, because in 2 minutes Margareth from product management is going to change the product price.
What I tried but doesn't work:
Basically, I tried keeping a dictionary of hot observables using shareReplay with a window time of 5s. My assumption was that if the (source) observable completes or the number of subscriptions is 0, then the next subscriber would simply refire the observable, but this does not seem to be the case.
private product$: { [productId: string]: Observable<Product> } = {};
getProduct$(id: string): Observable<Product> {
if (this.product$[id]) {
return this.product$[id];
}
this.product$[id] = this.http.get<Product>(`${API_URL}/${id}`)
.pipe(
shareReplay(1, 5000), // expire after 5s
)
);
return this.product$[id];
}
I thought, I can try to remove the observable from my dictionary on completion using finalize or finally, but unfortunately those are also called on every unsubscribe.
So the solution must be perhaps more complicated.
Any suggestions?
Solution
EDIT July 2022: Since the original solution worked only on older RxJS versions and was basically based on a bug in RxJS here's the same functionality for RxJS 7.0+:
import { of, defer, share, delay, tap, timestamp, map, Observable } from 'rxjs';
let counter = 1;
const mockHttpRequest = () =>
defer(() => {
console.log('Making request...');
return of(`Response ${counter++}`).pipe(delay(100));
});
const createCachedSource = (
makeRequest: () => Observable<any>,
windowTime: number
) => {
let cache;
return new Observable((obs) => {
const isFresh = cache?.timestamp + windowTime > new Date().getTime();
// console.log(isFresh, cache);
if (isFresh) {
obs.next(cache.value);
obs.complete();
} else {
return makeRequest()
.pipe(
timestamp(),
tap((current) => (cache = current)),
map(({ value }) => value)
)
.subscribe(obs);
}
}).pipe(share());
};
const cached$ = createCachedSource(() => mockHttpRequest(), 1000);
// Triggers the 1st request.
cached$.subscribe(console.log);
cached$.subscribe(console.log);
setTimeout(() => cached$.subscribe(console.log), 50);
setTimeout(() => cached$.subscribe(console.log), 200);
// Triggers the 2nd request.
setTimeout(() => cached$.subscribe(console.log), 1500);
setTimeout(() => cached$.subscribe(console.log), 1900);
setTimeout(() => cached$.subscribe(console.log), 2400);
// Triggers the 3nd request.
setTimeout(() => cached$.subscribe(console.log), 3000);
Live demo: https://stackblitz.com/edit/rxjs-rudgkn?devtoolsheight=60&file=index.ts
ORIGINAL ANSWER: If I understand you correctly you want to cache responses based on their id
parameter so when I make two getProduct()
with different id
s I'll get two different uncached results.
I think the last variant is almost correct. You want it to unsubscribe from its parent so it can re-subscribe and refresh the cached value later.
The shareReplay
operator worked differently until RxJS 5.5 if I recall this correctly where shareReplay
has changed and it didn't re-subscribe to its source. It was later reimplemented in RxJS 6.4 where you can modify its behavior based on a config object passed to shareReplay
. Since you're using shareReplay(1, 5000)
it seems like you're using RxJS <6.4 so it's better to use publishReplay()
and refCount()
operators instead.
private cache: Observable<Product>[] = {}
getProduct$(id: string): Observable<Product> {
if (!this.cache[id]) {
this.cache[id] = this.http.get<Product>(`${API_URL}/${id}`).pipe(
publishReplay(1, 5000),
refCount(),
take(1),
);
}
return this.cache[id];
}
Notice that I also included take(1)
. That's because I want the chain to complete immediately after publishReplay
emits its buffer and before it subscribes to its source Observable. It's not necessary to subscribe to its source because we just want to use the cached value. After 5s the cached value is discarded and publishReplay
will subscribe to its source again.
I hope this all makes sense :).
Answered By - martin
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.