Skip to content

Commit

Permalink
Merge pull request #833 from ClickHouse/issue_798_2
Browse files Browse the repository at this point in the history
concurrency on batch fix
  • Loading branch information
gingerwizard authored Nov 29, 2022
2 parents 815d34a + 8910a59 commit 100f039
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 15 deletions.
47 changes: 32 additions & 15 deletions conn_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ var splitInsertRe = regexp.MustCompile(`(?i)\sVALUES\s*\(`)
var columnMatch = regexp.MustCompile(`.*\((?P<Columns>.+)\)$`)

func (c *connect) prepareBatch(ctx context.Context, query string, release func(*connect, error)) (driver.Batch, error) {
//defer func() {
// if err := recover(); err != nil {
// fmt.Printf("panic occurred on %d:\n", c.num)
// }
//}()
query = splitInsertRe.Split(query, -1)[0]
colMatch := columnMatch.FindStringSubmatch(query)
var columns []string
Expand Down Expand Up @@ -69,24 +74,31 @@ func (c *connect) prepareBatch(ctx context.Context, query string, release func(*
return nil, err
}
return &batch{
ctx: ctx,
conn: c,
block: block,
release: func(err error) {
release(c, err)
},
onProcess: onProcess,
ctx: ctx,
conn: c,
block: block,
released: false,
connRelease: release,
onProcess: onProcess,
}, nil
}

type batch struct {
err error
ctx context.Context
conn *connect
sent bool
block *proto.Block
release func(error)
onProcess *onProcess
err error
ctx context.Context
conn *connect
sent bool
released bool
block *proto.Block
connRelease func(*connect, error)
onProcess *onProcess
}

func (b *batch) release(err error) {
if !b.released {
b.released = true
b.connRelease(b.conn, err)
}
}

func (b *batch) Abort() error {
Expand All @@ -104,7 +116,9 @@ func (b *batch) Append(v ...interface{}) error {
if b.sent {
return ErrBatchAlreadySent
}
//
if b.err != nil {
return b.err
}
if err := b.block.Append(v...); err != nil {
b.err = errors.Wrap(ErrBatchInvalid, err.Error())
b.release(err)
Expand All @@ -114,6 +128,9 @@ func (b *batch) Append(v ...interface{}) error {
}

func (b *batch) AppendStruct(v interface{}) error {
if b.err != nil {
return b.err
}
values, err := b.conn.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false)
if err != nil {
return err
Expand Down
59 changes: 59 additions & 0 deletions tests/issues/798_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/ClickHouse/clickhouse-go/v2"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
"sync"
"sync/atomic"
"testing"
)

Expand Down Expand Up @@ -51,3 +53,60 @@ func Test798(t *testing.T) {
require.NoError(t, batch.Send())
require.ErrorIs(t, batch.Append(true, false, []bool{true, false, true}), clickhouse.ErrBatchAlreadySent)
}

func writeRows(prepareSQL string, rows [][]interface{}, conn clickhouse.Conn) (err error) {
batch, err := conn.PrepareBatch(context.Background(), prepareSQL)
if err != nil {
return err
}
defer func() {
if batch != nil {
_ = batch.Abort()
}
}()
for _, row := range rows {
batch.Append(row...)
}
return batch.Send()
}

func Test798Concurrent(t *testing.T) {

var (
conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{
"max_execution_time": 60,
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
)
ctx := context.Background()
const ddl = `
CREATE TABLE test_issue_798 (
Col1 Bool
, Col2 Bool
) Engine MergeTree() ORDER BY tuple()
`
defer func() {
conn.Exec(ctx, "DROP TABLE IF EXISTS test_issue_798")
}()
require.NoError(t, conn.Exec(ctx, ddl))

require.NoError(t, err)
todo, done := int64(1000), int64(-1)
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
for {
index := atomic.AddInt64(&done, 1)
if index >= todo {
wg.Done()
break
}
writeRows("INSERT INTO test_issue_798", [][]interface{}{{true, false}, {false, true}}, conn)
}
}()
}
wg.Wait()

}

0 comments on commit 100f039

Please sign in to comment.