-
-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #46 from devalexandre/feat-adapter-from-db-pgx
feat: add SQLAdapter for postgres
- Loading branch information
Showing
17 changed files
with
488 additions
and
13 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
module github.com/vingarcia/ksql/adapters/kpostgres | ||
|
||
go 1.14 | ||
|
||
require ( | ||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect | ||
github.com/Microsoft/go-winio v0.5.2 // indirect | ||
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect | ||
github.com/cenkalti/backoff v2.2.1+incompatible // indirect | ||
github.com/containerd/continuity v0.2.2 // indirect | ||
github.com/docker/go-connections v0.4.0 // indirect | ||
github.com/google/go-cmp v0.5.6 // indirect | ||
github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect | ||
github.com/lib/pq v1.10.4 | ||
github.com/opencontainers/image-spec v1.0.2 // indirect | ||
github.com/opencontainers/runc v1.1.0 // indirect | ||
github.com/ory/dockertest v3.3.5+incompatible | ||
github.com/vingarcia/ksql v1.11.1 | ||
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect | ||
golang.org/x/sys v0.0.0-20220315194320-039c03cc5b86 // indirect | ||
gotest.tools v2.2.0+incompatible // indirect | ||
) |
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package kpostgres | ||
|
||
import ( | ||
"database/sql" | ||
|
||
"github.com/vingarcia/ksql" | ||
"github.com/vingarcia/ksql/sqldialect" | ||
) | ||
|
||
// NewFromSQLDB builds a ksql.DB from a *sql.DB instance | ||
func NewFromSQLDB(db *sql.DB) (ksql.DB, error) { | ||
return ksql.NewWithAdapter(NewSQLAdapter(db), sqldialect.PostgresDialect{}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package kpostgres | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
"fmt" | ||
"io" | ||
"log" | ||
"testing" | ||
"time" | ||
|
||
_ "github.com/lib/pq" | ||
"github.com/ory/dockertest" | ||
"github.com/ory/dockertest/docker" | ||
"github.com/vingarcia/ksql" | ||
"github.com/vingarcia/ksql/sqldialect" | ||
) | ||
|
||
func TestSQLAdapter(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
postgresURL, closePostgres := startPostgresDB(ctx, "ksql") | ||
defer closePostgres() | ||
|
||
ksql.RunTestsForAdapter(t, "kpostgres", sqldialect.PostgresDialect{}, postgresURL, func(t *testing.T) (ksql.DBAdapter, io.Closer) { | ||
sqldb, err := sql.Open("postgres", postgresURL) | ||
if err != nil { | ||
t.Fatal(err.Error()) | ||
} | ||
|
||
return SQLAdapter{sqldb}, sqldb | ||
}) | ||
} | ||
|
||
func startPostgresDB(ctx context.Context, dbName string) (databaseURL string, closer func()) { | ||
// uses a sensible default on windows (tcp/http) and linux/osx (socket) | ||
dockerPool, err := dockertest.NewPool("") | ||
if err != nil { | ||
log.Fatalf("Could not connect to docker: %s", err) | ||
} | ||
|
||
// pulls an image, creates a container based on it and runs it | ||
resource, err := dockerPool.RunWithOptions( | ||
&dockertest.RunOptions{ | ||
Repository: "postgres", | ||
Tag: "14.0", | ||
Env: []string{ | ||
"POSTGRES_PASSWORD=postgres", | ||
"POSTGRES_USER=postgres", | ||
"POSTGRES_DB=" + dbName, | ||
"listen_addresses = '*'", | ||
}, | ||
}, | ||
func(config *docker.HostConfig) { | ||
// set AutoRemove to true so that stopped container goes away by itself | ||
config.AutoRemove = true | ||
config.RestartPolicy = docker.RestartPolicy{Name: "no"} | ||
}, | ||
) | ||
if err != nil { | ||
log.Fatalf("Could not start resource: %s", err) | ||
} | ||
|
||
hostAndPort := resource.GetHostPort("5432/tcp") | ||
databaseUrl := fmt.Sprintf("postgres://postgres:postgres@%s/%s?sslmode=disable", hostAndPort, dbName) | ||
|
||
fmt.Println("Connecting to postgres on url: ", databaseUrl) | ||
|
||
resource.Expire(40) // Tell docker to hard kill the container in 40 seconds | ||
|
||
var sqlDB *sql.DB | ||
// exponential backoff-retry, because the application in the container might not be ready to accept connections yet | ||
dockerPool.MaxWait = 10 * time.Second | ||
dockerPool.Retry(func() error { | ||
sqlDB, err = sql.Open("postgres", databaseUrl) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return sqlDB.Ping() | ||
}) | ||
if err != nil { | ||
log.Fatalf("Could not connect to docker: %s", err) | ||
} | ||
sqlDB.Close() | ||
|
||
return databaseUrl, func() { | ||
if err := dockerPool.Purge(resource); err != nil { | ||
log.Fatalf("Could not purge resource: %s", err) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package kpostgres | ||
|
||
import ( | ||
"context" | ||
"database/sql" | ||
|
||
"github.com/vingarcia/ksql" | ||
"strconv" | ||
"strings" | ||
"unicode" | ||
) | ||
|
||
// SQLAdapter adapts the sql.DB type to be compatible with the `DBAdapter` interface | ||
type SQLAdapter struct { | ||
*sql.DB | ||
} | ||
|
||
var _ ksql.DBAdapter = SQLAdapter{} | ||
|
||
// NewSQLAdapter returns a new instance of SQLAdapter with | ||
// the provided database instance. | ||
func NewSQLAdapter(db *sql.DB) SQLAdapter { | ||
return SQLAdapter{ | ||
DB: db, | ||
} | ||
} | ||
|
||
// ExecContext implements the DBAdapter interface | ||
func (s SQLAdapter) ExecContext(ctx context.Context, query string, args ...interface{}) (ksql.Result, error) { | ||
return s.DB.ExecContext(ctx, query, args...) | ||
} | ||
|
||
// QueryContext implements the DBAdapter interface | ||
func (s SQLAdapter) QueryContext(ctx context.Context, query string, args ...interface{}) (ksql.Rows, error) { | ||
rows, err := s.DB.QueryContext(ctx, query, args...) | ||
return SQLRows{rows}, err | ||
} | ||
|
||
// BeginTx implements the Tx interface | ||
func (s SQLAdapter) BeginTx(ctx context.Context) (ksql.Tx, error) { | ||
tx, err := s.DB.BeginTx(ctx, nil) | ||
return SQLTx{Tx: tx}, err | ||
} | ||
|
||
// Close implements the io.Closer interface | ||
func (s SQLAdapter) Close() error { | ||
return s.DB.Close() | ||
} | ||
|
||
// SQLTx is used to implement the DBAdapter interface and implements | ||
// the Tx interface | ||
type SQLTx struct { | ||
*sql.Tx | ||
} | ||
|
||
// ExecContext implements the Tx interface | ||
func (s SQLTx) ExecContext(ctx context.Context, query string, args ...interface{}) (ksql.Result, error) { | ||
return s.Tx.ExecContext(ctx, query, args...) | ||
} | ||
|
||
// QueryContext implements the Tx interface | ||
func (s SQLTx) QueryContext(ctx context.Context, query string, args ...interface{}) (ksql.Rows, error) { | ||
rows, err := s.Tx.QueryContext(ctx, query, args...) | ||
return SQLRows{rows}, err | ||
} | ||
|
||
// Rollback implements the Tx interface | ||
func (s SQLTx) Rollback(ctx context.Context) error { | ||
return s.Tx.Rollback() | ||
} | ||
|
||
// Commit implements the Tx interface | ||
func (s SQLTx) Commit(ctx context.Context) error { | ||
return s.Tx.Commit() | ||
} | ||
|
||
var _ ksql.Tx = SQLTx{} | ||
|
||
// SQLRows implements the ksql.Rows interface and is used to help | ||
// the SQLAdapter to implement the ksql.DBAdapter interface. | ||
type SQLRows struct { | ||
*sql.Rows | ||
} | ||
|
||
var _ ksql.Rows = SQLRows{} | ||
|
||
// Scan implements the ksql.Rows interface | ||
func (p SQLRows) Scan(args ...interface{}) error { | ||
err := p.Rows.Scan(args...) | ||
if err != nil { | ||
// Since this is the error flow we decided it would be ok | ||
// to spend a little bit more time parsing this error in order | ||
// to produce better error messages. | ||
// | ||
// If the parsing fails we just return the error unchanged. | ||
const scanErrPrefix = "sql: Scan error on column index " | ||
var errMsg = err.Error() | ||
if strings.HasPrefix(errMsg, scanErrPrefix) { | ||
i := len(scanErrPrefix) | ||
for unicode.IsDigit(rune(errMsg[i])) { | ||
i++ | ||
} | ||
colIndex, convErr := strconv.Atoi(errMsg[len(scanErrPrefix):i]) | ||
if convErr == nil { | ||
return ksql.ScanArgError{ | ||
ColumnIndex: colIndex, | ||
Err: err, | ||
} | ||
} | ||
} | ||
} | ||
|
||
return err | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters