Issue
This should be a simple one for the angular pros out there. I have this simple method:
private addMultiple(products: Product[]): void {
let observables: Observable<Product>[] = [];
products.forEach((product: Product) => observables.push(this.productService.create(product)));
forkJoin(observables).subscribe();
}
If I send one product to the method, it works fine, but if I try multiple (100) I just get loads of cancelled requests and I am not sure why.
Does anyone know if I should be using a different method?
This is my productService:
import { Inject, Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { finalize, map } from 'rxjs/operators';
import { BehaviorSubject, Observable } from 'rxjs';
import { Product, Attempt, RequestOptions } from '../models';
import { HttpServiceConfig, HTTP_SERVICE_CONFIG } from '../configs';
@Injectable({
providedIn: 'root',
})
export class ProductService {
private endpoint: string = 'products';
public items: BehaviorSubject<Product[]>;
public loading: BehaviorSubject<boolean>;
constructor(
@Inject(HTTP_SERVICE_CONFIG) public config: HttpServiceConfig,
private httpClient: HttpClient
) {
this.items = new BehaviorSubject<Product[]>([]);
this.loading = new BehaviorSubject<boolean>(false);
}
list(categoryId: number, options?: RequestOptions): Observable<Product[]> {
this.loading.next(true);
return this.httpClient
.get<Attempt<Product[]>>(
`${this.config.apiUrl}/categories/${categoryId}/${this.endpoint}/master`,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product[]>) => {
if (response.failure) return response.result;
this.items.next(response.result);
return response.result;
}),
finalize(() => this.loading.next(false))
);
}
get(id: number, slug: string, options?: RequestOptions): Observable<Product> {
return this.httpClient
.get<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/${id}?slug=${slug}`,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
return response.result;
})
);
}
public create(item: Product, options?: RequestOptions): Observable<Product> {
return this.httpClient
.post<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
public updateSpecification(
item: Product,
options?: RequestOptions
): Observable<Product> {
return this.httpClient
.put<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/specification`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
this.remove(items, newItem.id);
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
public approve(item: Product, options?: RequestOptions): Observable<Product> {
return this.httpClient
.put<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/approve`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
this.remove(items, newItem.id);
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
public reject(item: Product, options?: RequestOptions): Observable<Product> {
return this.httpClient
.put<Attempt<Product>>(
`${this.config.apiUrl}/${this.endpoint}/reject`,
item,
options?.getRequestOptions()
)
.pipe(
map((response: Attempt<Product>) => {
if (response.failure) return response.result;
const newItem = response.result;
const items = this.items.value;
this.remove(items, newItem.id);
items.push(newItem);
this.items.next(items);
return response.result;
})
);
}
private remove(items: Product[], id: number | string) {
items.forEach((item, i) => {
if (item.id !== id) {
return;
}
items.splice(i, 1);
});
}
}
Solution
It may be that the server cancels requests if it gets hit by too many requests at almost the same time, which is what happens using forkJoin
with a hundred of Observables.
forkJoin
starts the "execution" of all the Observables in parallel.
If you want to control the number of parallel executions, you should try mergeMap
with the concurrency
parameter set. This approach can be a bit more complex if you want to match the request with the response.
The code could look something like this
// set the number of requests you want to fly concurrently
const concurrency = 10;
// use from to generate an Observable from an array of Products
from(products).pipe(
// use mergeMap to execute the request
mergeMap(product => this.productService.create(product).pipe(
// return an object which contains the result and the product which started the request
map(result => ({result, product}))
), concurrency), // set the concurrency in mergeMap
// toArray transform the stream into an Array if this is what you want
toArray()
).subscribe(
res => {// do something with res, which is an array of objects containing objects with result and product}
)
Answered By - Picci
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.