Skip to content

Commit

Permalink
support streaming insert;
Browse files Browse the repository at this point in the history
block in streaming_buffer can be replaced by data generated outside.
  • Loading branch information
leo-cai-timeplus committed Oct 27, 2023
1 parent 86e77b8 commit c66adfd
Show file tree
Hide file tree
Showing 5 changed files with 377 additions and 0 deletions.
9 changes: 9 additions & 0 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
)

var (
ErrStreamingBufferClosed = errors.New("proton: streaming buffer has already been closed")
ErrBatchAlreadySent = errors.New("proton: batch has already been sent")
ErrAcquireConnTimeout = errors.New("proton: acquire conn timeout. you can increase the number of max open conn or the dial timeout")
ErrUnsupportedServerRevision = errors.New("proton: unsupported server revision")
Expand Down Expand Up @@ -146,6 +147,14 @@ func (ch *proton) PrepareBatch(ctx context.Context, query string) (driver.Batch,
return conn.prepareBatch(ctx, query, ch.release)
}

func (ch *proton) PrepareStreamingBuffer(ctx context.Context, query string) (driver.StreamingBuffer, error) {
conn, err := ch.acquire(ctx)
if err != nil {
return nil, err
}
return conn.prepareStreamingBuffer(ctx, query, ch.release)
}

func (ch *proton) AsyncInsert(ctx context.Context, query string, wait bool) error {
conn, err := ch.acquire(ctx)
if err != nil {
Expand Down
203 changes: 203 additions & 0 deletions conn_streaming_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package proton

import (
"context"
"errors"
"fmt"
"log"
"strings"
"sync"
"time"

"github.com/timeplus-io/proton-go-driver/v2/lib/column"
"github.com/timeplus-io/proton-go-driver/v2/lib/driver"
"github.com/timeplus-io/proton-go-driver/v2/lib/proto"
)

func (c *connect) prepareStreamingBuffer(ctx context.Context, query string, release func(*connect, error)) (*streamingBuffer, error) {
query = splitInsertRe.Split(query, -1)[0]
if !strings.HasSuffix(strings.TrimSpace(strings.ToUpper(query)), "VALUES") {
query += " VALUES"
}
options := queryOptions(ctx)
if deadline, ok := ctx.Deadline(); ok {
c.conn.SetDeadline(deadline)
defer c.conn.SetDeadline(time.Time{})
}
if err := c.sendQuery(query, &options); err != nil {
release(c, err)
return nil, err
}
var (
onProcess = options.onProcess()
block, err = c.firstBlock(ctx, onProcess)
)
if err != nil {
release(c, err)
return nil, err
}
return &streamingBuffer{
ctx: ctx,
conn: c,
block: block,
release: func(err error) {
release(c, err)
},
onProcess: onProcess,
done: make(chan struct{}),
}, nil
}

type streamingBuffer struct {
err error
ctx context.Context
conn *connect
sent bool
block *proto.Block
release func(error)
onProcess *onProcess
once sync.Once
done chan struct{}
}

func (b *streamingBuffer) Append(v ...interface{}) error {
if b.sent {
return ErrStreamingBufferClosed
}
if err := b.block.Append(v...); err != nil {
b.release(err)
return err
}
return nil
}

func (b *streamingBuffer) AppendStruct(v interface{}) error {
values, err := b.conn.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false)
if err != nil {
return err
}
return b.Append(values...)
}

func (b *streamingBuffer) Column(idx int) driver.StreamingBufferColumn {
if len(b.block.Columns) <= idx {
b.release(nil)
return &streamingBufferColumn{
err: &OpError{
Op: "streamingBuffer.Column",
Err: fmt.Errorf("invalid column index %d", idx),
},
}
}
return &streamingBufferColumn{
buffer: b,
column: b.block.Columns[idx],
release: func(err error) {
b.err = err
b.release(err)
},
}
}

func (b *streamingBuffer) Close() (err error) {
defer func() {
b.sent = true
b.release(err)
}()
if err = b.Send(); err != nil {
return err
}
if err = b.conn.sendData(&proto.Block{}, ""); err != nil {
return err
}
if err = b.conn.encoder.Flush(); err != nil {
return err
}
<-b.done
return nil
}

func (b *streamingBuffer) Clear() (err error) {
for i := range b.block.Columns {
b.block.Columns[i], err = b.block.Columns[i].Type().Column()
if err != nil {
return
}
}
return nil
}

func (b *streamingBuffer) Send() (err error) {
if b.sent {
return ErrStreamingBufferClosed
}
if b.err != nil {
return b.err
}
if b.block.Rows() != 0 {
if err = b.conn.sendData(b.block, ""); err != nil {
return err
}
}
if err = b.conn.encoder.Flush(); err != nil {
return err
}
b.once.Do(func() {
go func() {
if err = b.conn.process(b.ctx, b.onProcess); err != nil {
log.Fatal(err)
}
b.done <- struct{}{}
}()
})
if err = b.Clear(); err != nil {
return err
}
return nil
}

func (b *streamingBuffer) ReplaceBy(cols ...column.Interface) (err error) {
if len(b.block.Columns) != len(cols) {
return errors.New(fmt.Sprintf("colomn number is %d, not %d", len(b.block.Columns), len(cols)))
}
for i := 0; i < len(cols); i++ {
if b.block.Columns[i].Type() != cols[i].Type() {
return errors.New(fmt.Sprintf("type of colomn[%d] is %s, not %s", i, b.block.Columns[i].Type(), cols[i].Type()))
}
}
rows := cols[0].Rows()
for i := 1; i < len(cols); i++ {
if rows != cols[i].Rows() {
return errors.New("cols with different length")
}
}
b.block.Columns = cols
return nil
}

type streamingBufferColumn struct {
err error
buffer *streamingBuffer
column column.Interface
release func(error)
}

func (b *streamingBufferColumn) Append(v interface{}) (err error) {
if b.buffer.sent {
return ErrStreamingBufferClosed
}
if b.err != nil {
b.release(b.err)
return b.err
}
if _, err = b.column.Append(v); err != nil {
b.release(err)
return err
}
return nil
}

var (
_ (driver.StreamingBuffer) = (*streamingBuffer)(nil)
_ (driver.StreamingBufferColumn) = (*streamingBufferColumn)(nil)
)
76 changes: 76 additions & 0 deletions examples/streaming/buffer_replace/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"context"
"fmt"
"github.com/timeplus-io/proton-go-driver/v2"
"github.com/timeplus-io/proton-go-driver/v2/lib/column"
"log"
"time"
)

func BufferReplaceExample() {
var (
ctx = context.Background()
conn, err = proton.Open(&proton.Options{
Addr: []string{"127.0.0.1:8463"},
Auth: proton.Auth{
Database: "default",
Username: "default",
Password: "",
},
//Debug: true,
DialTimeout: time.Second,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
})
)
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
fmt.Println("progress: ", p)
}))
if err != nil {
log.Fatal(err)
}
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
log.Fatal(err)
}
err = conn.Exec(ctx, `
CREATE STREAM IF NOT EXISTS example (
Col1 uint64
, Col2 string
)
`)
if err != nil {
log.Fatal(err)
}
const rows = 200_000
var (
col1 column.UInt64 = make([]uint64, rows)
col2 column.String = make([]string, rows)
)
for i := 0; i < rows; i++ {
col1[i] = uint64(i)
col2[i] = fmt.Sprintf("num%03d", i)
}
buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)")
err = buffer.ReplaceBy(
&col1,
&col2,
)
if err != nil {
return
}
err = buffer.Send()
if err != nil {
return
}
err = buffer.Close()
if err != nil {
return
}
}

