Skip to content

Commit

Permalink
Merge Cadence at 25e007d
Browse files Browse the repository at this point in the history
  • Loading branch information
samarabbas authored Apr 24, 2020
2 parents 9fea934 + 042ce3c commit 65b0884
Show file tree
Hide file tree
Showing 10 changed files with 550 additions and 13 deletions.
5 changes: 3 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ type (
// NOTE: DO NOT USE THIS API INSIDE A WORKFLOW, USE workflow.ExecuteChildWorkflow instead
ExecuteWorkflow(ctx context.Context, options StartWorkflowOptions, workflow interface{}, args ...interface{}) (WorkflowRun, error)

// GetWorkfow retrieves a workflow execution and return a WorkflowRun instance (described above)
// GetWorkflow retrieves a workflow execution and return a WorkflowRun instance (described above)
// - workflow ID of the workflow.
// - runID can be default(empty string). if empty string then it will pick the last running execution of that workflow ID.
//
Expand All @@ -126,6 +126,7 @@ type (
// - Get(ctx context.Context, valuePtr interface{}) error: which will fill the workflow
// execution result to valuePtr, if workflow execution is a success, or return corresponding
// error. This is a blocking API.
// If workflow not found, the Get() will return EntityNotExistsError.
// NOTE: if the started workflow return ContinueAsNewError during the workflow execution, the
// return result of GetRunID() will be the started workflow run ID, not the new run ID caused by ContinueAsNewError,
// however, Get(ctx context.Context, valuePtr interface{}) will return result from the run which did not return ContinueAsNewError.
Expand Down Expand Up @@ -186,7 +187,7 @@ type (
// - whether return all history events or just the last event, which contains the workflow execution end result
// Example:-
// To iterate all events,
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
// iter := GetWorkflowHistory(ctx, workflowID, runID, isLongPoll, filterType)
// events := []*shared.HistoryEvent{}
// for iter.HasNext() {
// event, err := iter.Next()
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
Expand Down
70 changes: 70 additions & 0 deletions internal/common/serializer/jsonpb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package serializer

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
)

type (
// JSONPBEncoder is JSON encoder/decoder for protobuf structs and slices of protobuf structs.
// This is an wrapper on top of jsonpb.Marshaler which supports not only single object serialization
// but also slices of concrete objects.
JSONPBEncoder struct {
marshaler jsonpb.Marshaler
ubmarshaler jsonpb.Unmarshaler
}
)

// NewJSONPBEncoder creates a new JSONPBEncoder.
func NewJSONPBEncoder() *JSONPBEncoder {
return &JSONPBEncoder{
marshaler: jsonpb.Marshaler{},
ubmarshaler: jsonpb.Unmarshaler{},
}
}

// NewJSONPBIndentEncoder creates a new JSONPBEncoder with indent.
func NewJSONPBIndentEncoder(indent string) *JSONPBEncoder {
return &JSONPBEncoder{
marshaler: jsonpb.Marshaler{Indent: indent},
ubmarshaler: jsonpb.Unmarshaler{},
}
}

// Encode protobuf struct to bytes.
func (e *JSONPBEncoder) Encode(pb proto.Message) ([]byte, error) {
var buf bytes.Buffer
err := e.marshaler.Marshal(&buf, pb)
return buf.Bytes(), err
}

// Decode bytes to protobuf struct.
func (e *JSONPBEncoder) Decode(data []byte, pb proto.Message) error {
return e.ubmarshaler.Unmarshal(bytes.NewReader(data), pb)
}
229 changes: 229 additions & 0 deletions internal/common/serializer/serializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// The MIT License
//
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package serializer

import (
"encoding/json"
"fmt"

"github.com/gogo/protobuf/proto"
commonpb "go.temporal.io/temporal-proto/common"
eventpb "go.temporal.io/temporal-proto/event"
filterpb "go.temporal.io/temporal-proto/filter"
"go.temporal.io/temporal-proto/serviceerror"
)

// Data encoding types
const (
// todo: Deprecate and use protoEncodingEnum.ToString()
EncodingTypeJSON EncodingType = "json"
EncodingTypeGob EncodingType = "gob"
EncodingTypeUnknown EncodingType = "unknow"
EncodingTypeEmpty EncodingType = ""
EncodingTypeProto3 EncodingType = "proto3"
)

func (e EncodingType) String() string {
return string(e)
}

type (
// EncodingType is an enum that represents various data encoding types
EncodingType string
)

type (

// SerializationError is an error type for serialization
SerializationError struct {
msg string
}

// DeserializationError is an error type for deserialization
DeserializationError struct {
msg string
}

// UnknownEncodingTypeError is an error type for unknown or unsupported encoding type
UnknownEncodingTypeError struct {
encodingType commonpb.EncodingType
}
)

// SerializeBatchEvents serializes batch events into a datablob proto
func SerializeBatchEvents(events []*eventpb.HistoryEvent, encodingType commonpb.EncodingType) (*commonpb.DataBlob, error) {
return serialize(&eventpb.History{Events: events}, encodingType)
}

func serializeProto(p proto.Marshaler, encodingType commonpb.EncodingType) (*commonpb.DataBlob, error) {
if p == nil {
return nil, nil
}

var data []byte
var err error

switch encodingType {
case commonpb.EncodingType_Proto3:
data, err = p.Marshal()
case commonpb.EncodingType_JSON:
encodingType = commonpb.EncodingType_JSON
pb, ok := p.(proto.Message)
if !ok {
return nil, NewSerializationError("could not cast protomarshal interface to proto.message")
}
data, err = NewJSONPBEncoder().Encode(pb)
default:
return nil, NewUnknownEncodingTypeError(encodingType)
}

if err != nil {
return nil, NewSerializationError(err.Error())
}

// Shouldn't happen, but keeping
if data == nil {
return nil, nil
}

return NewDataBlob(data, encodingType), nil
}

// DeserializeBatchEvents deserializes batch events from a datablob proto
func DeserializeBatchEvents(data *commonpb.DataBlob) ([]*eventpb.HistoryEvent, error) {
if data == nil {
return nil, nil
}
if len(data.Data) == 0 {
return nil, nil
}

events := &eventpb.History{}
var err error
switch data.EncodingType {
case commonpb.EncodingType_JSON:
err = NewJSONPBEncoder().Decode(data.Data, events)
case commonpb.EncodingType_Proto3:
err = proto.Unmarshal(data.Data, events)
default:
return nil, NewDeserializationError("DeserializeBatchEvents invalid encoding")
}
if err != nil {
return nil, err
}
return events.Events, nil
}

func serialize(input interface{}, encodingType commonpb.EncodingType) (*commonpb.DataBlob, error) {
if input == nil {
return nil, nil
}

if p, ok := input.(proto.Marshaler); ok {
return serializeProto(p, encodingType)
}

var data []byte
var err error

switch encodingType {
case commonpb.EncodingType_JSON: // For backward-compatibility
data, err = json.Marshal(input)
default:
return nil, NewUnknownEncodingTypeError(encodingType)
}

if err != nil {
return nil, NewSerializationError(err.Error())
}

return NewDataBlob(data, encodingType), nil
}

// NewUnknownEncodingTypeError returns a new instance of encoding type error
func NewUnknownEncodingTypeError(encodingType commonpb.EncodingType) error {
return &UnknownEncodingTypeError{encodingType: encodingType}
}

func (e *UnknownEncodingTypeError) Error() string {
return fmt.Sprintf("unknown or unsupported encoding type %v", e.encodingType)
}

// NewSerializationError returns a SerializationError
func NewSerializationError(msg string) *SerializationError {
return &SerializationError{msg: msg}
}

func (e *SerializationError) Error() string {
return fmt.Sprintf("cadence serialization error: %v", e.msg)
}

// NewDeserializationError returns a DeserializationError
func NewDeserializationError(msg string) *DeserializationError {
return &DeserializationError{msg: msg}
}

func (e *DeserializationError) Error() string {
return fmt.Sprintf("cadence deserialization error: %v", e.msg)
}

// NewDataBlob creates new blob data
func NewDataBlob(data []byte, encodingType commonpb.EncodingType) *commonpb.DataBlob {
if len(data) == 0 {
return nil
}

return &commonpb.DataBlob{
Data: data,
EncodingType: encodingType,
}
}

// DeserializeBlobDataToHistoryEvents deserialize the blob data to history event data
func DeserializeBlobDataToHistoryEvents(
dataBlobs []*commonpb.DataBlob, filterType filterpb.HistoryEventFilterType,
) (*eventpb.History, error) {

var historyEvents []*eventpb.HistoryEvent

for _, batch := range dataBlobs {
events, err := DeserializeBatchEvents(batch)
if err != nil {
return nil, err
}
if len(events) == 0 {
return nil, &serviceerror.Internal{
Message: "corrupted history event batch, empty events",
}
}

historyEvents = append(historyEvents, events...)
}

if filterType == filterpb.HistoryEventFilterType_CloseEvent {
historyEvents = []*eventpb.HistoryEvent{historyEvents[len(historyEvents)-1]}
}
return &eventpb.History{Events: historyEvents}, nil
}
18 changes: 16 additions & 2 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pborman/uuid"
"github.com/uber-go/tally"
filterpb "go.temporal.io/temporal-proto/filter"
commonpb "go.temporal.io/temporal-proto/common"
"go.uber.org/zap"

Expand All @@ -48,10 +49,11 @@ import (
"go.temporal.io/temporal/internal/common"
"go.temporal.io/temporal/internal/common/backoff"
"go.temporal.io/temporal/internal/common/metrics"
"go.temporal.io/temporal/internal/common/serializer"
)

const (
pollTaskServiceTimeOut = 3 * time.Minute // Server long poll is 1 * Minutes + delta
pollTaskServiceTimeOut = 150 * time.Second // Server long poll is 2 * Minutes + delta

stickyDecisionScheduleToStartTimeoutSeconds = 5

Expand Down Expand Up @@ -760,7 +762,19 @@ func newGetHistoryPageFunc(

metricsScope.Counter(metrics.WorkflowGetHistorySucceedCounter).Inc(1)
metricsScope.Timer(metrics.WorkflowGetHistoryLatency).Record(time.Since(startTime))
h := resp.History

var h *eventpb.History

if resp.RawHistory != nil {
var err1 error
h, err1 = serializer.DeserializeBlobDataToHistoryEvents(resp.RawHistory, filterpb.HistoryEventFilterType_AllEvent)
if err1 != nil {
return nil, nil, nil
}
} else {
h = resp.History
}

size := len(h.Events)
if size > 0 && atDecisionTaskCompletedEventID > 0 &&
h.Events[size-1].GetEventId() > atDecisionTaskCompletedEventID {
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ const (
defaultRPCTimeout = 10 * time.Second
// minRPCTimeout is minimum rpc call timeout allowed
minRPCTimeout = 1 * time.Second
// maxRPCTimeout is maximum rpc call timeout allowed
maxRPCTimeout = 20 * time.Second
//maxRPCTimeout is maximum rpc call timeout allowed
maxRPCTimeout = 5 * time.Second
)

var (
Expand Down
Loading

0 comments on commit 65b0884

Please sign in to comment.