From 6b931eefb9aa4a18758788167bdcf9e2fad1d7b9 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH <57220027+harshachinta@users.noreply.github.com> Date: Thu, 2 Nov 2023 14:49:38 +0530 Subject: [PATCH] feat(spanner): Executor framework server and worker proxy (#8714) * feat(spanner): add executor code * feat(spanner): add license headers * feat(spanner): add proto and autogenerated code * feat(spanner): add cloud_executor file which has helper methods * feat(spanner): make code modular * feat(spanner): move autogenerated protos to a different PR * feat(spanner): rename file * feat(spanner): rename * feat(spanner): use string.join * feat(spanner): add file responsibility * feat(spanner): coder refactoring * feat(spanner): coder refactoring * feat(spanner): update go.mod * feat(spanner): lint fixes * feat(spanner): lint fixes --- spanner/go.mod | 2 +- .../executor/executor_proxy_server_impl.go | 48 ++++ .../executor/internal/inputstream/handler.go | 53 +++++ .../executor/internal/outputstream/handler.go | 213 ++++++++++++++++++ .../internal/utility/error_code_mapper.go | 79 +++++++ .../internal/utility/table_metadata_helper.go | 98 ++++++++ spanner/test/cloudexecutor/worker_proxy.go | 153 +++++++++++++ 7 files changed, 645 insertions(+), 1 deletion(-) create mode 100644 spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go create mode 100644 spanner/test/cloudexecutor/executor/internal/inputstream/handler.go create mode 100644 spanner/test/cloudexecutor/executor/internal/outputstream/handler.go create mode 100644 spanner/test/cloudexecutor/executor/internal/utility/error_code_mapper.go create mode 100644 spanner/test/cloudexecutor/executor/internal/utility/table_metadata_helper.go create mode 100644 spanner/test/cloudexecutor/worker_proxy.go diff --git a/spanner/go.mod b/spanner/go.mod index f851efcd63e3..8912cb5a0cba 100644 --- a/spanner/go.mod +++ b/spanner/go.mod @@ -10,6 +10,7 @@ require ( github.com/google/go-cmp v0.6.0 github.com/googleapis/gax-go/v2 v2.12.0 go.opencensus.io v0.24.0 + golang.org/x/oauth2 v0.13.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 google.golang.org/api v0.149.0 google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b @@ -33,7 +34,6 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/oauth2 v0.13.0 // indirect golang.org/x/sync v0.4.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect diff --git a/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go new file mode 100644 index 000000000000..575e85e19e6d --- /dev/null +++ b/spanner/test/cloudexecutor/executor/executor_proxy_server_impl.go @@ -0,0 +1,48 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +// executor_proxy_server_impl.go contains the implementation of the executor proxy RPC. +// This RPC gets invoked through the gRPC stream exposed via proxy port by worker_proxy.go file. + +import ( + "context" + + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/inputstream" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/api/option" +) + +// CloudProxyServer holds the cloud executor server. +type CloudProxyServer struct { + serverContext context.Context + options []option.ClientOption +} + +// NewCloudProxyServer initializes and returns a new CloudProxyServer instance. +func NewCloudProxyServer(ctx context.Context, opts []option.ClientOption) (*CloudProxyServer, error) { + return &CloudProxyServer{serverContext: ctx, options: opts}, nil +} + +// ExecuteActionAsync is implementation of ExecuteActionAsync in SpannerExecutorProxyServer. It's a +// streaming method in which client and server exchange SpannerActions and SpannerActionOutcomes. +func (s *CloudProxyServer) ExecuteActionAsync(inputStream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) error { + handler := &inputstream.CloudStreamHandler{ + Stream: inputStream, + ServerContext: s.serverContext, + Options: s.options, + } + return handler.Execute() +} diff --git a/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go new file mode 100644 index 000000000000..fd639d193c4a --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/inputstream/handler.go @@ -0,0 +1,53 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inputstream + +// input_stream_handler.go is responsible for handling input requests to the server and +// handles mapping from executor actions (SpannerAsyncActionRequest) to client library code. + +import ( + "context" + "sync" + + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/api/option" +) + +// CloudStreamHandler handles a single streaming ExecuteActions request by performing incoming +// actions. It maintains a state associated with the request, such as current transaction. +// +// CloudStreamHandler uses contexts (context.Context) to coordinate execution of asynchronous +// actions. The Stubby stream's context becomes a parent for all individual actions' contexts. This +// is done so that we don't leak anything when the stream is closed. +// +// startTxnHandler is a bit different from other actions. Read-write transactions that it +// starts outlive the action itself, so the Stubby stream's context is used for transactions +// instead of the action's context. +// +// For more info about contexts in Go, read golang.org/pkg/context +type CloudStreamHandler struct { + // members below should be set by the caller + Stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer + ServerContext context.Context + Options []option.ClientOption + // members below represent internal state + mu sync.Mutex // protects mutable internal state +} + +// Execute executes the given ExecuteActions request, blocking until it's done. It takes care of +// properly closing the request stream in the end. +func (h *CloudStreamHandler) Execute() error { + return nil +} diff --git a/spanner/test/cloudexecutor/executor/internal/outputstream/handler.go b/spanner/test/cloudexecutor/executor/internal/outputstream/handler.go new file mode 100644 index 000000000000..86f0f0207643 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/outputstream/handler.go @@ -0,0 +1,213 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package outputstream + +import ( + "log" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + "cloud.google.com/go/spanner/test/cloudexecutor/executor/internal/utility" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// if OutcomeSender.rowCount exceed maxRowsPerBatch value, we should send rows back to the client in batch. +const maxRowsPerBatch = 100 + +// OutcomeSender is a utility class used for sending action outcomes back to the client. For read +// actions, it buffers rows and sends partial read results in batches. +type OutcomeSender struct { + actionID int32 + stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer + + // partialOutcome accumulates rows and other relevant information + partialOutcome *executorpb.SpannerActionOutcome + readResult *executorpb.ReadResult + queryResult *executorpb.QueryResult + + // All the relevant variables below should be set before first outcome is sent back, + // and unused variables should leave null. + timestamp *timestamppb.Timestamp + hasReadResult bool + hasQueryResult bool + hasChangeStreamRecords bool + table string // name of the table being read + index *string // name of the secondary index used for read + requestIndex *int32 // request index (for multireads) + rowType *spannerpb.StructType + + // Current row count in read/query result + rowCount int64 + // modified row count in dml result + rowsModified []int64 +} + +// NewOutcomeSender returns an OutcomeSender with default fields set. +func NewOutcomeSender(actionID int32, stream executorpb.SpannerExecutorProxy_ExecuteActionAsyncServer) *OutcomeSender { + return &OutcomeSender{ + actionID: actionID, + stream: stream, + hasReadResult: false, + hasQueryResult: false, + } +} + +// SetTimestamp sets the timestamp for commit. +func (s *OutcomeSender) SetTimestamp(timestamp *timestamppb.Timestamp) { + s.timestamp = timestamp +} + +// SetRowType sets the rowType for appending row. +func (s *OutcomeSender) SetRowType(rowType *spannerpb.StructType) { + s.rowType = rowType +} + +// InitForRead init the sender for read action, then set the table and index if there exists. +func (s *OutcomeSender) InitForRead(table string, index *string) { + s.hasReadResult = true + s.table = table + if index != nil { + s.index = index + } +} + +// InitForQuery init the sender for query action +func (s *OutcomeSender) InitForQuery() { + s.hasQueryResult = true +} + +// InitForBatchRead init the sender for batch read action, then set the table and index if there exists. +func (s *OutcomeSender) InitForBatchRead(table string, index *string) { + s.InitForRead(table, index) + // Cloud API supports only simple batch reads (not multi reads), so request index is always 0. + requestIndex := int32(0) + s.requestIndex = &requestIndex +} + +// AppendDmlRowsModified add rows modified in dml to result +func (s *OutcomeSender) AppendDmlRowsModified(rowsModified int64) { + s.rowsModified = append(s.rowsModified, rowsModified) +} + +// FinishSuccessfully sends the last outcome with OK status. +func (s *OutcomeSender) FinishSuccessfully() error { + s.buildOutcome() + s.partialOutcome.Status = &spb.Status{Code: int32(codes.OK)} + return s.flush() +} + +// FinishWithTransactionRestarted sends the last outcome with aborted error, +// this will set the TransactionRestarted to true +func (s *OutcomeSender) FinishWithTransactionRestarted() error { + s.buildOutcome() + transactionRestarted := true + s.partialOutcome.TransactionRestarted = &transactionRestarted + s.partialOutcome.Status = &spb.Status{Code: int32(codes.OK)} + return s.flush() +} + +// FinishWithError sends the last outcome with given error status. +func (s *OutcomeSender) FinishWithError(err error) error { + s.buildOutcome() + //TODO(harsha:oct10) uncomment below line and comment s.partialOutcome.Status = errToStatus(err) + //s.partialOutcome.Status = &status.Status{Code: int32(gstatus.Code(err)), Message: err.Error()} + s.partialOutcome.Status = utility.ErrToStatus(err) + return s.flush() +} + +// AppendRow adds another row to buffer. If buffer hits its size limit, the buffered rows will be sent back. +func (s *OutcomeSender) AppendRow(row *executorpb.ValueList) error { + if !s.hasReadResult && !s.hasQueryResult { + return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "either hasReadResult or hasQueryResult should be true")) + } + if s.rowType == nil { + return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "rowType should be set first")) + } + s.buildOutcome() + if s.hasReadResult { + s.readResult.Row = append(s.readResult.Row, row) + s.rowCount++ + } else if s.hasQueryResult { + s.queryResult.Row = append(s.queryResult.Row, row) + s.rowCount++ + } + if s.rowCount >= maxRowsPerBatch { + return s.flush() + } + return nil +} + +// buildOutcome will build the partialOutcome if not exists using relevant variables. +func (s *OutcomeSender) buildOutcome() { + if s.partialOutcome != nil { + return + } + s.partialOutcome = &executorpb.SpannerActionOutcome{ + CommitTime: s.timestamp, + } + if s.hasReadResult { + s.readResult = &executorpb.ReadResult{ + Table: s.table, + Index: s.index, + RowType: s.rowType, + RequestIndex: s.requestIndex, + } + } else if s.hasQueryResult { + s.queryResult = &executorpb.QueryResult{ + RowType: s.rowType, + } + } +} + +// flush sends partialOutcome to stream and clear the internal state +func (s *OutcomeSender) flush() error { + if s == nil || s.partialOutcome == nil { + log.Println("outcomeSender.flush() is called when there is no partial outcome to send. This is an internal error that should never happen") + return spanner.ToSpannerError(status.Error(codes.InvalidArgument, "either outcome sender or partial outcome is nil")) + } + s.partialOutcome.DmlRowsModified = s.rowsModified + if s.hasReadResult { + s.partialOutcome.ReadResult = s.readResult + } else if s.hasQueryResult { + s.partialOutcome.QueryResult = s.queryResult + } + err := s.SendOutcome(s.partialOutcome) + s.partialOutcome = nil + s.readResult = nil + s.queryResult = nil + s.rowCount = 0 + s.rowsModified = []int64{} + return err +} + +// SendOutcome sends the given SpannerActionOutcome. +func (s *OutcomeSender) SendOutcome(outcome *executorpb.SpannerActionOutcome) error { + log.Printf("sending result %v actionId %d", outcome, s.actionID) + resp := &executorpb.SpannerAsyncActionResponse{ + ActionId: s.actionID, + Outcome: outcome, + } + err := s.stream.Send(resp) + if err != nil { + log.Printf("Failed to send outcome with error: %s", err.Error()) + } else { + log.Printf("Sent result %v actionId %d", outcome, s.actionID) + } + return err +} diff --git a/spanner/test/cloudexecutor/executor/internal/utility/error_code_mapper.go b/spanner/test/cloudexecutor/executor/internal/utility/error_code_mapper.go new file mode 100644 index 000000000000..eea990848786 --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/utility/error_code_mapper.go @@ -0,0 +1,79 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utility + +import ( + "fmt" + "log" + "strings" + + spb "google.golang.org/genproto/googleapis/rpc/status" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// ErrToStatus maps cloud error to Status +func ErrToStatus(e error) *spb.Status { + log.Print(e.Error()) + if strings.Contains(e.Error(), "Transaction outcome unknown") { + return &spb.Status{Code: int32(codes.DeadlineExceeded), Message: e.Error()} + } + if status.Code(e) == codes.InvalidArgument { + return &spb.Status{Code: int32(codes.InvalidArgument), Message: e.Error()} + } + if status.Code(e) == codes.PermissionDenied { + return &spb.Status{Code: int32(codes.PermissionDenied), Message: e.Error()} + } + if status.Code(e) == codes.Aborted { + return &spb.Status{Code: int32(codes.Aborted), Message: e.Error()} + } + if status.Code(e) == codes.AlreadyExists { + return &spb.Status{Code: int32(codes.AlreadyExists), Message: e.Error()} + } + if status.Code(e) == codes.Canceled { + return &spb.Status{Code: int32(codes.Canceled), Message: e.Error()} + } + if status.Code(e) == codes.Internal { + return &spb.Status{Code: int32(codes.Internal), Message: e.Error()} + } + if status.Code(e) == codes.FailedPrecondition { + return &spb.Status{Code: int32(codes.FailedPrecondition), Message: e.Error()} + } + if status.Code(e) == codes.NotFound { + return &spb.Status{Code: int32(codes.NotFound), Message: e.Error()} + } + if status.Code(e) == codes.DeadlineExceeded { + return &spb.Status{Code: int32(codes.DeadlineExceeded), Message: e.Error()} + } + if status.Code(e) == codes.ResourceExhausted { + return &spb.Status{Code: int32(codes.ResourceExhausted), Message: e.Error()} + } + if status.Code(e) == codes.OutOfRange { + return &spb.Status{Code: int32(codes.OutOfRange), Message: e.Error()} + } + if status.Code(e) == codes.Unauthenticated { + return &spb.Status{Code: int32(codes.Unauthenticated), Message: e.Error()} + } + if status.Code(e) == codes.Unimplemented { + return &spb.Status{Code: int32(codes.Unimplemented), Message: e.Error()} + } + if status.Code(e) == codes.Unavailable { + return &spb.Status{Code: int32(codes.Unavailable), Message: e.Error()} + } + if status.Code(e) == codes.Unknown { + return &spb.Status{Code: int32(codes.Unknown), Message: e.Error()} + } + return &spb.Status{Code: int32(codes.Unknown), Message: fmt.Sprintf("Error: %v, Unsupported Spanner error code: %v", e.Error(), status.Code(e))} +} diff --git a/spanner/test/cloudexecutor/executor/internal/utility/table_metadata_helper.go b/spanner/test/cloudexecutor/executor/internal/utility/table_metadata_helper.go new file mode 100644 index 000000000000..03993407182b --- /dev/null +++ b/spanner/test/cloudexecutor/executor/internal/utility/table_metadata_helper.go @@ -0,0 +1,98 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utility + +import ( + "fmt" + "log" + + "cloud.google.com/go/spanner" + "cloud.google.com/go/spanner/apiv1/spannerpb" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// TableMetadataHelper is used to hold and retrieve metadata of tables and columns involved +// in a transaction. +type TableMetadataHelper struct { + tableColumnsInOrder map[string][]*executorpb.ColumnMetadata + tableColumnsByName map[string]map[string]*executorpb.ColumnMetadata + tableKeyColumnsInOrder map[string][]*executorpb.ColumnMetadata +} + +// InitFrom reads table metadata from the given StartTransactionAction. +func (t *TableMetadataHelper) InitFrom(a *executorpb.StartTransactionAction) { + t.InitFromTableMetadata(a.GetTable()) +} + +// InitFromTableMetadata extracts table metadata and make maps to store them. +func (t *TableMetadataHelper) InitFromTableMetadata(tables []*executorpb.TableMetadata) { + t.tableColumnsInOrder = make(map[string][]*executorpb.ColumnMetadata) + t.tableColumnsByName = make(map[string]map[string]*executorpb.ColumnMetadata) + t.tableKeyColumnsInOrder = make(map[string][]*executorpb.ColumnMetadata) + for _, table := range tables { + tableName := table.GetName() + t.tableColumnsInOrder[tableName] = table.GetColumn() + t.tableKeyColumnsInOrder[tableName] = table.GetKeyColumn() + t.tableColumnsByName[tableName] = make(map[string]*executorpb.ColumnMetadata) + for _, col := range table.GetColumn() { + t.tableColumnsByName[tableName][col.GetName()] = col + } + } +} + +// GetColumnType returns the column type of the given table and column. +func (t *TableMetadataHelper) GetColumnType(tableName string, colName string) (*spannerpb.Type, error) { + cols, ok := t.tableColumnsByName[tableName] + if !ok { + log.Printf("There is no metadata for table %s. Make sure that StartTransactionAction has TableMetadata correctly populated.", tableName) + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "there is no metadata for table %s", tableName)) + } + colMetadata, ok := cols[colName] + if !ok { + log.Printf("Metadata for table %s contains no column named %s", tableName, colName) + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "metadata for table %s contains no column named %s", tableName, colName)) + } + return colMetadata.GetType(), nil +} + +// getColumnTypes returns a list of column types of the given table. +func (t *TableMetadataHelper) getColumnTypes(tableName string) ([]*spannerpb.Type, error) { + cols, ok := t.tableColumnsInOrder[tableName] + if !ok { + log.Printf("There is no metadata for table %s. Make sure that StartTransactionAction has TableMetadata correctly populated.", tableName) + return nil, spanner.ToSpannerError(status.Errorf(codes.InvalidArgument, "there is no metadata for table %s", tableName)) + } + var colTypes []*spannerpb.Type + for _, col := range cols { + colTypes = append(colTypes, col.GetType()) + } + return colTypes, nil +} + +// GetKeyColumnTypes returns a list of key column types of the given table. +func (t *TableMetadataHelper) GetKeyColumnTypes(tableName string) ([]*spannerpb.Type, error) { + cols, ok := t.tableKeyColumnsInOrder[tableName] + if !ok { + log.Printf("There is no metadata for table %s. Make sure that StartTxnAction has TableMetadata correctly populated.", tableName) + return nil, fmt.Errorf("there is no metadata for table %s", tableName) + } + var colTypes []*spannerpb.Type + for _, col := range cols { + colTypes = append(colTypes, col.GetType()) + } + return colTypes, nil +} diff --git a/spanner/test/cloudexecutor/worker_proxy.go b/spanner/test/cloudexecutor/worker_proxy.go new file mode 100644 index 000000000000..bad33fbf9768 --- /dev/null +++ b/spanner/test/cloudexecutor/worker_proxy.go @@ -0,0 +1,153 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +// worker_proxy.go handles creation of the gRPC stream, and registering needed services. +// This file is responsible for spinning up the server for client to make requests to ExecuteActionAsync RPC. + +import ( + "context" + "flag" + "fmt" + "log" + "net" + "os" + "strings" + + "cloud.google.com/go/spanner/test/cloudexecutor/executor" + executorpb "cloud.google.com/go/spanner/test/cloudexecutor/proto" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + "google.golang.org/grpc/health/grpc_health_v1" +) + +var ( + proxyPort = flag.String("proxy_port", "", "Proxy port to start worker proxy on.") + spannerPort = flag.String("spanner_port", "", "Port of Spanner Frontend to which to send requests.") + cert = flag.String("cert", "", "Certificate used to connect to Spanner GFE.") + serviceKeyFile = flag.String("service_key_file", "", "Service key file used to set authentication.") + ipAddress = "127.0.0.1" +) + +func main() { + if d := os.Getenv("TEST_UNDECLARED_OUTPUTS_DIR"); d != "" { + os.Args = append(os.Args, "--log_dir="+d) + } + + flag.Parse() + if *proxyPort == "" { + log.Fatal("Proxy port need to be assigned in order to start worker proxy.") + } + if *spannerPort == "" { + log.Fatal("Spanner proxyPort need to be assigned in order to start worker proxy.") + } + if *cert == "" { + log.Fatalf("Certificate need to be assigned in order to start worker proxy.") + } + + lis, err := net.Listen("tcp", fmt.Sprintf(":%s", *proxyPort)) + if err != nil { + log.Fatal(err) + } + + // Create a new gRPC server + grpcServer := grpc.NewServer() + + clientOptions := getClientOptionsForSysTests() + // Create a new cloud proxy server + cloudProxyServer, err := executor.NewCloudProxyServer(context.Background(), clientOptions) + if err != nil { + log.Fatalf("Creating Cloud Proxy Server failed: %v", err) + } + // Register cloudProxyServer service on the grpcServer + executorpb.RegisterSpannerExecutorProxyServer(grpcServer, cloudProxyServer) + + // Create a new service health server + healthServer := health.NewServer() + // Register healthServer service on the grpcServer + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + + log.Printf("Server started on proxyPort:%s\n", *proxyPort) + err = grpcServer.Serve(lis) + if err != nil { + log.Printf("Failed to start server on proxyPort: %s\n", *proxyPort) + } +} + +// Constructs client options needed to run executor for systests +func getClientOptionsForSysTests() []option.ClientOption { + var options []option.ClientOption + options = append(options, option.WithEndpoint(getEndPoint())) + options = append(options, option.WithGRPCDialOption(grpc.WithTransportCredentials(getCredentials()))) + + const ( + spannerAdminScope = "https://www.googleapis.com/auth/spanner.admin" + spannerDataScope = "https://www.googleapis.com/auth/spanner.data" + ) + + log.Println("Reading service key file in executor code") + cloudSystestCredentialsJSON, err := os.ReadFile(*serviceKeyFile) + if err != nil { + log.Fatal(err) + } + config, err := google.JWTConfigFromJSON([]byte(cloudSystestCredentialsJSON), spannerAdminScope, spannerDataScope) + if err != nil { + log.Println(err) + } + options = append(options, option.WithTokenSource(config.TokenSource(context.Background()))) + options = append(options, option.WithCredentialsFile(*serviceKeyFile)) + + return options +} + +type fakeTokenSource struct{} + +func (f *fakeTokenSource) Token() (*oauth2.Token, error) { + return &oauth2.Token{AccessToken: "fake token for test"}, nil +} + +// Constructs client options needed to run executor for unit tests +func getClientOptionsForUnitTests() []option.ClientOption { + var options []option.ClientOption + options = append(options, option.WithEndpoint(getEndPoint())) + options = append(options, option.WithGRPCDialOption(grpc.WithTransportCredentials(getCredentials()))) + options = append(options, option.WithTokenSource(&fakeTokenSource{})) + + return options +} + +func getEndPoint() string { + endpoint := strings.Join([]string{ipAddress, *spannerPort}, ":") + log.Printf("endpoint for grpc dial: %s", endpoint) + return endpoint +} + +func getCredentials() credentials.TransportCredentials { + creds, err := credentials.NewClientTLSFromFile(*cert, "test-cert-2") + if err != nil { + log.Println(err) + } + return creds +} + +// Constructs client options needed to run executor on local machine +func getClientOptionsForLocalTest() []option.ClientOption { + var options []option.ClientOption + return options +}