diff --git a/go/adbc/driver/flightsql/flightsql_adbc.go b/go/adbc/driver/flightsql/flightsql_adbc.go index 70ba50182d..1dea6eb857 100644 --- a/go/adbc/driver/flightsql/flightsql_adbc.go +++ b/go/adbc/driver/flightsql/flightsql_adbc.go @@ -57,6 +57,7 @@ import ( "github.com/apache/arrow/go/v13/arrow/memory" "github.com/bluele/gcache" "golang.org/x/exp/maps" + "golang.org/x/exp/slog" "google.golang.org/grpc" grpccodes "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -156,6 +157,7 @@ func (d Driver) NewDatabase(opts map[string]string) (adbc.Database, error) { db.dialOpts.block = false db.dialOpts.maxMsgSize = 16 * 1024 * 1024 + db.logger = nilLogger() db.options = make(map[string]string) return db, db.SetOptions(opts) @@ -186,11 +188,20 @@ type database struct { timeout timeoutOption dialOpts dbDialOpts enableCookies bool + logger *slog.Logger options map[string]string alloc memory.Allocator } +func (d *database) SetLogger(logger *slog.Logger) { + if logger != nil { + d.logger = logger + } else { + d.logger = nilLogger() + } +} + func (d *database) SetOptions(cnOptions map[string]string) error { var tlsConfig tls.Config @@ -691,6 +702,10 @@ func (b *bearerAuthMiddleware) HeadersReceived(ctx context.Context, md metadata. func getFlightClient(ctx context.Context, loc string, d *database) (*flightsql.Client, error) { authMiddle := &bearerAuthMiddleware{hdrs: d.hdrs.Copy()} middleware := []flight.ClientMiddleware{ + { + Unary: makeUnaryLoggingInterceptor(d.logger), + Stream: makeStreamLoggingInterceptor(d.logger), + }, flight.CreateClientMiddleware(authMiddle), { Unary: unaryTimeoutInterceptor, diff --git a/go/adbc/driver/flightsql/logging.go b/go/adbc/driver/flightsql/logging.go new file mode 100644 index 0000000000..3d0f51c83a --- /dev/null +++ b/go/adbc/driver/flightsql/logging.go @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package flightsql + +import ( + "context" + "io" + "os" + "time" + + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" + "golang.org/x/exp/slog" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +func nilLogger() *slog.Logger { + h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + AddSource: false, + Level: slog.LevelError, + }) + return slog.New(h) +} + +func makeUnaryLoggingInterceptor(logger *slog.Logger) grpc.UnaryClientInterceptor { + interceptor := func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + start := time.Now() + // Ignore errors + outgoing, _ := metadata.FromOutgoingContext(ctx) + err := invoker(ctx, method, req, reply, cc, opts...) + if logger.Enabled(ctx, slog.LevelDebug) { + logger.DebugContext(ctx, method, "target", cc.Target(), "duration", time.Since(start), "err", err, "metadata", outgoing) + } else { + keys := maps.Keys(outgoing) + slices.Sort(keys) + logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(start), "err", err, "metadata", keys) + } + return err + } + return interceptor +} + +func makeStreamLoggingInterceptor(logger *slog.Logger) grpc.StreamClientInterceptor { + interceptor := func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + start := time.Now() + // Ignore errors + outgoing, _ := metadata.FromOutgoingContext(ctx) + stream, err := streamer(ctx, desc, cc, method, opts...) + if err != nil { + logger.InfoContext(ctx, method, "target", cc.Target(), "duration", time.Since(start), "err", err) + return stream, err + } + + return &loggedStream{ClientStream: stream, logger: logger, ctx: ctx, method: method, start: start, target: cc.Target(), outgoing: outgoing}, err + } + return interceptor +} + +type loggedStream struct { + grpc.ClientStream + + logger *slog.Logger + ctx context.Context + method string + start time.Time + target string + outgoing metadata.MD +} + +func (stream *loggedStream) RecvMsg(m any) error { + err := stream.ClientStream.RecvMsg(m) + if err != nil { + loggedErr := err + if loggedErr == io.EOF { + loggedErr = nil + } + + if stream.logger.Enabled(stream.ctx, slog.LevelDebug) { + stream.logger.DebugContext(stream.ctx, stream.method, "target", stream.target, "duration", time.Since(stream.start), "err", loggedErr, "metadata", stream.outgoing) + } else { + keys := maps.Keys(stream.outgoing) + slices.Sort(keys) + stream.logger.InfoContext(stream.ctx, stream.method, "target", stream.target, "duration", time.Since(stream.start), "err", loggedErr, "metadata", keys) + } + } + return err +} diff --git a/go/adbc/ext.go b/go/adbc/ext.go new file mode 100644 index 0000000000..dcfaeaebf9 --- /dev/null +++ b/go/adbc/ext.go @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package adbc + +import ( + "golang.org/x/exp/slog" +) + +// DatabaseLogging is a Database that also supports logging information to an +// application-supplied log sink. +// +// EXPERIMENTAL. Not formally part of the ADBC APIs. +type DatabaseLogging interface { + SetLogger(*slog.Logger) +} diff --git a/go/adbc/go.mod b/go/adbc/go.mod index b1bb5234c0..b3d0ecd5c6 100644 --- a/go/adbc/go.mod +++ b/go/adbc/go.mod @@ -17,7 +17,7 @@ module github.com/apache/arrow-adbc/go/adbc -go 1.18 +go 1.19 require ( github.com/apache/arrow/go/v13 v13.0.0 diff --git a/go/adbc/pkg/_tmpl/driver.go.tmpl b/go/adbc/pkg/_tmpl/driver.go.tmpl index 24c15f3960..1ffa1f851d 100644 --- a/go/adbc/pkg/_tmpl/driver.go.tmpl +++ b/go/adbc/pkg/_tmpl/driver.go.tmpl @@ -53,6 +53,7 @@ import ( "os" "runtime" "runtime/cgo" + "strings" "sync/atomic" "unsafe" @@ -61,6 +62,7 @@ import ( "github.com/apache/arrow/go/v13/arrow/cdata" "github.com/apache/arrow/go/v13/arrow/memory" "github.com/apache/arrow/go/v13/arrow/memory/mallocator" + "golang.org/x/exp/slog" ) // Must use malloc() to respect CGO rules @@ -71,6 +73,7 @@ var drv = {{.Driver}}{Alloc: mallocator.NewMallocator()} var globalPoison int32 = 0 const errPrefix = "[{{.Prefix}}] " +const logLevelEnvVar = "ADBC_DRIVER_{{.PrefixUpper}}_LOG_LEVEL" func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) { if err == nil { @@ -162,6 +165,45 @@ func poison(err *C.struct_AdbcError, fname string, e interface{}) C.AdbcStatusCo return C.ADBC_STATUS_INTERNAL } +// Check environment variables and enable logging if possible. +func initLoggingFromEnv(db adbc.Database) { + logLevel := slog.LevelError + switch strings.ToLower(os.Getenv(logLevelEnvVar)) { + case "debug": + logLevel = slog.LevelDebug + case "info": + logLevel = slog.LevelInfo + case "warn": + case "warning": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + case "": + return + default: + printLoggingHelp() + return + } + + h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + AddSource: false, + Level: logLevel, + }) + logger := slog.New(h) + + ext, ok := db.(adbc.DatabaseLogging) + if !ok { + logger.Error("{{.Prefix}} does not support logging") + return + } + ext.SetLogger(logger) +} + +func printLoggingHelp() { + fmt.Fprintf(os.Stderr, "{{.Prefix}}: to enable logging, set %s to 'debug', 'info', 'warn', or 'error'", logLevelEnvVar) +} + + // Allocate a new cgo.Handle and store its address in a heap-allocated // uintptr_t. Experimentally, this was found to be necessary, else // something (the Go runtime?) would corrupt (garbage-collect?) the @@ -305,7 +347,7 @@ func (cStream *cArrayStream) maybeError() C.int { //export {{.Prefix}}ArrayStreamGetLastError func {{.Prefix}}ArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cchar_t { - if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -317,7 +359,7 @@ func {{.Prefix}}ArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cc //export {{.Prefix}}ArrayStreamGetNext func {{.Prefix}}ArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.struct_ArrowArray) C.int { - if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -332,7 +374,7 @@ func {{.Prefix}}ArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.s //export {{.Prefix}}ArrayStreamGetSchema func {{.Prefix}}ArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C.struct_ArrowSchema) C.int { - if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -346,7 +388,7 @@ func {{.Prefix}}ArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema * //export {{.Prefix}}ArrayStreamRelease func {{.Prefix}}ArrayStreamRelease(stream *C.struct_ArrowArrayStream) { - if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil { return } h := (*(*cgo.Handle)(stream.private_data)) @@ -365,7 +407,7 @@ func {{.Prefix}}ArrayStreamRelease(stream *C.struct_ArrowArrayStream) { //export {{.Prefix}}ErrorFromArrayStream func {{.Prefix}}ErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status *C.AdbcStatusCode) (*C.struct_AdbcError) { - if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.{{.Prefix}}ArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -509,6 +551,8 @@ func {{.Prefix}}DatabaseInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) return C.AdbcStatusCode(errToAdbcErr(err, aerr)) } + initLoggingFromEnv(adb) + cdb.db = adb return C.ADBC_STATUS_OK } diff --git a/go/adbc/pkg/flightsql/driver.go b/go/adbc/pkg/flightsql/driver.go index 46e096952c..270ffaafe1 100644 --- a/go/adbc/pkg/flightsql/driver.go +++ b/go/adbc/pkg/flightsql/driver.go @@ -55,6 +55,7 @@ import ( "os" "runtime" "runtime/cgo" + "strings" "sync/atomic" "unsafe" @@ -64,6 +65,7 @@ import ( "github.com/apache/arrow/go/v13/arrow/cdata" "github.com/apache/arrow/go/v13/arrow/memory" "github.com/apache/arrow/go/v13/arrow/memory/mallocator" + "golang.org/x/exp/slog" ) // Must use malloc() to respect CGO rules @@ -75,6 +77,7 @@ var drv = flightsql.Driver{Alloc: mallocator.NewMallocator()} var globalPoison int32 = 0 const errPrefix = "[FlightSQL] " +const logLevelEnvVar = "ADBC_DRIVER_FLIGHTSQL_LOG_LEVEL" func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) { if err == nil { @@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e interface{}) C.AdbcStatusCo return C.ADBC_STATUS_INTERNAL } +// Check environment variables and enable logging if possible. +func initLoggingFromEnv(db adbc.Database) { + logLevel := slog.LevelError + switch strings.ToLower(os.Getenv(logLevelEnvVar)) { + case "debug": + logLevel = slog.LevelDebug + case "info": + logLevel = slog.LevelInfo + case "warn": + case "warning": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + case "": + return + default: + printLoggingHelp() + return + } + + h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + AddSource: false, + Level: logLevel, + }) + logger := slog.New(h) + + ext, ok := db.(adbc.DatabaseLogging) + if !ok { + logger.Error("FlightSQL does not support logging") + return + } + ext.SetLogger(logger) +} + +func printLoggingHelp() { + fmt.Fprintf(os.Stderr, "FlightSQL: to enable logging, set %s to 'debug', 'info', 'warn', or 'error'", logLevelEnvVar) +} + // Allocate a new cgo.Handle and store its address in a heap-allocated // uintptr_t. Experimentally, this was found to be necessary, else // something (the Go runtime?) would corrupt (garbage-collect?) the @@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int { //export FlightSQLArrayStreamGetLastError func FlightSQLArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cchar_t { - if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -321,7 +362,7 @@ func FlightSQLArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.ccha //export FlightSQLArrayStreamGetNext func FlightSQLArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.struct_ArrowArray) C.int { - if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -336,7 +377,7 @@ func FlightSQLArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.str //export FlightSQLArrayStreamGetSchema func FlightSQLArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C.struct_ArrowSchema) C.int { - if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -350,7 +391,7 @@ func FlightSQLArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C. //export FlightSQLArrayStreamRelease func FlightSQLArrayStreamRelease(stream *C.struct_ArrowArrayStream) { - if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return } h := (*(*cgo.Handle)(stream.private_data)) @@ -369,7 +410,7 @@ func FlightSQLArrayStreamRelease(stream *C.struct_ArrowArrayStream) { //export FlightSQLErrorFromArrayStream func FlightSQLErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status *C.AdbcStatusCode) *C.struct_AdbcError { - if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.FlightSQLArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -513,6 +554,8 @@ func FlightSQLDatabaseInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) ( return C.AdbcStatusCode(errToAdbcErr(err, aerr)) } + initLoggingFromEnv(adb) + cdb.db = adb return C.ADBC_STATUS_OK } diff --git a/go/adbc/pkg/gen/main.go b/go/adbc/pkg/gen/main.go index 24ae7d2596..16f0001f03 100644 --- a/go/adbc/pkg/gen/main.go +++ b/go/adbc/pkg/gen/main.go @@ -75,8 +75,9 @@ func (p *pathSpec) IsGoFile() bool { return filepath.Ext(p.out) == ".go" } func (p *pathSpec) IsCFile() bool { return filepath.Ext(p.out) == ".c" || filepath.Ext(p.out) == ".h" } type tmplData struct { - Driver string - Prefix string + Driver string + Prefix string + PrefixUpper string } var fileList = []string{ @@ -125,7 +126,11 @@ func main() { out: filepath.Join(*outDir, strings.TrimSuffix(f, Ext))} } - process(tmplData{Driver: pkg[0].Name + "." + *driverType, Prefix: *prefix}, specs) + process(tmplData{ + Driver: pkg[0].Name + "." + *driverType, + Prefix: *prefix, + PrefixUpper: strings.ToUpper(*prefix), + }, specs) } func mustReadAll(path string) []byte { diff --git a/go/adbc/pkg/panicdummy/driver.go b/go/adbc/pkg/panicdummy/driver.go index c99153ccb5..1e2279e0ef 100644 --- a/go/adbc/pkg/panicdummy/driver.go +++ b/go/adbc/pkg/panicdummy/driver.go @@ -55,6 +55,7 @@ import ( "os" "runtime" "runtime/cgo" + "strings" "sync/atomic" "unsafe" @@ -64,6 +65,7 @@ import ( "github.com/apache/arrow/go/v13/arrow/cdata" "github.com/apache/arrow/go/v13/arrow/memory" "github.com/apache/arrow/go/v13/arrow/memory/mallocator" + "golang.org/x/exp/slog" ) // Must use malloc() to respect CGO rules @@ -75,6 +77,7 @@ var drv = panicdummy.Driver{Alloc: mallocator.NewMallocator()} var globalPoison int32 = 0 const errPrefix = "[PanicDummy] " +const logLevelEnvVar = "ADBC_DRIVER_PANICDUMMY_LOG_LEVEL" func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) { if err == nil { @@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e interface{}) C.AdbcStatusCo return C.ADBC_STATUS_INTERNAL } +// Check environment variables and enable logging if possible. +func initLoggingFromEnv(db adbc.Database) { + logLevel := slog.LevelError + switch strings.ToLower(os.Getenv(logLevelEnvVar)) { + case "debug": + logLevel = slog.LevelDebug + case "info": + logLevel = slog.LevelInfo + case "warn": + case "warning": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + case "": + return + default: + printLoggingHelp() + return + } + + h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + AddSource: false, + Level: logLevel, + }) + logger := slog.New(h) + + ext, ok := db.(adbc.DatabaseLogging) + if !ok { + logger.Error("PanicDummy does not support logging") + return + } + ext.SetLogger(logger) +} + +func printLoggingHelp() { + fmt.Fprintf(os.Stderr, "PanicDummy: to enable logging, set %s to 'debug', 'info', 'warn', or 'error'", logLevelEnvVar) +} + // Allocate a new cgo.Handle and store its address in a heap-allocated // uintptr_t. Experimentally, this was found to be necessary, else // something (the Go runtime?) would corrupt (garbage-collect?) the @@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int { //export PanicDummyArrayStreamGetLastError func PanicDummyArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cchar_t { - if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -321,7 +362,7 @@ func PanicDummyArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cch //export PanicDummyArrayStreamGetNext func PanicDummyArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.struct_ArrowArray) C.int { - if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -336,7 +377,7 @@ func PanicDummyArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.st //export PanicDummyArrayStreamGetSchema func PanicDummyArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C.struct_ArrowSchema) C.int { - if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -350,7 +391,7 @@ func PanicDummyArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C //export PanicDummyArrayStreamRelease func PanicDummyArrayStreamRelease(stream *C.struct_ArrowArrayStream) { - if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil { return } h := (*(*cgo.Handle)(stream.private_data)) @@ -369,7 +410,7 @@ func PanicDummyArrayStreamRelease(stream *C.struct_ArrowArrayStream) { //export PanicDummyErrorFromArrayStream func PanicDummyErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status *C.AdbcStatusCode) *C.struct_AdbcError { - if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.PanicDummyArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -513,6 +554,8 @@ func PanicDummyDatabaseInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) return C.AdbcStatusCode(errToAdbcErr(err, aerr)) } + initLoggingFromEnv(adb) + cdb.db = adb return C.ADBC_STATUS_OK } diff --git a/go/adbc/pkg/snowflake/driver.go b/go/adbc/pkg/snowflake/driver.go index 4804e32e38..e468924d30 100644 --- a/go/adbc/pkg/snowflake/driver.go +++ b/go/adbc/pkg/snowflake/driver.go @@ -55,6 +55,7 @@ import ( "os" "runtime" "runtime/cgo" + "strings" "sync/atomic" "unsafe" @@ -64,6 +65,7 @@ import ( "github.com/apache/arrow/go/v13/arrow/cdata" "github.com/apache/arrow/go/v13/arrow/memory" "github.com/apache/arrow/go/v13/arrow/memory/mallocator" + "golang.org/x/exp/slog" ) // Must use malloc() to respect CGO rules @@ -75,6 +77,7 @@ var drv = snowflake.Driver{Alloc: mallocator.NewMallocator()} var globalPoison int32 = 0 const errPrefix = "[Snowflake] " +const logLevelEnvVar = "ADBC_DRIVER_SNOWFLAKE_LOG_LEVEL" func setErr(err *C.struct_AdbcError, format string, vals ...interface{}) { if err == nil { @@ -166,6 +169,44 @@ func poison(err *C.struct_AdbcError, fname string, e interface{}) C.AdbcStatusCo return C.ADBC_STATUS_INTERNAL } +// Check environment variables and enable logging if possible. +func initLoggingFromEnv(db adbc.Database) { + logLevel := slog.LevelError + switch strings.ToLower(os.Getenv(logLevelEnvVar)) { + case "debug": + logLevel = slog.LevelDebug + case "info": + logLevel = slog.LevelInfo + case "warn": + case "warning": + logLevel = slog.LevelWarn + case "error": + logLevel = slog.LevelError + case "": + return + default: + printLoggingHelp() + return + } + + h := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ + AddSource: false, + Level: logLevel, + }) + logger := slog.New(h) + + ext, ok := db.(adbc.DatabaseLogging) + if !ok { + logger.Error("Snowflake does not support logging") + return + } + ext.SetLogger(logger) +} + +func printLoggingHelp() { + fmt.Fprintf(os.Stderr, "Snowflake: to enable logging, set %s to 'debug', 'info', 'warn', or 'error'", logLevelEnvVar) +} + // Allocate a new cgo.Handle and store its address in a heap-allocated // uintptr_t. Experimentally, this was found to be necessary, else // something (the Go runtime?) would corrupt (garbage-collect?) the @@ -309,7 +350,7 @@ func (cStream *cArrayStream) maybeError() C.int { //export SnowflakeArrayStreamGetLastError func SnowflakeArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.cchar_t { - if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -321,7 +362,7 @@ func SnowflakeArrayStreamGetLastError(stream *C.struct_ArrowArrayStream) *C.ccha //export SnowflakeArrayStreamGetNext func SnowflakeArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.struct_ArrowArray) C.int { - if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -336,7 +377,7 @@ func SnowflakeArrayStreamGetNext(stream *C.struct_ArrowArrayStream, array *C.str //export SnowflakeArrayStreamGetSchema func SnowflakeArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C.struct_ArrowSchema) C.int { - if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil { return C.EINVAL } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -350,7 +391,7 @@ func SnowflakeArrayStreamGetSchema(stream *C.struct_ArrowArrayStream, schema *C. //export SnowflakeArrayStreamRelease func SnowflakeArrayStreamRelease(stream *C.struct_ArrowArrayStream) { - if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil { return } h := (*(*cgo.Handle)(stream.private_data)) @@ -369,7 +410,7 @@ func SnowflakeArrayStreamRelease(stream *C.struct_ArrowArrayStream) { //export SnowflakeErrorFromArrayStream func SnowflakeErrorFromArrayStream(stream *C.struct_ArrowArrayStream, status *C.AdbcStatusCode) *C.struct_AdbcError { - if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) { + if stream == nil || stream.release != (*[0]byte)(C.SnowflakeArrayStreamRelease) || stream.private_data == nil { return nil } cStream := getFromHandle[cArrayStream](stream.private_data) @@ -513,6 +554,8 @@ func SnowflakeDatabaseInit(db *C.struct_AdbcDatabase, err *C.struct_AdbcError) ( return C.AdbcStatusCode(errToAdbcErr(err, aerr)) } + initLoggingFromEnv(adb) + cdb.db = adb return C.ADBC_STATUS_OK }