Skip to content

Commit

Permalink
AirbyteEstimateTraceMessage (#18875)
Browse files Browse the repository at this point in the history
* `AirbyteEstimateTraceMessage`

* Add PR number

* fix method name

* Lint

* Lint

* fix merge

* Update docs/understanding-airbyte/airbyte-protocol.md

Co-authored-by: Davin Chia <davinchia@gmail.com>

* `EstimateType` sub type in python

* lint

Co-authored-by: Davin Chia <davinchia@gmail.com>
  • Loading branch information
evantahler and davinchia authored Nov 7, 2022
1 parent 29676e1 commit e6b06a8
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 2 deletions.
27 changes: 27 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Config:

class TraceType(Enum):
ERROR = "ERROR"
ESTIMATE = "ESTIMATE"


class FailureType(Enum):
Expand All @@ -98,6 +99,28 @@ class Config:
failure_type: Optional[FailureType] = Field(None, description="The type of error")


class EstimateType(Enum):
STREAM = "STREAM"
SYNC = "SYNC"


class AirbyteEstimateTraceMessage(BaseModel):
class Config:
extra = Extra.allow

name: str = Field(..., description="The name of the stream")
type: EstimateType = Field(..., description="The type of estimate", title="estimate type")
namespace: Optional[str] = Field(None, description="The namespace of the stream")
row_estimate: Optional[int] = Field(
None,
description="The estimated number of rows to be emitted by this sync for this stream",
)
byte_estimate: Optional[int] = Field(
None,
description="The estimated number of bytes to be emitted by this sync for this stream",
)


class OrchestratorType(Enum):
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"

Expand Down Expand Up @@ -213,6 +236,10 @@ class Config:
type: TraceType = Field(..., description="the type of trace message", title="trace type")
emitted_at: float = Field(..., description="the time in ms that the message was emitted")
error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")
estimate: Optional[AirbyteEstimateTraceMessage] = Field(
None,
description="Estimate trace message: a guess at how much data will be produced in this sync",
)


class AirbyteControlMessage(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec
*/
private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) {
switch (traceMessage.getType()) {
case ESTIMATE -> handleEmittedEstimateTrace(traceMessage, connectorType);
case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType);
default -> log.warn("Invalid message type for trace message: {}", traceMessage);
}
Expand All @@ -263,6 +264,11 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage
}
}

@SuppressWarnings("PMD") // until method is implemented
private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) {

}

private short getStreamIndex(final String streamName) {
if (!streamNameToIndex.containsKey(streamName)) {
streamNameToIndex.put(streamName, nextStreamIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
title: AirbyteProtocol
type: object
description: AirbyteProtocol structs
version: 0.3.1
version: 0.3.2
properties:
airbyte_message:
"$ref": "#/definitions/AirbyteMessage"
Expand Down Expand Up @@ -174,12 +174,16 @@ definitions:
type: string
enum:
- ERROR
- ESTIMATE
emitted_at:
description: "the time in ms that the message was emitted"
type: number
error:
description: "error trace message: the error object"
"$ref": "#/definitions/AirbyteErrorTraceMessage"
estimate:
description: "Estimate trace message: a guess at how much data will be produced in this sync"
"$ref": "#/definitions/AirbyteEstimateTraceMessage"
AirbyteErrorTraceMessage:
type: object
additionalProperties: true
Expand All @@ -201,6 +205,32 @@ definitions:
enum:
- system_error
- config_error
AirbyteEstimateTraceMessage:
type: object
additionalProperties: true
required:
- name
- type
properties:
name:
description: The name of the stream
type: string
type:
title: "estimate type" # this title is required to avoid python codegen conflicts with the "type" parameter in AirbyteMessage. See https://github.com/airbytehq/airbyte/pull/12581
description: The type of estimate
type: string
enum:
- STREAM
- SYNC
namespace:
description: The namespace of the stream
type: string
row_estimate:
description: The estimated number of rows to be emitted by this sync for this stream
type: integer
byte_estimate:
description: The estimated number of bytes to be emitted by this sync for this stream
type: integer
AirbyteControlMessage:
type: object
additionalProperties: true
Expand Down
46 changes: 45 additions & 1 deletion docs/understanding-airbyte/airbyte-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the

| Version | Date of Change | Pull Request(s) | Subject |
| :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- |
| `v0.3.2` | 2022-10-128 | [18875](https://github.com/airbytehq/airbyte/pull/18875) | `AirbyteEstimateTraceMessage` added |
| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added |
| `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added |
| `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages |
Expand Down Expand Up @@ -759,7 +760,7 @@ AirbyteLogMessage:

### AirbyteTraceMessage

The trace message allows an Actor to emit metadata about the runtime of the Actor. As currently implemented, it allows an Actor to surface information about errors. This message is designed to grow to handle other use cases, including progress and performance metrics.
The trace message allows an Actor to emit metadata about the runtime of the Actor, such as errors or estimates. This message is designed to grow to handle other use cases, including additonal performance metrics.

```yaml
AirbyteTraceMessage:
Expand All @@ -775,12 +776,16 @@ AirbyteTraceMessage:
type: string
enum:
- ERROR
- ESTIMATE
emitted_at:
description: "the time in ms that the message was emitted"
type: number
error:
description: "error trace message: the error object"
"$ref": "#/definitions/AirbyteErrorTraceMessage"
estimate:
description: "Estimate trace message: a guess at how much data will be produced in this sync"
"$ref": "#/definitions/AirbyteEstimateTraceMessage"
AirbyteErrorTraceMessage:
type: object
additionalProperties: true
Expand All @@ -802,8 +807,47 @@ AirbyteErrorTraceMessage:
enum:
- system_error
- config_error
AirbyteEstimateTraceMessage:
type: object
additionalProperties: true
required:
- name
- type
properties:
name:
description: The name of the stream
type: string
type:
description: The type of estimate
type: string
enum:
- STREAM
- SYNC
namespace:
description: The namespace of the stream
type: string
row_estimate:
description: The estimated number of rows to be emitted by this sync for this stream
type: integer
byte_estimate:
description: The estimated number of bytes to be emitted by this sync for this stream
type: integer
```

#### AirbyteErrorTraceMessage

Error Trace Messages are used when a sync is about to fail and the connector can provide meaningful information to the orhcestrator or user about what to do next.

Of note, an `internal_message` might be an exception code, but an `external_message` is meant to be user-facing, e.g. "Your API Key is invalid".

Syncs can fail for multiple reasons, and therefore multiple `AirbyteErrorTraceMessage` can be sent from a connector.

#### AirbyteEstimateTraceMessage

Estimate Trace Messages are used by connectors to inform the orchestrator about how much data they expect to move within the sync. This ise useful to present the user with estimates of the time remaining in the sync, or percentage complete. An example of this would be for every stream about to be synced from a databse to provde a `COUNT (*) from {table_name} where updated_at > {state}` to provide an estimate of the rows to be sent in this sync.

`AirbyteEstimateTraceMessage` should be emitted early in the sync to provide an early estimate of the sync's duration. Multiple `AirbyteEstimateTraceMessage`s can be sent for the same stream, and an updated estimate will replace the previous value.

### AirbyteControlMessage

An `AirbyteControlMessage` is for connectors to signal to the Airbyte Platform or Orchestrator that an action with a side-effect should be taken. This means that the Orchestrator will likely be altering some stored data about the connector, connection, or sync.
Expand Down

0 comments on commit e6b06a8

Please sign in to comment.