Skip to content

Commit

Permalink
feature: New Pipeline Stages (#2236)
Browse files Browse the repository at this point in the history
* new stages

* add tests

* fix

* fix

* Fix code examples

* Add unnest options to `d.ts` file.

* Fix union test

* Add test, fix documentation, refactor.

* Documentation

* Undo
  • Loading branch information
tom-andersen authored Jan 17, 2025
1 parent 61fc782 commit db8a43b
Show file tree
Hide file tree
Showing 9 changed files with 798 additions and 192 deletions.
32 changes: 0 additions & 32 deletions dev/src/expression.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1942,38 +1942,6 @@ export class Field extends Expr implements Selectable {
}
}

/**
* @beta
*/
export class Fields extends Expr implements Selectable {
exprType: ExprType = 'Field';
selectable = true as const;

private constructor(private fields: Field[]) {
super();
}

static of(name: string, ...others: string[]): Fields {
return new Fields([Field.of(name), ...others.map(Field.of)]);
}

static ofAll(): Fields {
return new Fields([]);
}

fieldList(): Field[] {
return this.fields.map(f => f);
}

_toProto(serializer: Serializer): api.IValue {
return {
arrayValue: {
values: this.fields.map(f => f._toProto(serializer)),
},
};
}
}

/**
* @beta
*
Expand Down
1 change: 0 additions & 1 deletion dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ export {
Expr,
ExprWithAlias,
Field,
Fields,
Constant,
Function,
Ordering,
Expand Down
37 changes: 28 additions & 9 deletions dev/src/pipeline-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ export class ExecutionUtil<AppModelType> {
enc,
callback
) => {
console.log(`Pipeline response: ${JSON.stringify(proto, null, 2)}`);
if (proto === NOOP_MESSAGE) {
callback(undefined);
return;
Expand Down Expand Up @@ -215,14 +214,20 @@ export class ExecutionUtil<AppModelType> {
// `toProto()` might throw an exception. We rely on the behavior of an
// async function to convert this exception into the rejected Promise we
// catch below.
const request = pipeline._toProto(
transactionOrReadTime,
explainOptions
);

console.log(
`Executing pipeline: \n ${JSON.stringify(request, null, 2)}`
);
const request: api.IExecutePipelineRequest = {
database: this._firestore.formattedName,
structuredPipeline: {
pipeline: pipeline._toProto(),
},
};

if (transactionOrReadTime instanceof Uint8Array) {
request.transaction = transactionOrReadTime;
} else if (transactionOrReadTime instanceof Timestamp) {
request.readTime = transactionOrReadTime.toProto().timestampValue;
} else if (transactionOrReadTime) {
request.newTransaction = transactionOrReadTime;
}

let streamActive: Deferred<boolean>;
do {
Expand Down Expand Up @@ -392,6 +397,20 @@ export function isFirestoreValue(obj: any): obj is api.IValue {
return false;
}

export function selectableToExpr(
selectable: firestore.Selectable | string
): Expr {
if (typeof selectable === 'string') {
return Field.of(selectable);
} else if (selectable instanceof Field) {
return selectable;
} else if (selectable instanceof ExprWithAlias) {
return selectable.expr;
} else {
throw new Error('unexpected selectable: ' + selectable);
}
}

export function toPipelineFilterCondition(
f: FilterInternal,
serializer: Serializer
Expand Down
Loading

0 comments on commit db8a43b

Please sign in to comment.