An idiomatic KSQL go client with database/sql
integrations.
This project is currently an unofficial ksqlDB client for Go until it reaches maturity.
Once maturity is reached, the plan is to integrate it into the ksqlDB codebase and give it official support.
The original design doc is in the ksqlDB repository.
This client has been developed with the goals outlined in the ksqlDB developer guide here.
The best place to start is in the examples folder. It contains fully fledged examples for working with database/sql, sqlx, and the client directly.
Querying a stream with sqlx (recommended)
package main
import (
"context"
"errors"
"log"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/vancelongwill/ksql-go/stdlib" // import the database/sql driver
)
// Item holds one row returned by the query on t1. Each `db` tag names the
// column that rows.StructScan copies into the corresponding field.
type Item struct {
	K  string `db:"K"`
	V1 int    `db:"V1"`
	V2 string `db:"V2"`
	V3 bool   `db:"V3"`
}
// run opens a ksqlDB connection through the database/sql driver and logs every
// row produced by a push query on t1.
//
// The query uses EMIT CHANGES, so the loop keeps running until ctx is
// cancelled or the rows are closed. It returns the first error encountered
// while opening the connection, issuing the query, scanning a row, or
// iterating over the result.
func run(ctx context.Context) error {
	db, err := sqlx.Open("ksqldb", "http://0.0.0.0:8088/")
	if err != nil {
		return err
	}
	// Release the handle's resources once the example is done; deferred calls
	// run last-in-first-out, so the rows are closed before the DB.
	defer db.Close()

	rows, err := db.QueryxContext(ctx, "SELECT * FROM t1 WHERE v1 > -1 EMIT CHANGES;")
	if err != nil {
		return err
	}
	defer rows.Close()

	// this will continue forever unless the context is cancelled, or rows.Close is called
	for rows.Next() {
		var r Item
		if err := rows.StructScan(&r); err != nil {
			return err
		}
		log.Println(r)
	}
	return rows.Err()
}
// create a ksqldb client that sends its requests through a custom HTTP client (for auth etc)
httpClient := &http.Client{}
client := ksql.New("http://0.0.0.0:8088", ksql.WithHTTPClient(httpClient))
// convert to a database/sql connection
sqlDB := sql.OpenDB(stdlib.NewConnector(client))
// again, using jmoiron/sqlx to interact with the DB
db := sqlx.NewDb(sqlDB, "ksqldb")
// basicAuth is an http.RoundTripper that adds HTTP Basic credentials to every
// request before handing it to http.DefaultTransport.
type basicAuth struct {
	Username string
	Password string
}

// RoundTrip sends req with an "Authorization: Basic ..." header built from
// Username and Password and returns whatever http.DefaultTransport returns.
//
// The header is set on a clone of req, because the http.RoundTripper contract
// forbids modifying the caller's request.
func (bat basicAuth) RoundTrip(req *http.Request) (*http.Response, error) {
	authed := req.Clone(req.Context())
	if authed.Header == nil {
		// Clone keeps a nil header map nil; SetBasicAuth needs a writable one.
		authed.Header = make(http.Header)
	}
	authed.SetBasicAuth(bat.Username, bat.Password)
	return http.DefaultTransport.RoundTrip(authed)
}
// run shows how to build a sqlx handle whose HTTP requests go through the
// basicAuth transport.
func run(ctx context.Context) error {
// create a ksqldb client with a custom HTTP client (for auth etc)
httpClient := &http.Client{
// basicAuth sets the Authorization header on each outgoing request
Transport: basicAuth{
Username: "youruser@email.com",
Password: "somepassword",
},
}
client := ksql.New("http://0.0.0.0:8088", ksql.WithHTTPClient(httpClient))
// expose the client through database/sql, then wrap it with sqlx
sqlDB := sql.OpenDB(stdlib.NewConnector(client))
db := sqlx.NewDb(sqlDB, "ksqldb")
// the rest of the example is elided
...
}
OAuth2 (with x/oauth2)
// client-credentials settings for the OAuth2 token endpoint
creds := &clientcredentials.Config{
ClientID: "yourID",
ClientSecret: "yoursecret",
TokenURL: "https://yourauthservice.com/oauth2/token",
}
// pass the HTTP client built by creds.Client(ctx) to ksql; presumably it attaches the token to each request — see the x/oauth2 docs
client := ksql.New("http://0.0.0.0:8088", ksql.WithHTTPClient(creds.Client(ctx)))
- Compatible with the database/sql driver
- Supports all the features of the ksqlDB REST API
- Provides a high-level API for working with pull & push queries
- Pull the repo
- Get the dependencies
go mod download
- Run all unit tests and generate a coverage report
make coverage
At the moment the primary focus is on unit testing although there are plans to add some integration tests based on the examples.
- Support all ksqlDB REST API methods
- TLS support (use custom http client)
- More examples
- GoDoc
- Passes golint & go vet
- More tests (see coverage reports)
- User documentation
- More semantic error handling
- Handle HTTP error codes