func main() {
BufferReplaceExample()
}
75 changes: 75 additions & 0 deletions examples/streaming/streaming_send/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/timeplus-io/proton-go-driver/v2"
)

func example() error {
var (
ctx = context.Background()
conn, err = proton.Open(&proton.Options{
Addr: []string{"127.0.0.1:8463"},
Auth: proton.Auth{
Database: "default",
Username: "default",
Password: "",
},
// Debug: true,
DialTimeout: time.Second,
MaxOpenConns: 10,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
})
)
ctx = proton.Context(ctx, proton.WithProgress(func(p *proton.Progress) {
fmt.Println("progress: ", p)
}))
if err != nil {
return err
}
if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil {
return err
}
err = conn.Exec(ctx, `
CREATE STREAM IF NOT EXISTS example (
Col1 uint64
, Col2 string
)
`)
if err != nil {
return err
}

buffer, err := conn.PrepareStreamingBuffer(ctx, "INSERT INTO example (* except _tp_time)")
if err != nil {
return err
}
for i := 0; i < 100; i++ {
for j := 0; j < 100000; j++ {
err := buffer.Append(
uint64(i*100000+j),
fmt.Sprintf("num_%d_%d", j, i),
)
if err != nil {
return err
}
}
if err := buffer.Send(); err != nil {
return err
}
}
return buffer.Close()
}

func main() {
start := time.Now()
if err := example(); err != nil {
log.Fatal(err)
}
fmt.Println(time.Since(start))
}
Loading

0 comments on commit c66adfd

Please sign in to comment.