Issue
The following code works as expected (see output):
import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };
let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();
obs$ = sub$.pipe(
  scan((acc: ColumnFilter[], curr) => {
    const index = acc.findIndex(f => f.field === curr.field);
    if (index === -1) {
      acc.push({ field: curr.field, value: curr.value });
      return acc;
    }
    curr.value === '' ?
      acc.splice(index, 1) :
      acc[index] = { field: curr.field, value: curr.value };
    return acc;
  }, [])
);
obs$.subscribe(
  value => console.log(value)
);
sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })
// output:
// [ { field: 'size', value: 'xl' } ]
// [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
// [ { field: 'size', value: 'xl' } ]
Now, if I add a second subscription to the obs$ observable, the output changes. Check the last value of the stream: it now contains { field: 'color', value: '' }.
import { Observable, Subject } from "rxjs";
import { scan } from "rxjs/operators";
export type ColumnFilter = { field: string, value: string };
let sub$: Subject<ColumnFilter> = new Subject<ColumnFilter>();
let obs$: Observable<ColumnFilter[]> = new Observable<ColumnFilter[]>();
obs$ = sub$.pipe(
  scan((acc: ColumnFilter[], curr) => {
    const index = acc.findIndex(f => f.field === curr.field);
    if (index === -1) {
      acc.push({ field: curr.field, value: curr.value });
      return acc;
    }
    curr.value === '' ?
      acc.splice(index, 1) :
      acc[index] = { field: curr.field, value: curr.value };
    return acc;
  }, [])
);
// vvv this was added
obs$.subscribe()
// ^^^ this was added
obs$.subscribe(
  value => console.log(value)
);
sub$.next({ field: 'size', value: 'xl' })
sub$.next({ field: 'color', value: 'black' })
sub$.next({ field: 'color', value: '' })
// output:
// [ { field: 'size', value: 'xl' } ]
// [ { field: 'size', value: 'xl' }, { field: 'color', value: 'black' } ]
// [ { field: 'size', value: 'xl' }, { field: 'color', value: '' } ]
I don't understand why the second subscription is changing the outcome of the stream. I can only guess it has something to do with mutating the accumulator, because if I use acc.filter instead of acc.splice, it works as expected.
Edit
Added a stackblitz: https://stackblitz.com/edit/rxjs-6-opeartors-uzojgw?file=index.ts
Solution
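Here's what happens in your second sample code: when an event is triggered by sub$.next(), the pipe is executed once for each subscription to obs$. The seed array [] passed to scan is created only once, so every subscription starts from the same acc: ColumnFilter[] reference, and a mutation made while the reducer runs for one subscription is visible when it runs for the next. For the last event, the first subscription splices the color entry out of the shared array, so the second subscription no longer finds it (index === -1) and pushes { field: 'color', value: '' } instead.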
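To see the shared reference in isolation, here is a minimal sketch without RxJS. It assumes that scan starts every subscription from the seed value it was given, and it imitates the two subscriptions by running the reducer from the question twice per event against the same array. The names mutatingReducer, seed, firstState, secondState and secondOutputs are made up for this illustration.

```typescript
type ColumnFilter = { field: string; value: string };

// The reducer from the question, written as a standalone function.
// It mutates `acc` in every branch and returns the same array.
function mutatingReducer(acc: ColumnFilter[], curr: ColumnFilter): ColumnFilter[] {
  const index = acc.findIndex(f => f.field === curr.field);
  if (index === -1) {
    acc.push({ field: curr.field, value: curr.value });
    return acc;
  }
  if (curr.value === '') {
    acc.splice(index, 1);
  } else {
    acc[index] = { field: curr.field, value: curr.value };
  }
  return acc;
}

// One seed array, created once, used as the starting state of both
// imitated subscriptions (assumption: this mirrors what scan does).
const seed: ColumnFilter[] = [];
let firstState = seed;
let secondState = seed;

const events: ColumnFilter[] = [
  { field: 'size', value: 'xl' },
  { field: 'color', value: 'black' },
  { field: 'color', value: '' },
];

const secondOutputs: string[] = [];
for (const event of events) {
  firstState = mutatingReducer(firstState, event);   // the added subscribe()
  secondState = mutatingReducer(secondState, event); // the logging subscriber
  secondOutputs.push(JSON.stringify(secondState));
}

console.log(secondOutputs[2]);
// [{"field":"size","value":"xl"},{"field":"color","value":""}]
```

The last entry of secondOutputs matches the unexpected third line of output in the question.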
My suggestion is to use either the share or the shareReplay operator, so that the pipe is executed only once per event and the values it emits are multicast to the subscribers (i.e. all subscribers get the same object reference).
import { scan, share } from "rxjs/operators";
obs$ = sub$.pipe(
  scan((acc: ColumnFilter[], curr) => {
    const index = acc.findIndex((f) => f.field === curr.field);
    if (index === -1) {
      acc.push({ field: curr.field, value: curr.value });
      return acc;
    }
    curr.value === ''
      ? acc.splice(index, 1)
      : (acc[index] = { field: curr.field, value: curr.value });
    return acc;
  }, []),
  share() // you might use shareReplay(1) instead, e.g. if you have late subscribers
);
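Independently of share, the reducer itself could avoid mutation, so that a shared seed no longer matters. The following is only a sketch of that idea, not something taken from your code: nextFilters is a hypothetical name, and it could be passed to scan in place of the inline callback. It keeps the behaviour of the original reducer (append an unknown field, remove on an empty value, otherwise replace in place) but returns a new array in every branch. As before, the two subscriptions are imitated by hand so the example runs without RxJS.

```typescript
type ColumnFilter = { field: string; value: string };

// Hypothetical non-mutating variant of the reducer: every branch returns
// a new array and leaves `acc` untouched.
function nextFilters(acc: ColumnFilter[], curr: ColumnFilter): ColumnFilter[] {
  const index = acc.findIndex(f => f.field === curr.field);
  const entry: ColumnFilter = { field: curr.field, value: curr.value };
  if (index === -1) {
    return [...acc, entry]; // unknown field: append
  }
  if (curr.value === '') {
    return acc.filter((_, i) => i !== index); // empty value: remove
  }
  return acc.map((f, i) => (i === index ? entry : f)); // otherwise: replace
}

// Imitate two subscriptions that start from the same seed array.
const sharedSeed: ColumnFilter[] = [];
let stateA = sharedSeed;
let stateB = sharedSeed;

const filterEvents: ColumnFilter[] = [
  { field: 'size', value: 'xl' },
  { field: 'color', value: 'black' },
  { field: 'color', value: '' },
];

const outputsB: string[] = [];
for (const event of filterEvents) {
  stateA = nextFilters(stateA, event);
  stateB = nextFilters(stateB, event);
  outputsB.push(JSON.stringify(stateB));
}

console.log(outputsB[2]);       // [{"field":"size","value":"xl"}]
console.log(sharedSeed.length); // 0, the seed was never modified
```

Note that replacing only acc.splice with acc.filter still leaves acc.push and the index assignment mutating the seed, so a fully non-mutating reducer is the more robust variant. It also means each emitted array is a separate snapshot, whereas the mutating reducer combined with share hands every subscriber the same array on every emission.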
Answered By - kellermat