Skip to content

Commit

Permalink
feat(database): adds auditTime for queries
Browse files Browse the repository at this point in the history
Adds an auditTime(0) operator to the composed query observable so that
changes made to multiple subjects are emitted as a single query (emitted
in the event loop) rather than as separate queries (emitted
immediately).

Closes #389 and #770.
  • Loading branch information
cartant authored and davideast committed Feb 9, 2017
1 parent 1993c01 commit f9cb5c3
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 19 deletions.
83 changes: 72 additions & 11 deletions src/database/query_observable.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ function scalarQueryTest(query: Query, done: any) {
}

function observableQueryTest(query: Query, nextProp: any, done: any) {
const nextSpy = jasmine.createSpy('next');
const queryObservable = observeQuery(query);
const toMerge: any = {};
queryObservable.subscribe(nextSpy);
queryObservable.subscribe(result => {
const merged = Object.assign(query, toMerge);
expect(result).toEqual(merged);
done();
});
Object.keys(nextProp).forEach(prop => {
query[prop].next(nextProp[prop]);
toMerge[prop] = nextProp[prop];
})
const merged = Object.assign(query, toMerge);
expect(nextSpy).toHaveBeenCalledWith(merged);
done();
});
}

describe('observeQuery', () => {
Expand All @@ -37,7 +37,7 @@ describe('observeQuery', () => {
var nextSpy = jasmine.createSpy('next');
var completeSpy = jasmine.createSpy('complete');
var query = { orderByChild: 'height', equalTo: 10 };
var obs = observeQuery(query);
var obs = observeQuery(query, false);
obs.subscribe(nextSpy, null, completeSpy);
expect(nextSpy).toHaveBeenCalledWith({
orderByChild: 'height',
Expand All @@ -50,7 +50,7 @@ describe('observeQuery', () => {
var nextSpy = jasmine.createSpy('next');
var completeSpy = jasmine.createSpy('complete');
var query:any = null;
var obs = observeQuery(query);
var obs = observeQuery(query, false);
obs.subscribe(nextSpy, null, completeSpy);
expect(nextSpy).toHaveBeenCalledWith(null);
expect(completeSpy).toHaveBeenCalled();
Expand All @@ -63,7 +63,7 @@ describe('observeQuery', () => {
var query = {
orderByKey: new Subject<boolean>()
};
var obs = observeQuery(query);
var obs = observeQuery(query, false);
var noOrderyQuery = { orderByKey: false };
obs.subscribe(nextSpy, null, completeSpy);
query.orderByKey.next(true);
Expand All @@ -81,7 +81,7 @@ describe('observeQuery', () => {
var query = {
orderByKey: new Subject<boolean>()
};
var obs = observeQuery(query);
var obs = observeQuery(query, false);
obs.subscribe(nextSpy, null, completeSpy);
query.orderByKey.next(true);
expect(nextSpy).toHaveBeenCalledWith({ orderByKey: true });
Expand All @@ -99,7 +99,7 @@ describe('observeQuery', () => {
orderByValue: new Subject<boolean>(),
orderByChild: new Subject<string>()
};
var obs = observeQuery(query);
var obs = observeQuery(query, false);
obs.subscribe(nextSpy);
query.orderByChild.next('height');
expect(nextSpy).toHaveBeenCalledWith({
Expand Down Expand Up @@ -429,3 +429,64 @@ describe('query combinations', () => {
});

});


describe('audited queries', () => {

it('should immediately emit if not audited', () => {
var nextSpy = jasmine.createSpy('next');
var query = { orderByChild: 'height', startAt: new Subject(), endAt: new Subject() };
var obs = observeQuery(query, false);
obs.subscribe(nextSpy);
query.startAt.next(5);
expect(nextSpy).not.toHaveBeenCalled();
query.endAt.next(10);
expect(nextSpy).toHaveBeenCalledWith({
orderByChild: 'height',
startAt: 5,
endAt: 10
});
query.startAt.next(10);
expect(nextSpy).toHaveBeenCalledWith({
orderByChild: 'height',
startAt: 10,
endAt: 10
});
query.endAt.next(15);
expect(nextSpy).toHaveBeenCalledWith({
orderByChild: 'height',
startAt: 10,
endAt: 15
});
});

it('should emit the last query (in the event loop) if audited', (done: any) => {
let emits = 0;
var query = { orderByChild: 'height', startAt: new Subject(), endAt: new Subject() };
var obs = observeQuery(query, true);
obs.subscribe(result => {
switch (++emits) {
case 1:
expect(result).toEqual({
orderByChild: 'height',
startAt: 5,
endAt: 10
});
query.startAt.next(10);
query.endAt.next(15);
break;
case 2:
expect(result).toEqual({
orderByChild: 'height',
startAt: 10,
endAt: 15
});
done();
break;
}
});
query.startAt.next(5);
query.endAt.next(10);
});

});
21 changes: 13 additions & 8 deletions src/database/query_observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Observer } from 'rxjs/Observer';
import { combineLatest } from 'rxjs/operator/combineLatest';
import { merge } from 'rxjs/operator/merge';
import { map } from 'rxjs/operator/map';
import { auditTime } from 'rxjs/operator/auditTime';
import {
Query,
ScalarQuery,
Expand All @@ -16,20 +17,24 @@ import {
} from '../interfaces';
import { isNil } from '../utils';

export function observeQuery(query: Query): Observable<ScalarQuery> {
export function observeQuery(query: Query, audit: boolean = true): Observable<ScalarQuery> {
if (isNil(query)) {
return observableOf(null);
}

return Observable.create((observer: Observer<ScalarQuery>) => {

combineLatest.call(
getOrderObservables(query),
getStartAtObservable(query),
getEndAtObservable(query),
getEqualToObservable(query),
getLimitToObservables(query)
)
let combined = combineLatest.call(
getOrderObservables(query),
getStartAtObservable(query),
getEndAtObservable(query),
getEqualToObservable(query),
getLimitToObservables(query)
);
if (audit) {
combined = auditTime.call(combined, 0);
}
combined
.subscribe(([orderBy, startAt, endAt, equalTo, limitTo]
: [OrderBySelection, Primitive, Primitive, Primitive, LimitToSelection]) => {

Expand Down

0 comments on commit f9cb5c3

Please sign in to comment.