Update module github.com/ClickHouse/clickhouse-go/v2 to v2.37.1 (#152)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
renovate[bot]
2025-06-21 14:18:48 +03:00
committed by GitHub
parent 1dc330150c
commit e3d93fa9c0
43 changed files with 890 additions and 405 deletions

6
go.mod

@@ -5,7 +5,7 @@ go 1.24
toolchain go1.24.4
require (
github.com/ClickHouse/clickhouse-go/v2 v2.34.0
github.com/ClickHouse/clickhouse-go/v2 v2.37.1
github.com/PuerkitoBio/goquery v1.9.2
github.com/badoux/checkmail v1.2.4
github.com/go-gomail/gomail v0.0.0-20160411212932-81ebce5c23df
@@ -54,8 +54,8 @@ require (
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect

6
go.sum

@@ -6,6 +6,8 @@ github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV
github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/ClickHouse/clickhouse-go/v2 v2.34.0 h1:Y4rqkdrRHgExvC4o/NTbLdY5LFQ3LHS77/RNFxFX3Co=
github.com/ClickHouse/clickhouse-go/v2 v2.34.0/go.mod h1:yioSINoRLVZkLyDzdMXPLRIqhDvel8iLBlwh6Iefso8=
github.com/ClickHouse/clickhouse-go/v2 v2.37.1 h1:AvNJQW0QJudpl6JjH8WyMfu2s3ruWxtp0E1WZKmZXLc=
github.com/ClickHouse/clickhouse-go/v2 v2.37.1/go.mod h1:1KKjGFSWu2R/oa7DKWJLlhTOtyCld7VJDEtXTe+2QKU=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/PuerkitoBio/goquery v1.9.2 h1:4/wZksC3KgkQw7SQgkKotmKljk0M6V8TUvA8Wb4yPeE=
@@ -194,10 +196,14 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+n
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8=
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg=
go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E=
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w=
go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=


@@ -1,3 +1,62 @@
# v2.37.1, 2025-06-17 <!-- Release notes generated using configuration in .github/release.yml at main -->
## What's Changed
### Bug fixes 🐛
* Fix Native API HTTP bugs by @SpencerTorres in https://github.com/ClickHouse/clickhouse-go/pull/1578
* Fixed critical bug with the new HTTP Native API where connections were not being released.
* Also fixed random HTTP 400 errors with large batches over HTTP
**Full Changelog**: https://github.com/ClickHouse/clickhouse-go/compare/v2.37.0...v2.37.1
# v2.37.0, 2025-06-16 <!-- Release notes generated using configuration in .github/release.yml at main -->
## What's Changed
### Enhancements 🎉
* Scanning Datetime and Datetime64 into int64 by @vaibhav-kt in https://github.com/ClickHouse/clickhouse-go/pull/1560
* Supports scanning of Array, IPv4, IPv6, and Map types into Go values that implement the `sql.Scanner` interface. by @zapateo in https://github.com/ClickHouse/clickhouse-go/pull/1570
* Support HTTP connections in Native Go interface by @SpencerTorres in https://github.com/ClickHouse/clickhouse-go/pull/1577 (see details at bottom of release notes!)
### Bug fixes 🐛
* fix: Prevent panic on slice map keys by @disq in https://github.com/ClickHouse/clickhouse-go/pull/1567
* object_json: split JSON tag to remove any trailing omitempty by @dschofie in https://github.com/ClickHouse/clickhouse-go/pull/1547
* fix: namedValue namedDatevalue usage error by @tosolveit in https://github.com/ClickHouse/clickhouse-go/pull/1575
* Fix false positives in TestInterfaceArray by @tosolveit in https://github.com/ClickHouse/clickhouse-go/pull/1572
### HTTP in ClickHouse API
This release includes a bug fix / enhancement for the "ClickHouse API" interface. Previously the only way to use HTTP was through the `database/sql` interface, but now you can use `Protocol: clickhouse.HTTP` in your `clickhouse.Open` options.
HTTP still has some limitations to be aware of for things like batch flushing and session context, so be cautious when switching over code to this protocol. Please report any issues you may have with this change. Native protocol shouldn't be affected, but you can downgrade to v2.36.x if you notice any issues.
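As a rough sketch (the address and credentials below are placeholders, not values from this release), switching the ClickHouse API client over to HTTP might look like:

```go
package main

import (
	"context"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	// Placeholder endpoint; 8123 is ClickHouse's default HTTP port.
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr:     []string{"127.0.0.1:8123"},
		Protocol: clickhouse.HTTP, // previously HTTP was only available via database/sql
		Auth: clickhouse.Auth{
			Database: "default",
			Username: "default",
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	if err := conn.Ping(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```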
## New Contributors
* @disq made their first contribution in https://github.com/ClickHouse/clickhouse-go/pull/1567
* @vaibhav-kt made their first contribution in https://github.com/ClickHouse/clickhouse-go/pull/1560
* @tosolveit made their first contribution in https://github.com/ClickHouse/clickhouse-go/pull/1572
* @dschofie made their first contribution in https://github.com/ClickHouse/clickhouse-go/pull/1547
* @zapateo made their first contribution in https://github.com/ClickHouse/clickhouse-go/pull/1570
**Full Changelog**: https://github.com/ClickHouse/clickhouse-go/compare/v2.36.0...v2.37.0
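Two of the enhancements above, scanning `DateTime`/`DateTime64` into `int64` and the broader `sql.Scanner` support, lend themselves to a short sketch. This is illustrative only: the queries and the `Tags` type are assumptions, not part of the release.

```go
package example

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// Tags is a hypothetical sql.Scanner target; Array (and IPv4/IPv6/Map) columns
// can now be scanned into types like this.
type Tags []string

var _ sql.Scanner = (*Tags)(nil)

func (t *Tags) Scan(src any) error {
	switch v := src.(type) {
	case []string:
		*t = Tags(v)
	case []any: // hedge: the driver may hand over a generic slice depending on the column
		for _, e := range v {
			*t = append(*t, fmt.Sprint(e))
		}
	default:
		return fmt.Errorf("unsupported source type %T", src)
	}
	return nil
}

func scanExamples(ctx context.Context, conn driver.Conn) {
	// DateTime scans into int64 as Unix seconds; DateTime64 scans at its declared precision.
	var unixSeconds int64
	if err := conn.QueryRow(ctx, "SELECT now()").Scan(&unixSeconds); err != nil {
		log.Fatal(err)
	}

	var tags Tags
	if err := conn.QueryRow(ctx, "SELECT ['a', 'b']").Scan(&tags); err != nil {
		log.Fatal(err)
	}
	fmt.Println(unixSeconds, tags)
}
```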
# v2.36.0, 2025-06-03 <!-- Release notes generated using configuration in .github/release.yml at main -->
## What's Changed
### Enhancements 🎉
* Add `Close` function to batch interface by @SpencerTorres in https://github.com/ClickHouse/clickhouse-go/pull/1566
**Full Changelog**: https://github.com/ClickHouse/clickhouse-go/compare/v2.35.0...v2.36.0
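A minimal sketch of the intended usage, mirroring the doc comment on the new method (the table, column, and helper below are placeholders): defer `Close` right after preparing a batch so the underlying connection is released even on early returns.

```go
package example

import (
	"context"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// insertIDs is an illustrative helper; "example_table" and its schema are assumptions.
func insertIDs(ctx context.Context, conn driver.Conn, ids []uint64) error {
	batch, err := conn.PrepareBatch(ctx, "INSERT INTO example_table (id)")
	if err != nil {
		return err
	}
	// Releases the connection even if we return before Send; a no-op once the batch is sent.
	defer batch.Close()

	for _, id := range ids {
		if err := batch.Append(id); err != nil {
			return err
		}
	}
	return batch.Send()
}
```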
# v2.35.0, 2025-05-22 <!-- Release notes generated using configuration in .github/release.yml at main -->
## What's Changed
### Enhancements 🎉
* JWT Authentication by @SpencerTorres in https://github.com/ClickHouse/clickhouse-go/pull/1538
* Add support for overriding `database` in DSN URL by @kokizzu in https://github.com/ClickHouse/clickhouse-go/pull/1541
**Full Changelog**: https://github.com/ClickHouse/clickhouse-go/compare/v2.34.0...v2.35.0
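For the DSN enhancement above, a hedged `database/sql` example (host, credentials, and database names are placeholders):

```go
package example

import (
	"database/sql"

	_ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" driver
)

func openAnalytics() (*sql.DB, error) {
	// Per v2.35.0, the `database` query parameter can override the database
	// configured elsewhere in the DSN.
	return sql.Open("clickhouse",
		"clickhouse://default:password@127.0.0.1:9000/default?database=analytics")
}
```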
# v2.34.0, 2025-04-01 <!-- Release notes generated using configuration in .github/release.yml at main -->
## What's Changed


@@ -1,4 +1,4 @@
The following table aims to capture the Golang types supported for each ClickHouse Column Type.
Whilst each ClickHouse type often has a logical Golang type, we aim to support implicit conversions where possible and provided no precision loss will be incurred - thus alleviating the need for users to ensure their data aligns perfectly with ClickHouse types.
@@ -56,7 +56,7 @@ All types can be read into a pointer or pointer to a pointer.
| uint16 | | | | | | | X | | | | | | | | | | | | | | | | | | | | | | | |
| uint8 | | | | | | X | | | | | | | | | | | | | | | | | | | | | | | | |
| int | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | |
| int64 | | | | | | | | | | | | | | | X | | | | | | | | | | | | | | | |
| int64 | | | | | | | | | | | | | | | X | | | | | | | | x | x | | | | | | |
| int32 | | | | | | | | | | | | | | X | | | | | | | | | | | | | | | | |
| int16 | | | | | | | | | | | | | X | | | | | | | | | | | | | | | | | |
| int8 | | | | | | | | | | | | X | | | | | | | | | | | | | | | | | | |


@@ -27,7 +27,6 @@ import (
_ "time/tzdata"
chproto "github.com/ClickHouse/ch-go/proto"
"github.com/ClickHouse/clickhouse-go/v2/contributors"
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -83,9 +82,10 @@ func Open(opt *Options) (driver.Conn, error) {
opt = &Options{}
}
o := opt.setDefaults()
conn := &clickhouse{
opt: o,
idle: make(chan *connect, o.MaxIdleConns),
idle: make(chan nativeTransport, o.MaxIdleConns),
open: make(chan struct{}, o.MaxOpenConns),
exit: make(chan struct{}),
}
@@ -93,9 +93,32 @@ func Open(opt *Options) (driver.Conn, error) {
return conn, nil
}
// nativeTransport represents an implementation (TCP or HTTP) that can be pooled by the main clickhouse struct.
// Implementations are not expected to be thread safe, which is why we provide acquire/release functions.
type nativeTransport interface {
serverVersion() (*ServerVersion, error)
query(ctx context.Context, release nativeTransportRelease, query string, args ...any) (*rows, error)
queryRow(ctx context.Context, release nativeTransportRelease, query string, args ...any) *row
prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error)
exec(ctx context.Context, query string, args ...any) error
asyncInsert(ctx context.Context, query string, wait bool, args ...any) error
ping(context.Context) error
isBad() bool
connID() int
connectedAtTime() time.Time
isReleased() bool
setReleased(released bool)
debugf(format string, v ...any)
// freeBuffer is called if Options.FreeBufOnConnRelease is set
freeBuffer()
close() error
}
type nativeTransportAcquire func(context.Context) (nativeTransport, error)
type nativeTransportRelease func(nativeTransport, error)
type clickhouse struct {
opt *Options
idle chan *connect
idle chan nativeTransport
open chan struct{}
exit chan struct{}
connID int64
@@ -118,8 +141,8 @@ func (ch *clickhouse) ServerVersion() (*driver.ServerVersion, error) {
if err != nil {
return nil, err
}
ch.release(conn, nil)
return &conn.server, nil
defer ch.release(conn, nil)
return conn.serverVersion()
}
func (ch *clickhouse) Query(ctx context.Context, query string, args ...any) (rows driver.Rows, err error) {
@@ -127,18 +150,19 @@ func (ch *clickhouse) Query(ctx context.Context, query string, args ...any) (row
if err != nil {
return nil, err
}
conn.debugf("[acquired] connection [%d]", conn.id)
conn.debugf("[query] \"%s\"", query)
return conn.query(ctx, ch.release, query, args...)
}
func (ch *clickhouse) QueryRow(ctx context.Context, query string, args ...any) (rows driver.Row) {
func (ch *clickhouse) QueryRow(ctx context.Context, query string, args ...any) driver.Row {
conn, err := ch.acquire(ctx)
if err != nil {
return &row{
err: err,
}
}
conn.debugf("[acquired] connection [%d]", conn.id)
conn.debugf("[query row] \"%s\"", query)
return conn.queryRow(ctx, ch.release, query, args...)
}
@@ -147,6 +171,7 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error
if err != nil {
return err
}
conn.debugf("[exec] \"%s\"", query)
if err := conn.exec(ctx, query, args...); err != nil {
ch.release(conn, err)
return err
@@ -160,7 +185,8 @@ func (ch *clickhouse) PrepareBatch(ctx context.Context, query string, opts ...dr
if err != nil {
return nil, err
}
batch, err := conn.prepareBatch(ctx, query, getPrepareBatchOptions(opts...), ch.release, ch.acquire)
conn.debugf("[prepare batch] \"%s\"", query)
batch, err := conn.prepareBatch(ctx, ch.release, ch.acquire, query, getPrepareBatchOptions(opts...))
if err != nil {
return nil, err
}
@@ -182,6 +208,7 @@ func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool,
if err != nil {
return err
}
conn.debugf("[async insert] \"%s\"", query)
if err := conn.asyncInsert(ctx, query, wait, args...); err != nil {
ch.release(conn, err)
return err
@@ -195,6 +222,7 @@ func (ch *clickhouse) Ping(ctx context.Context) (err error) {
if err != nil {
return err
}
conn.debugf("[ping]")
if err := conn.ping(ctx); err != nil {
ch.release(conn, err)
return err
@@ -212,11 +240,18 @@ func (ch *clickhouse) Stats() driver.Stats {
}
}
func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
func (ch *clickhouse) dial(ctx context.Context) (conn nativeTransport, err error) {
connID := int(atomic.AddInt64(&ch.connID, 1))
dialFunc := func(ctx context.Context, addr string, opt *Options) (DialResult, error) {
conn, err := dial(ctx, addr, connID, opt)
var conn nativeTransport
var err error
switch opt.Protocol {
case HTTP:
conn, err = dialHttp(ctx, addr, connID, opt)
default:
conn, err = dial(ctx, addr, connID, opt)
}
return DialResult{conn}, err
}
@@ -258,7 +293,7 @@ func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dia
return r, err
}
func (ch *clickhouse) acquire(ctx context.Context) (conn *connect, err error) {
func (ch *clickhouse) acquire(ctx context.Context) (conn nativeTransport, err error) {
timer := time.NewTimer(ch.opt.DialTimeout)
defer timer.Stop()
select {
@@ -291,7 +326,8 @@ func (ch *clickhouse) acquire(ctx context.Context) (conn *connect, err error) {
return nil, err
}
}
conn.released = false
conn.setReleased(false)
conn.debugf("[acquired from pool]")
return conn, nil
default:
}
@@ -302,6 +338,7 @@ func (ch *clickhouse) acquire(ctx context.Context) (conn *connect, err error) {
}
return nil, err
}
conn.debugf("[acquired new]")
return conn, nil
}
@@ -324,7 +361,7 @@ func (ch *clickhouse) closeIdleExpired() {
for {
select {
case conn := <-ch.idle:
if conn.connectedAt.Before(cutoff) {
if conn.connectedAtTime().Before(cutoff) {
conn.close()
} else {
select {
@@ -340,26 +377,42 @@ func (ch *clickhouse) closeIdleExpired() {
}
}
func (ch *clickhouse) release(conn *connect, err error) {
if conn.released {
func (ch *clickhouse) release(conn nativeTransport, err error) {
if conn.isReleased() {
return
}
conn.released = true
conn.setReleased(true)
if err != nil {
conn.debugf("[released with error]")
} else {
conn.debugf("[released]")
}
select {
case <-ch.open:
default:
}
if err != nil || time.Since(conn.connectedAt) >= ch.opt.ConnMaxLifetime {
if err != nil {
conn.debugf("[close: error] %s", err.Error())
conn.close()
return
} else if time.Since(conn.connectedAtTime()) >= ch.opt.ConnMaxLifetime {
conn.debugf("[close: lifetime expired]")
conn.close()
return
}
if ch.opt.FreeBufOnConnRelease {
conn.buffer = new(chproto.Buffer)
conn.compressor.Data = nil
conn.debugf("[free buffer]")
conn.freeBuffer()
}
select {
case ch.idle <- conn:
default:
conn.debugf("[close: idle pool full %d/%d]", len(ch.idle), cap(ch.idle))
conn.close()
}
}
@@ -367,10 +420,16 @@ func (ch *clickhouse) release(conn *connect, err error) {
func (ch *clickhouse) Close() error {
for {
select {
case c := <-ch.idle:
c.close()
case conn := <-ch.idle:
conn.debugf("[close: closing pool]")
conn.close()
default:
ch.exit <- struct{}{}
// In rare cases, close may be called multiple times, don't block
//TODO: add proper close flag to indicate this pool is unusable after Close
select {
case ch.exit <- struct{}{}:
default:
}
return nil
}
}


@@ -77,6 +77,7 @@ var compressionMap = map[string]CompressionMethod{
type Auth struct {
Database string
Username string
Password string
}
@@ -123,7 +124,7 @@ func ParseDSN(dsn string) (*Options, error) {
type Dial func(ctx context.Context, addr string, opt *Options) (DialResult, error)
type DialResult struct {
conn *connect
conn nativeTransport
}
type HTTPProxy func(*http.Request) (*url.URL, error)
@@ -156,6 +157,11 @@ type Options struct {
// HTTPProxy specifies an HTTP proxy URL to use for requests made by the client.
HTTPProxyURL *url.URL
// GetJWT should return a JWT for authentication with ClickHouse Cloud.
// This is called per connection/request, so you may cache the token in your app if needed.
// Use this instead of Auth.Username and Auth.Password if you're using JWT auth.
GetJWT GetJWTFunc
scheme string
ReadTimeout time.Duration
}
@@ -302,6 +308,8 @@ func (o *Options) fromDSN(in string) error {
o.Auth.Username = params.Get(v)
case "password":
o.Auth.Password = params.Get(v)
case "database":
o.Auth.Database = params.Get(v)
case "client_info_product":
chunks := strings.Split(params.Get(v), ",")


@@ -34,7 +34,7 @@ import (
"syscall"
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
ldriver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
chdriver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)
var globalConnID int64
@@ -54,7 +54,10 @@ func (o *stdConnOpener) Driver() driver.Driver {
debugf = log.New(os.Stdout, "[clickhouse-std] ", 0).Printf
}
}
return &stdDriver{debugf: debugf}
return &stdDriver{
opt: o.opt,
debugf: debugf,
}
}
func (o *stdConnOpener) Connect(ctx context.Context) (_ driver.Conn, err error) {
@@ -193,14 +196,15 @@ func OpenDB(opt *Options) *sql.DB {
type stdConnect interface {
isBad() bool
close() error
query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error)
query(ctx context.Context, release nativeTransportRelease, query string, args ...any) (*rows, error)
exec(ctx context.Context, query string, args ...any) error
ping(ctx context.Context) (err error)
prepareBatch(ctx context.Context, query string, options ldriver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error)
prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, options chdriver.PrepareBatchOptions) (chdriver.Batch, error)
asyncInsert(ctx context.Context, query string, wait bool, args ...any) error
}
type stdDriver struct {
opt *Options
conn stdConnect
commit func() error
debugf func(format string, v ...any)
@@ -329,7 +333,7 @@ func (std *stdDriver) QueryContext(ctx context.Context, query string, args []dri
return nil, driver.ErrBadConn
}
r, err := std.conn.query(ctx, func(*connect, error) {}, query, rebind(args)...)
r, err := std.conn.query(ctx, func(nativeTransport, error) {}, query, rebind(args)...)
if isConnBrokenError(err) {
std.debugf("QueryContext got a fatal error, resetting connection: %v\n", err)
return nil, driver.ErrBadConn
@@ -354,7 +358,7 @@ func (std *stdDriver) PrepareContext(ctx context.Context, query string) (driver.
return nil, driver.ErrBadConn
}
batch, err := std.conn.prepareBatch(ctx, query, ldriver.PrepareBatchOptions{}, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil })
batch, err := std.conn.prepareBatch(ctx, func(nativeTransport, error) {}, func(context.Context) (nativeTransport, error) { return nil, nil }, query, chdriver.PrepareBatchOptions{})
if err != nil {
if isConnBrokenError(err) {
std.debugf("PrepareContext got a fatal error, resetting connection: %v\n", err)
@@ -383,7 +387,7 @@ func (std *stdDriver) Close() error {
}
type stdBatch struct {
batch ldriver.Batch
batch chdriver.Batch
debugf func(format string, v ...any)
}


@@ -29,8 +29,8 @@ const ClientName = "clickhouse-go"
const (
ClientVersionMajor = 2
ClientVersionMinor = 34
ClientVersionPatch = 0
ClientVersionMinor = 37
ClientVersionPatch = 1
ClientTCPProtocolVersion = proto.DBMS_TCP_PROTOCOL_VERSION
)


@@ -64,12 +64,12 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
if opt.Debugf != nil {
debugf = func(format string, v ...any) {
opt.Debugf(
"[clickhouse][conn=%d][%s] "+format,
append([]interface{}{num, conn.RemoteAddr()}, v...)...,
"[clickhouse][%s][id=%d] "+format,
append([]interface{}{conn.RemoteAddr(), num}, v...)...,
)
}
} else {
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse][conn=%d][%s]", num, conn.RemoteAddr()), 0).Printf
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse][%s][id=%d]", conn.RemoteAddr(), num), 0).Printf
}
}
@@ -96,7 +96,7 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
id: num,
opt: opt,
conn: conn,
debugf: debugf,
debugfFunc: debugf,
buffer: new(chproto.Buffer),
reader: chproto.NewReader(conn),
revision: ClientTCPProtocolVersion,
@@ -110,7 +110,18 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
}
)
if err := connect.handshake(opt.Auth.Database, opt.Auth.Username, opt.Auth.Password); err != nil {
auth := opt.Auth
if useJWTAuth(opt) {
jwt, err := opt.GetJWT(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get JWT: %w", err)
}
auth.Username = jwtAuthMarker
auth.Password = jwt
}
if err := connect.handshake(auth); err != nil {
return nil, err
}
@@ -133,7 +144,7 @@ type connect struct {
id int
opt *Options
conn net.Conn
debugf func(format string, v ...any)
debugfFunc func(format string, v ...any)
server ServerVersion
closed bool
buffer *chproto.Buffer
@@ -151,6 +162,22 @@ type connect struct {
closeMutex sync.Mutex
}
func (c *connect) debugf(format string, v ...any) {
c.debugfFunc(format, v...)
}
func (c *connect) connID() int {
return c.id
}
func (c *connect) connectedAtTime() time.Time {
return c.connectedAt
}
func (c *connect) serverVersion() (*ServerVersion, error) {
return &c.server, nil
}
func (c *connect) settings(querySettings Settings) []proto.Setting {
settingToProtoSetting := func(k string, v any) proto.Setting {
isCustom := false
@@ -195,6 +222,14 @@ func (c *connect) isBad() bool {
return false
}
func (c *connect) isReleased() bool {
return c.released
}
func (c *connect) setReleased(released bool) {
c.released = released
}
func (c *connect) isClosed() bool {
c.closeMutex.Lock()
defer c.closeMutex.Unlock()
@@ -361,6 +396,11 @@ func (c *connect) readData(ctx context.Context, packet byte, compressible bool)
return &block, nil
}
func (c *connect) freeBuffer() {
c.buffer = new(chproto.Buffer)
c.compressor.Data = nil
}
func (c *connect) flush() error {
if len(c.buffer.Buf) == 0 {
// Nothing to flush.


@@ -35,7 +35,7 @@ import (
var insertMatch = regexp.MustCompile(`(?i)(INSERT\s+INTO\s+[^( ]+(?:\s*\([^()]*(?:\([^()]*\)[^()]*)*\))?)(?:\s*VALUES)?`)
var columnMatch = regexp.MustCompile(`INSERT INTO .+\s\((?P<Columns>.+)\)$`)
func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
func (c *connect) prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error) {
query, _, queryColumns, verr := extractNormalizedInsertQueryAndColumns(query)
if verr != nil {
return nil, verr
@@ -60,17 +60,29 @@ func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.Pr
}
// resort batch to specified columns
if err = block.SortColumns(queryColumns); err != nil {
release(c, err)
return nil, err
}
connRelease := func(conn *connect, err error) {
release(conn, err)
}
connAcquire := func(ctx context.Context) (*connect, error) {
conn, err := acquire(ctx)
if err != nil {
return nil, err
}
return conn.(*connect), nil
}
b := &batch{
ctx: ctx,
query: query,
conn: c,
block: block,
released: false,
connRelease: release,
connAcquire: acquire,
connRelease: connRelease,
connAcquire: connAcquire,
onProcess: onProcess,
closeOnFlush: opts.CloseOnFlush,
}
@@ -320,6 +332,26 @@ func (b *batch) closeQuery() error {
return nil
}
// Close will end the current INSERT without sending the currently buffered rows, and release the connection.
// This may result in zero row inserts if no rows were appended.
// If a batch was already sent this does nothing.
// This should be called via defer after a batch is opened to prevent
// batches from falling out of scope and timing out.
func (b *batch) Close() error {
if b.sent || b.released {
return nil
}
if err := b.closeQuery(); err != nil {
return err
}
b.sent = true
b.release(nil)
return nil
}
type batchColumn struct {
err error
batch driver.Batch


@@ -25,7 +25,7 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
)
func (c *connect) handshake(database, username, password string) error {
func (c *connect) handshake(auth Auth) error {
defer c.buffer.Reset()
c.debugf("[handshake] -> %s", proto.ClientHandshake{})
// set a read deadline - alternative to context.Read operation will fail if no data is received after deadline.
@@ -43,9 +43,9 @@ func (c *connect) handshake(database, username, password string) error {
}
handshake.Encode(c.buffer)
{
c.buffer.PutString(database)
c.buffer.PutString(username)
c.buffer.PutString(password)
c.buffer.PutString(auth.Database)
c.buffer.PutString(auth.Username)
c.buffer.PutString(auth.Password)
}
if err := c.flush(); err != nil {
return err


@@ -23,7 +23,7 @@ import (
"compress/gzip"
"compress/zlib"
"context"
"database/sql/driver"
sqldriver "database/sql/driver"
"errors"
"fmt"
"io"
@@ -37,8 +37,6 @@ import (
"sync"
"time"
"github.com/ClickHouse/clickhouse-go/v2/resources"
"github.com/ClickHouse/ch-go/compress"
chproto "github.com/ClickHouse/ch-go/proto"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
@@ -119,18 +117,59 @@ func (rw *HTTPReaderWriter) reset(pw *io.PipeWriter) io.WriteCloser {
}
}
// applyOptionsToRequest applies the client Options (such as auth, headers, client info) to the given http.Request
func applyOptionsToRequest(ctx context.Context, req *http.Request, opt *Options) error {
jwt := queryOptionsJWT(ctx)
useJWT := jwt != "" || useJWTAuth(opt)
if opt.TLS != nil && useJWT {
if jwt == "" {
var err error
jwt, err = opt.GetJWT(ctx)
if err != nil {
return fmt.Errorf("failed to get JWT: %w", err)
}
}
req.Header.Set("Authorization", "Bearer "+jwt)
} else if opt.TLS != nil && len(opt.Auth.Username) > 0 {
req.Header.Set("X-ClickHouse-User", opt.Auth.Username)
if len(opt.Auth.Password) > 0 {
req.Header.Set("X-ClickHouse-Key", opt.Auth.Password)
req.Header.Set("X-ClickHouse-SSL-Certificate-Auth", "off")
} else {
req.Header.Set("X-ClickHouse-SSL-Certificate-Auth", "on")
}
} else if opt.TLS == nil && len(opt.Auth.Username) > 0 {
if len(opt.Auth.Password) > 0 {
req.URL.User = url.UserPassword(opt.Auth.Username, opt.Auth.Password)
} else {
req.URL.User = url.User(opt.Auth.Username)
}
}
req.Header.Set("User-Agent", opt.ClientInfo.String())
for k, v := range opt.HttpHeaders {
req.Header.Set(k, v)
}
return nil
}
func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpConnect, error) {
var debugf = func(format string, v ...any) {}
debugf := func(format string, v ...any) {}
if opt.Debug {
if opt.Debugf != nil {
debugf = func(format string, v ...any) {
opt.Debugf(
"[clickhouse][conn=%d][%s] "+format,
append([]interface{}{num, addr}, v...)...,
"[clickhouse-http][%s][id=%d] "+format,
append([]interface{}{addr, num}, v...)...,
)
}
} else {
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse][conn=%d][%s]", num, addr), 0).Printf
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse-http][%s][id=%d]", addr, num), 0).Printf
}
}
@@ -151,29 +190,6 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
Path: opt.HttpUrlPath,
}
headers := make(map[string]string)
for k, v := range opt.HttpHeaders {
headers[k] = v
}
if opt.TLS == nil && len(opt.Auth.Username) > 0 {
if len(opt.Auth.Password) > 0 {
u.User = url.UserPassword(opt.Auth.Username, opt.Auth.Password)
} else {
u.User = url.User(opt.Auth.Username)
}
} else if opt.TLS != nil && len(opt.Auth.Username) > 0 {
headers["X-ClickHouse-User"] = opt.Auth.Username
if len(opt.Auth.Password) > 0 {
headers["X-ClickHouse-Key"] = opt.Auth.Password
headers["X-ClickHouse-SSL-Certificate-Auth"] = "off"
} else {
headers["X-ClickHouse-SSL-Certificate-Auth"] = "on"
}
}
headers["User-Agent"] = opt.ClientInfo.String()
query := u.Query()
if len(opt.Auth.Database) > 0 {
query.Set("database", opt.Auth.Database)
@@ -199,6 +215,8 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
}
query.Set("default_format", "Native")
// TODO: we support newer revisions but for some reason this completely breaks Native format
//query.Set("client_protocol_version", strconv.Itoa(ClientTCPProtocolVersion))
u.RawQuery = query.Encode()
httpProxy := http.ProxyFromEnvironment
@@ -224,101 +242,118 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
}
}
conn := &httpConnect{
conn := httpConnect{
id: num,
connectedAt: time.Now(),
released: false,
debugfFunc: debugf,
opt: opt,
client: &http.Client{
Transport: t,
},
url: u,
url: u,
// TODO: learn more about why revision is broken
//revision: ClientTCPProtocolVersion,
buffer: new(chproto.Buffer),
compression: opt.Compression.Method,
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
compressionPool: compressionPool,
blockBufferSize: opt.BlockBufferSize,
headers: headers,
}
location, err := conn.readTimeZone(ctx)
if err != nil {
return nil, err
}
if num == 1 {
version, err := conn.readVersion(ctx)
if err != nil {
return nil, err
}
if !resources.ClientMeta.IsSupportedClickHouseVersion(version) {
debugf("WARNING: version %v of ClickHouse is not supported by this client\n", version)
}
}
return &httpConnect{
client: &http.Client{
Transport: t,
},
url: u,
buffer: new(chproto.Buffer),
compression: opt.Compression.Method,
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
compressionPool: compressionPool,
location: location,
blockBufferSize: opt.BlockBufferSize,
headers: headers,
}, nil
handshake, err := conn.queryHello(ctx, func(nativeTransport, error) {})
if err != nil {
return nil, fmt.Errorf("failed to query server hello: %w", err)
}
conn.handshake = handshake
return &conn, nil
}
type httpConnect struct {
id int
connectedAt time.Time
released bool
debugfFunc func(format string, v ...any)
opt *Options
revision uint64
url *url.URL
client *http.Client
location *time.Location
buffer *chproto.Buffer
compression CompressionMethod
blockCompressor *compress.Writer
compressionPool Pool[HTTPReaderWriter]
blockBufferSize uint8
headers map[string]string
handshake proto.ServerHandshake
}
func (h *httpConnect) serverVersion() (*ServerVersion, error) {
return &h.handshake, nil
}
func (h *httpConnect) connID() int {
return h.id
}
func (h *httpConnect) connectedAtTime() time.Time {
return h.connectedAt
}
func (h *httpConnect) isReleased() bool {
return h.released
}
func (h *httpConnect) setReleased(released bool) {
h.released = released
}
func (h *httpConnect) debugf(format string, v ...any) {
h.debugfFunc(format, v...)
}
func (h *httpConnect) freeBuffer() {
}
func (h *httpConnect) isBad() bool {
return h.client == nil
}
func (h *httpConnect) readTimeZone(ctx context.Context) (*time.Location, error) {
rows, err := h.query(Context(ctx, ignoreExternalTables()), func(*connect, error) {}, "SELECT timezone()")
func (h *httpConnect) queryHello(ctx context.Context, release nativeTransportRelease) (proto.ServerHandshake, error) {
h.debugf("[query hello]")
ctx = Context(ctx, ignoreExternalTables())
query := "SELECT displayName(), version(), revision(), timezone()"
rows, err := h.query(ctx, release, query)
if err != nil {
return nil, err
return proto.ServerHandshake{}, fmt.Errorf("failed to query server hello info: %w", err)
}
defer rows.Close()
if !rows.Next() {
return nil, errors.New("unable to determine server timezone")
return proto.ServerHandshake{}, errors.New("no rows returned for server hello query")
}
var serverLocation string
if err := rows.Scan(&serverLocation); err != nil {
return nil, err
var (
displayName string
versionStr string
revision uint32
timezone string
)
if err := rows.Scan(&displayName, &versionStr, &revision, &timezone); err != nil {
return proto.ServerHandshake{}, err
}
location, err := time.LoadLocation(serverLocation)
location, err := time.LoadLocation(timezone)
if err != nil {
return nil, err
}
return location, nil
}
func (h *httpConnect) readVersion(ctx context.Context) (proto.Version, error) {
rows, err := h.query(Context(ctx, ignoreExternalTables()), func(*connect, error) {}, "SELECT version()")
if err != nil {
return proto.Version{}, err
return proto.ServerHandshake{}, fmt.Errorf("failed to load timezone from server hello query: %w", err)
}
if !rows.Next() {
return proto.Version{}, errors.New("unable to determine version")
}
var v string
if err := rows.Scan(&v); err != nil {
return proto.Version{}, err
}
version := proto.ParseVersion(v)
return version, nil
return proto.ServerHandshake{
Name: displayName,
DisplayName: displayName,
Revision: uint64(revision),
Version: proto.ParseVersion(versionStr),
Timezone: location,
}, nil
}
func createCompressionPool(compression *Compression) (Pool[HTTPReaderWriter], error) {
@@ -372,8 +407,8 @@ func createCompressionPool(compression *Compression) (Pool[HTTPReaderWriter], er
func (h *httpConnect) writeData(block *proto.Block) error {
// Saving offset of compressible data
start := len(h.buffer.Buf)
if err := block.Encode(h.buffer, 0); err != nil {
return err
if err := block.Encode(h.buffer, h.revision); err != nil {
return fmt.Errorf("block encode: %w", err)
}
if h.compression == CompressionLZ4 || h.compression == CompressionZSTD {
// Performing compression. Supported and requires
@@ -387,7 +422,7 @@ func (h *httpConnect) writeData(block *proto.Block) error {
}
func (h *httpConnect) readData(reader *chproto.Reader, timezone *time.Location) (*proto.Block, error) {
location := h.location
location := h.handshake.Timezone
if timezone != nil {
location = timezone
}
@@ -397,8 +432,8 @@ func (h *httpConnect) readData(reader *chproto.Reader, timezone *time.Location)
reader.EnableCompression()
defer reader.DisableCompression()
}
if err := block.Decode(reader, 0); err != nil {
return nil, err
if err := block.Decode(reader, h.revision); err != nil {
return nil, fmt.Errorf("block decode: %w", err)
}
return &block, nil
}
@@ -456,9 +491,16 @@ func (h *httpConnect) createRequest(ctx context.Context, requestUrl string, read
if err != nil {
return nil, err
}
err = applyOptionsToRequest(ctx, req, h.opt)
if err != nil {
return nil, err
}
for k, v := range headers {
req.Header.Add(k, v)
}
var query url.Values
if options != nil {
query = req.URL.Query()
@@ -509,7 +551,7 @@ func (h *httpConnect) createRequestWithExternalTables(ctx context.Context, query
return nil, err
}
buf.Reset()
err = table.Block().Encode(buf, 0)
err = table.Block().Encode(buf, h.revision)
if err != nil {
return nil, err
}
@@ -533,7 +575,7 @@ func (h *httpConnect) createRequestWithExternalTables(ctx context.Context, query
func (h *httpConnect) executeRequest(req *http.Request) (*http.Response, error) {
if h.client == nil {
return nil, driver.ErrBadConn
return nil, sqldriver.ErrBadConn
}
resp, err := h.client.Do(req)
if err != nil {
@@ -541,21 +583,25 @@ func (h *httpConnect) executeRequest(req *http.Request) (*http.Response, error)
}
if resp.StatusCode != http.StatusOK {
defer resp.Body.Close()
msg, err := h.readRawResponse(resp)
defer discardAndClose(resp.Body)
msgBytes, err := h.readRawResponse(resp)
if err != nil {
return nil, fmt.Errorf("clickhouse [execute]:: %d code: failed to read the response: %w", resp.StatusCode, err)
return nil, fmt.Errorf("[HTTP %d] failed to read response: %w", resp.StatusCode, err)
}
return nil, fmt.Errorf("clickhouse [execute]:: %d code: %s", resp.StatusCode, string(msg))
return nil, fmt.Errorf("[HTTP %d] response body: \"%s\"", resp.StatusCode, string(msgBytes))
}
return resp, nil
}
func (h *httpConnect) ping(ctx context.Context) error {
rows, err := h.query(Context(ctx, ignoreExternalTables()), nil, "SELECT 1")
ctx = Context(ctx, ignoreExternalTables())
// release func is called by connection pool
rows, err := h.query(ctx, func(nativeTransport, error) {}, "SELECT 1")
if err != nil {
return err
}
defer rows.Close()
column := rows.Columns()
// check that we got column 1
if len(column) == 1 && column[0] == "1" {
@@ -572,3 +618,10 @@ func (h *httpConnect) close() error {
h.client = nil
return nil
}
// discardAndClose discards remaining data and closes the reader.
// Intended for freeing HTTP connections for re-use.
func discardAndClose(rc io.ReadCloser) {
_, _ = io.Copy(io.Discard, rc)
_ = rc.Close()
}


@@ -19,7 +19,6 @@ package clickhouse
import (
"context"
"io"
)
func (h *httpConnect) asyncInsert(ctx context.Context, query string, wait bool, args ...any) error {
@@ -32,18 +31,17 @@ func (h *httpConnect) asyncInsert(ctx context.Context, query string, wait bool,
}
if len(args) > 0 {
var err error
query, err = bindQueryOrAppendParameters(true, &options, query, h.location, args...)
query, err = bindQueryOrAppendParameters(true, &options, query, h.handshake.Timezone, args...)
if err != nil {
return err
}
}
res, err := h.sendQuery(ctx, query, &options, h.headers)
if res != nil {
defer res.Body.Close()
// we don't care about result, so just discard it to reuse connection
_, _ = io.Copy(io.Discard, res.Body)
res, err := h.sendQuery(ctx, query, &options, nil)
if err != nil {
return err
}
defer discardAndClose(res.Body)
return err
return nil
}


@@ -20,98 +20,161 @@ package clickhouse
import (
"context"
"fmt"
"io"
"slices"
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
"io"
"os"
"slices"
)
// release is ignored, because http used by std with empty release function.
// Also opts ignored because all options unused in http batch.
func (h *httpConnect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
query, tableName, queryColumns, err := extractNormalizedInsertQueryAndColumns(query)
if err != nil {
return nil, err
}
func fetchColumnNamesAndTypesForInsert(h *httpConnect, release nativeTransportRelease, ctx context.Context, tableName string, requestedColumnNames []string) ([]ColumnNameAndType, error) {
describeTableQuery := fmt.Sprintf("DESCRIBE TABLE %s", tableName)
r, err := h.query(ctx, release, describeTableQuery)
if err != nil {
return nil, err
}
defer r.Close()
block := &proto.Block{}
columns := make(map[string]string)
var colNames []string
columnsToTypes := make(map[string]string)
var allColumns []string
for r.Next() {
var (
colName string
colType string
default_type string
ignore string
colName string
colType string
defaultType string
ignore string
)
if err = r.Scan(&colName, &colType, &default_type, &ignore, &ignore, &ignore, &ignore); err != nil {
if err = r.Scan(&colName, &colType, &defaultType, &ignore, &ignore, &ignore, &ignore); err != nil {
return nil, err
}
// these column types cannot be specified in INSERT queries
if default_type == "MATERIALIZED" || default_type == "ALIAS" {
if defaultType == "MATERIALIZED" || defaultType == "ALIAS" {
continue
}
colNames = append(colNames, colName)
columns[colName] = colType
columnsToTypes[colName] = colType
allColumns = append(allColumns, colName)
}
switch len(queryColumns) {
case 0:
for _, colName := range colNames {
if err = block.AddColumn(colName, column.Type(columns[colName])); err != nil {
return nil, err
}
}
default:
// user has requested specific columns so only include these
for _, colName := range queryColumns {
if colType, ok := columns[colName]; ok {
if err = block.AddColumn(colName, column.Type(colType)); err != nil {
return nil, err
}
} else {
// The order of the columns must match the INSERT list, or the DESC table if no insert list was provided
insertColumns := make([]ColumnNameAndType, 0, len(allColumns))
if len(requestedColumnNames) > 0 {
// Validate requested columns present
for _, colName := range requestedColumnNames {
colType, ok := columnsToTypes[colName]
if !ok {
return nil, fmt.Errorf("column %s is not present in the table %s", colName, tableName)
}
insertColumns = append(insertColumns, ColumnNameAndType{
Name: colName,
Type: colType,
})
}
} else {
// Use all columns
for _, colName := range allColumns {
colType := columnsToTypes[colName]
insertColumns = append(insertColumns, ColumnNameAndType{
Name: colName,
Type: colType,
})
}
}
return insertColumns, nil
}
func newBlock(h *httpConnect, release nativeTransportRelease, ctx context.Context, query string) (string, *proto.Block, error) {
normalizedQuery, tableName, requestedColumnNames, err := extractNormalizedInsertQueryAndColumns(query)
if err != nil {
return "", nil, err
}
opt := queryOptions(ctx)
columns := opt.columnNamesAndTypes
// If the user didn't supply known column names/types, do expensive DESC TABLE logic
if opt.columnNamesAndTypes == nil {
fetchedColumns, err := fetchColumnNamesAndTypesForInsert(h, release, ctx, tableName, requestedColumnNames)
if err != nil {
return "", nil, fmt.Errorf("failed to determine columns for HTTP insert: %w", err)
}
columns = fetchedColumns
}
var block proto.Block
for _, col := range columns {
if err := block.AddColumn(col.Name, column.Type(col.Type)); err != nil {
return "", nil, err
}
}
return normalizedQuery, &block, nil
}
func (h *httpConnect) prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error) {
// release is not used within newBlock since the connection is held for the batch.
query, block, err := newBlock(h, func(nativeTransport, error) {}, ctx, query)
if err != nil {
err = fmt.Errorf("failed to init block for HTTP batch: %w", err)
release(h, err)
return nil, err
}
return &httpBatch{
ctx: ctx,
conn: h,
structMap: &structMap{},
block: block,
query: query,
ctx: ctx,
conn: h,
connRelease: release,
structMap: &structMap{},
block: block,
query: query,
}, nil
}
type httpBatch struct {
query string
err error
ctx context.Context
conn *httpConnect
structMap *structMap
sent bool
block *proto.Block
query string
err error
ctx context.Context
conn *httpConnect
released bool
connRelease nativeTransportRelease
structMap *structMap
sent bool
block *proto.Block
}
func (b *httpBatch) release(err error) {
if !b.released {
b.released = true
b.connRelease(b.conn, err)
}
}
// Flush TODO: noop on http currently - requires streaming to be implemented
func (b *httpBatch) Flush() error {
// Flush and Send are effectively the same for HTTP, but users should just use Send until we
// figure out a way to do proper streaming.
return nil
}
func (b *httpBatch) Close() error {
if b.sent || b.released {
return nil
}
b.sent = true
b.release(nil)
return nil
}
func (b *httpBatch) Abort() error {
defer func() {
b.sent = true
b.release(os.ErrProcessDone)
}()
if b.sent {
return ErrBatchAlreadySent
@@ -123,13 +186,23 @@ func (b *httpBatch) Append(v ...any) error {
if b.sent {
return ErrBatchAlreadySent
}
if b.err != nil {
return b.err
}
if err := b.block.Append(v...); err != nil {
b.err = fmt.Errorf("%w: %w", ErrBatchInvalid, err)
b.release(err)
return err
}
return nil
}
func (b *httpBatch) AppendStruct(v any) error {
if b.err != nil {
return b.err
}
values, err := b.structMap.Map("AppendStruct", b.block.ColumnsNames(), v, false)
if err != nil {
return err
@@ -162,6 +235,7 @@ func (b *httpBatch) IsSent() bool {
func (b *httpBatch) Send() (err error) {
defer func() {
b.sent = true
b.release(err)
}()
if b.sent {
return ErrBatchAlreadySent
@@ -169,16 +243,12 @@ func (b *httpBatch) Send() (err error) {
if b.err != nil {
return b.err
}
if b.block.Rows() == 0 {
return nil
}
options := queryOptions(b.ctx)
headers := make(map[string]string)
r, pw := io.Pipe()
crw := b.conn.compressionPool.Get()
w := crw.reset(pw)
defer b.conn.compressionPool.Put(crw)
switch b.conn.compression {
case CompressionGZIP, CompressionDeflate, CompressionBrotli:
headers["Content-Encoding"] = b.conn.compression.String()
@@ -187,38 +257,38 @@ func (b *httpBatch) Send() (err error) {
options.settings["compress"] = "1"
}
compressionWriter := b.conn.compressionPool.Get()
defer b.conn.compressionPool.Put(compressionWriter)
pipeReader, pipeWriter := io.Pipe()
connWriter := compressionWriter.reset(pipeWriter)
go func() {
var err error = nil
defer pw.CloseWithError(err)
defer w.Close()
var err error
defer pipeWriter.CloseWithError(err)
defer connWriter.Close()
b.conn.buffer.Reset()
if b.block.Rows() != 0 {
if err = b.conn.writeData(b.block); err != nil {
return
}
}
if err = b.conn.writeData(&proto.Block{}); err != nil {
if err = b.conn.writeData(b.block); err != nil {
return
}
if _, err = w.Write(b.conn.buffer.Buf); err != nil {
if _, err = connWriter.Write(b.conn.buffer.Buf); err != nil {
return
}
}()
options.settings["query"] = b.query
headers["Content-Type"] = "application/octet-stream"
for k, v := range b.conn.headers {
headers[k] = v
}
res, err := b.conn.sendStreamQuery(b.ctx, r, &options, headers)
if res != nil {
defer res.Body.Close()
// we don't care about result, so just discard it to reuse connection
_, _ = io.Copy(io.Discard, res.Body)
b.conn.debugf("[batch send start] columns=%d rows=%d", len(b.block.Columns), b.block.Rows())
res, err := b.conn.sendStreamQuery(b.ctx, pipeReader, &options, headers)
if err != nil {
return fmt.Errorf("batch sendStreamQuery: %w", err)
}
discardAndClose(res.Body)
return err
b.conn.debugf("[batch send complete]")
b.block.Reset()
return nil
}
func (b *httpBatch) Rows() int {


@@ -19,22 +19,20 @@ package clickhouse
import (
"context"
"io"
)
func (h *httpConnect) exec(ctx context.Context, query string, args ...any) error {
options := queryOptions(ctx)
query, err := bindQueryOrAppendParameters(true, &options, query, h.location, args...)
query, err := bindQueryOrAppendParameters(true, &options, query, h.handshake.Timezone, args...)
if err != nil {
return err
}
res, err := h.sendQuery(ctx, query, &options, h.headers)
if res != nil {
defer res.Body.Close()
// we don't care about result, so just discard it to reuse connection
_, _ = io.Copy(io.Discard, res.Body)
res, err := h.sendQuery(ctx, query, &options, nil)
if err != nil {
return err
}
defer discardAndClose(res.Body)
return err
return nil
}


@@ -20,6 +20,7 @@ package clickhouse
import (
"context"
"errors"
"fmt"
"io"
chproto "github.com/ClickHouse/ch-go/proto"
@@ -27,10 +28,13 @@ import (
)
// release is ignored, because http used by std with empty release function
func (h *httpConnect) query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error) {
func (h *httpConnect) query(ctx context.Context, release nativeTransportRelease, query string, args ...any) (*rows, error) {
h.debugf("[http query] \"%s\"", query)
options := queryOptions(ctx)
query, err := bindQueryOrAppendParameters(true, &options, query, h.location, args...)
query, err := bindQueryOrAppendParameters(true, &options, query, h.handshake.Timezone, args...)
if err != nil {
err = fmt.Errorf("bindQueryOrAppendParameters: %w", err)
release(h, err)
return nil, err
}
headers := make(map[string]string)
@@ -42,17 +46,17 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
headers["Accept-Encoding"] = h.compression.String()
}
for k, v := range h.headers {
headers[k] = v
}
res, err := h.sendQuery(ctx, query, &options, headers)
if err != nil {
err = fmt.Errorf("sendQuery: %w", err)
release(h, err)
return nil, err
}
if res.ContentLength == 0 {
discardAndClose(res.Body)
block := &proto.Block{}
release(h, nil)
return &rows{
block: block,
columns: block.ColumnsNames(),
@@ -68,21 +72,25 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
// automatically as they might not have permissions.
reader, err := rw.NewReader(res)
if err != nil {
res.Body.Close()
err = fmt.Errorf("NewReader: %w", err)
discardAndClose(res.Body)
h.compressionPool.Put(rw)
release(h, err)
return nil, err
}
chReader := chproto.NewReader(reader)
block, err := h.readData(chReader, options.userLocation)
if err != nil && !errors.Is(err, io.EOF) {
res.Body.Close()
err = fmt.Errorf("readData: %w", err)
discardAndClose(res.Body)
h.compressionPool.Put(rw)
release(h, err)
return nil, err
}
bufferSize := h.blockBufferSize
if options.blockBufferSize > 0 {
// allow block buffer sze to be overridden per query
// allow block buffer size to be overridden per query
bufferSize = options.blockBufferSize
}
var (
@@ -95,7 +103,7 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
if err != nil {
// ch-go wraps EOF errors
if !errors.Is(err, io.EOF) {
errCh <- err
errCh <- fmt.Errorf("readData stream: %w", err)
}
break
}
@@ -106,15 +114,17 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
case stream <- block:
}
}
res.Body.Close()
discardAndClose(res.Body)
h.compressionPool.Put(rw)
close(stream)
close(errCh)
release(h, nil)
}()
if block == nil {
block = &proto.Block{}
}
return &rows{
block: block,
stream: stream,
@@ -123,3 +133,16 @@ func (h *httpConnect) query(ctx context.Context, release func(*connect, error),
structMap: &structMap{},
}, nil
}
func (h *httpConnect) queryRow(ctx context.Context, release nativeTransportRelease, query string, args ...any) *row {
rows, err := h.query(ctx, release, query, args...)
if err != nil {
return &row{
err: err,
}
}
return &row{
rows: rows,
}
}


@@ -24,7 +24,7 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
)
func (c *connect) query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error) {
func (c *connect) query(ctx context.Context, release nativeTransportRelease, query string, args ...any) (*rows, error) {
var (
options = queryOptions(ctx)
onProcess = options.onProcess()
@@ -92,7 +92,7 @@ func (c *connect) query(ctx context.Context, release func(*connect, error), quer
}, nil
}
func (c *connect) queryRow(ctx context.Context, release func(*connect, error), query string, args ...any) *row {
func (c *connect) queryRow(ctx context.Context, release nativeTransportRelease, query string, args ...any) *row {
rows, err := c.query(ctx, release, query, args...)
if err != nil {
return &row{


@@ -20,6 +20,7 @@ package clickhouse
import (
"context"
"maps"
"slices"
"time"
"github.com/ClickHouse/clickhouse-go/v2/ext"
@@ -41,6 +42,12 @@ type CustomSetting struct {
Value string
}
// ColumnNameAndType represents a column name and type
type ColumnNameAndType struct {
Name string
Type string
}
type Parameters map[string]string
type (
QueryOption func(*QueryOptions) error
@@ -53,17 +60,19 @@ type (
async AsyncOptions
queryID string
quotaKey string
jwt string
events struct {
logs func(*Log)
progress func(*Progress)
profileInfo func(*ProfileInfo)
profileEvents func([]ProfileEvent)
}
settings Settings
parameters Parameters
external []*ext.Table
blockBufferSize uint8
userLocation *time.Location
settings Settings
parameters Parameters
external []*ext.Table
blockBufferSize uint8
userLocation *time.Location
columnNamesAndTypes []ColumnNameAndType
}
)
@@ -95,6 +104,26 @@ func WithQuotaKey(quotaKey string) QueryOption {
}
}
// WithJWT overrides the existing authentication with the given JWT.
// This only applies for clients connected with HTTPS to ClickHouse Cloud.
func WithJWT(jwt string) QueryOption {
return func(o *QueryOptions) error {
o.jwt = jwt
return nil
}
}
// WithColumnNamesAndTypes is used to provide a predetermined list of
// column names and types for HTTP inserts.
// Without this, the HTTP implementation will parse the query and run a
// DESCRIBE TABLE request to fetch and validate column names.
func WithColumnNamesAndTypes(columnNamesAndTypes []ColumnNameAndType) QueryOption {
return func(o *QueryOptions) error {
o.columnNamesAndTypes = columnNamesAndTypes
return nil
}
}
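// Illustrative usage sketch, not part of this change: the table and column
// definitions below are placeholders. Declaring them up front lets an HTTP
// batch skip the DESCRIBE TABLE round-trip described above.
//
//	ctx := clickhouse.Context(context.Background(), clickhouse.WithColumnNamesAndTypes([]clickhouse.ColumnNameAndType{
//		{Name: "id", Type: "UInt64"},
//		{Name: "body", Type: "String"},
//	}))
//	batch, err := conn.PrepareBatch(ctx, "INSERT INTO example_table (id, body)")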
func WithSettings(settings Settings) QueryOption {
return func(o *QueryOptions) error {
o.settings = settings
@@ -211,6 +240,16 @@ func queryOptions(ctx context.Context) QueryOptions {
return opt
}
// queryOptionsJWT returns the JWT within the given context's QueryOptions.
// Empty string if not present.
func queryOptionsJWT(ctx context.Context) string {
if opt, ok := ctx.Value(_contextOptionKey).(QueryOptions); ok {
return opt.jwt
}
return ""
}
// queryOptionsAsync returns the AsyncOptions struct within the given context's QueryOptions.
func queryOptionsAsync(ctx context.Context) AsyncOptions {
if opt, ok := ctx.Value(_contextOptionKey).(QueryOptions); ok {
@@ -259,26 +298,29 @@ func (q *QueryOptions) onProcess() *onProcess {
// clone returns a copy of QueryOptions where Settings and Parameters are safely mutable.
func (q *QueryOptions) clone() QueryOptions {
c := QueryOptions{
span: q.span,
async: q.async,
queryID: q.queryID,
quotaKey: q.quotaKey,
events: q.events,
settings: nil,
parameters: nil,
external: q.external,
blockBufferSize: q.blockBufferSize,
userLocation: q.userLocation,
span: q.span,
async: q.async,
queryID: q.queryID,
quotaKey: q.quotaKey,
events: q.events,
settings: nil,
parameters: nil,
external: q.external,
blockBufferSize: q.blockBufferSize,
userLocation: q.userLocation,
columnNamesAndTypes: nil,
}
if q.settings != nil {
c.settings = make(Settings, len(q.settings))
maps.Copy(c.settings, q.settings)
c.settings = maps.Clone(q.settings)
}
if q.parameters != nil {
c.parameters = make(Parameters, len(q.parameters))
maps.Copy(c.parameters, q.parameters)
c.parameters = maps.Clone(q.parameters)
}
if q.columnNamesAndTypes != nil {
c.columnNamesAndTypes = slices.Clone(q.columnNamesAndTypes)
}
return c


@@ -32,6 +32,7 @@ Damir Sayfutdinov <sayfutdinov@selectel.ru>
Dan Walters <dan@walters.io>
Daniel Bershatsky <daniel.bershatsky@skolkovotech.ru>
Danila Migalin <miga@uber.com>
Danny Schofield <dannyschofield28@gmail.com>
Danny.Dunn <danny@DannyDunndeMBP.lan>
Darío <dgrripoll@gmail.com>
Dave Josephsen <dave.josephsen@gmail.com>
@@ -56,6 +57,7 @@ Florian Lehner <flehner@optimyze.cloud>
Fredz <513317651@qq.com>
Félix Mattrat <felix@messagebird.com>
Geoff Genz <geoff@clickhouse.com>
Gianluca Mondini <mondini@open2b.com>
GitHub Action <ch-integrations-robot@clickhouse.com>
Gregory Petrosyan <gregory.petrosyan@gmail.com>
Guoqiang <wgq0319@gmail.com>
@@ -77,6 +79,7 @@ Jimmie Han <hanjinming@outlook.com>
John Troy <john@noxi.us>
Jon Aquino <jonathan.aquino@adroll.com>
Julian Maicher <jmaicher@users.noreply.github.com>
Kemal Hadimli <disq@users.noreply.github.com>
Kevin Joiner <10265309+KevinJoiner@users.noreply.github.com>
Kirill Shvakov <shvakov@gmail.com>
Kiswono Prayogo <kiswono@gmail.com>
@@ -186,6 +189,7 @@ hexchain <hexchain@users.noreply.github.com>
hongker <xiaok2013@live.com>
hulb <hulb@live.cn>
ianmcgraw <ian@arthur.ai>
ilker karapanca <ilkerkarapanca@gmail.com>
ilker moral <ilker.moral@comodo.com>
jiyongwang <jiyongwang@freewheel.tv>
kshvakov <shvakov@gmail.com>
@@ -199,6 +203,7 @@ rtkaratekid <42547811+rtkaratekid@users.noreply.github.com>
sentanos <froastj@gmail.com>
sundy-li <543950155@qq.com>
vahid sohrabloo <vahid4134@gmail.com>
vaibhav-kt <vaibhav.kt@nutanix.com>
vasily.popov <vasily.popov@arrival.com>
viktorzaharov <viktorzahar@gmail.com>
vl4deee11 <44677024+vl4deee11@users.noreply.github.com>

33
vendor/github.com/ClickHouse/clickhouse-go/v2/jwt.go generated vendored Normal file

@@ -0,0 +1,33 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package clickhouse
import (
"context"
)
// jwtAuthMarker is the marker for JSON Web Token authentication in ClickHouse Cloud.
// At the protocol level this is used in place of a username.
const jwtAuthMarker = " JWT AUTHENTICATION "
type GetJWTFunc = func(ctx context.Context) (string, error)
// useJWTAuth returns true if the client should use JWT auth
func useJWTAuth(opt *Options) bool {
return opt.GetJWT != nil
}
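// Illustrative usage sketch, not part of this file: wiring up the JWT hook from
// client code. fetchToken is a hypothetical helper and the address is a
// placeholder for a ClickHouse Cloud endpoint.
//
//	conn, err := clickhouse.Open(&clickhouse.Options{
//		Addr: []string{"example.clickhouse.cloud:9440"},
//		TLS:  &tls.Config{},
//		GetJWT: func(ctx context.Context) (string, error) {
//			return fetchToken(ctx) // called per connection/request; cache in your app if needed
//		},
//	})
//
// Individual queries over HTTPS can also supply a token via the query option:
//
//	row := conn.QueryRow(clickhouse.Context(ctx, clickhouse.WithJWT(token)), "SELECT 1")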


@@ -18,6 +18,7 @@
package column
import (
"database/sql"
"fmt"
"github.com/ClickHouse/ch-go/proto"
"reflect"
@@ -25,6 +26,8 @@ import (
"time"
)
var scanTypeAny = reflect.TypeOf((*interface{})(nil)).Elem()
type offset struct {
values UInt64
scanType reflect.Type
@@ -268,6 +271,13 @@ func (col *Array) WriteStatePrefix(buffer *proto.Buffer) error {
}
func (col *Array) ScanRow(dest any, row int) error {
if scanner, ok := dest.(sql.Scanner); ok {
value, err := col.scan(scanTypeAny, row)
if err != nil {
return err
}
return scanner.Scan(value.Interface())
}
elem := reflect.Indirect(reflect.ValueOf(dest))
value, err := col.scan(elem.Type(), row)
if err != nil {
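
Array.ScanRow now accepts destinations that implement sql.Scanner; the hunks that follow add the same support to IPv4, IPv6, and Map columns. A sketch of a custom scanner for an Array(String) column; the table, the column, and the concrete value handed to Scan are assumptions, so the scanner stays defensive:

package example

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// tagList implements sql.Scanner, so it can now be passed straight to Scan
// for an Array(String) column. Table and column names are hypothetical.
type tagList []string

var _ sql.Scanner = (*tagList)(nil)

func (t *tagList) Scan(src any) error {
	switch v := src.(type) {
	case []string:
		*t = tagList(v)
	case []any: // be defensive about the concrete element type
		for _, e := range v {
			*t = append(*t, fmt.Sprint(e))
		}
	default:
		return fmt.Errorf("tagList: unsupported source type %T", src)
	}
	return nil
}

func scanTags(ctx context.Context, conn driver.Conn) {
	var tags tagList
	if err := conn.QueryRow(ctx, "SELECT tags FROM events LIMIT 1").Scan(&tags); err != nil {
		log.Fatal(err)
	}
	log.Println(tags)
}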


@@ -95,6 +95,11 @@ func (col *DateTime) ScanRow(dest any, row int) error {
case **time.Time:
*d = new(time.Time)
**d = col.row(row)
case *int64:
*d = col.row(row).Unix()
case **int64:
*d = new(int64)
**d = col.row(row).Unix()
case *sql.NullTime:
return d.Scan(col.row(row))
default:


@@ -119,6 +119,11 @@ func (col *DateTime64) ScanRow(dest any, row int) error {
case **time.Time:
*d = new(time.Time)
**d = col.row(row)
case *int64:
*d = int64(proto.ToDateTime64(col.row(row), col.col.Precision))
case **int64:
*d = new(int64)
**d = int64(proto.ToDateTime64(col.row(row), col.col.Precision))
case *sql.NullTime:
return d.Scan(col.row(row))
default:
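
DateTime and DateTime64 columns can now be scanned into int64 (and *int64) destinations: DateTime yields Unix seconds, DateTime64 yields the raw tick count at the column's precision. A sketch with hypothetical table and column names:

package example

import (
	"context"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// scanTimes reads a DateTime and a DateTime64(3) column into int64 values.
// The events table and its columns are hypothetical.
func scanTimes(ctx context.Context, conn driver.Conn) {
	var createdAt int64 // DateTime      -> Unix seconds
	var updatedAt int64 // DateTime64(3) -> ticks at the column precision (milliseconds here)
	row := conn.QueryRow(ctx, "SELECT created_at, updated_at FROM events LIMIT 1")
	if err := row.Scan(&createdAt, &updatedAt); err != nil {
		log.Fatal(err)
	}
	log.Println(createdAt, updatedAt)
}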


@@ -18,6 +18,7 @@
package column
import (
"database/sql"
"database/sql/driver"
"encoding/binary"
"fmt"
@@ -98,6 +99,8 @@ func (col *IPv4) ScanRow(dest any, row int) error {
}
*d = new(uint32)
**d = binary.BigEndian.Uint32(ipV4[:])
case sql.Scanner:
return d.Scan(col.row(row))
default:
return &ColumnConverterError{
Op: "ScanRow",


@@ -18,6 +18,7 @@
package column
import (
"database/sql"
"database/sql/driver"
"fmt"
"github.com/ClickHouse/ch-go/proto"
@@ -91,6 +92,8 @@ func (col *IPv6) ScanRow(dest any, row int) error {
case **[16]byte:
*d = new([16]byte)
**d = col.col.Row(row)
case sql.Scanner:
return d.Scan(col.row(row))
default:
return &ColumnConverterError{
Op: "ScanRow",


@@ -18,6 +18,7 @@
package column
import (
"database/sql"
"database/sql/driver"
"fmt"
"reflect"
@@ -83,11 +84,14 @@ func (col *Map) parse(t Type, tz *time.Location) (_ Interface, err error) {
if col.values, err = Type(strings.TrimSpace(types[1])).Column(col.name, tz); err != nil {
return nil, err
}
col.scanType = reflect.MapOf(
col.keys.ScanType(),
col.values.ScanType(),
)
return col, nil
if col.keys.ScanType().Comparable() {
col.scanType = reflect.MapOf(
col.keys.ScanType(),
col.values.ScanType(),
)
return col, nil
}
}
return nil, &UnsupportedColumnTypeError{
t: t,
@@ -111,6 +115,9 @@ func (col *Map) Row(i int, ptr bool) any {
}
func (col *Map) ScanRow(dest any, i int) error {
if scanner, ok := dest.(sql.Scanner); ok {
return scanner.Scan(col.row(i).Interface())
}
value := reflect.Indirect(reflect.ValueOf(dest))
if value.Type() == col.scanType {
value.Set(col.row(i))


@@ -214,7 +214,8 @@ func getStructFieldName(field reflect.StructField) (string, bool) {
return name, true
}
if tag != "" {
return tag, false
// Some JSON tags contain omitempty after a comma but we don't want those in our field name.
return strings.Split(tag, ",")[0], false
}
// support ch tag as well as this is used elsewhere
tag = field.Tag.Get("ch")
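
With the getStructFieldName fix, a JSON tag such as json:"visits,omitempty" now contributes only the name before the comma when field names are derived; previously the full tag, options included, was used. A small illustration with hypothetical struct and tag names:

package example

// PageStats illustrates the tag handling: the derived field name for Visits is
// now "visits" rather than "visits,omitempty"; fields without a json tag still
// fall back to the ch tag, as before.
type PageStats struct {
	URL    string            `json:"url"`
	Visits uint64            `json:"visits,omitempty"`
	Labels map[string]string `ch:"labels"`
}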


@@ -87,6 +87,7 @@ type (
IsSent() bool
Rows() int
Columns() []column.Interface
Close() error
}
BatchColumn interface {
Append(any) error


@@ -27,6 +27,8 @@ import (
var (
ErrExpectedStringValueInNamedValueForQueryParameter = errors.New("expected string value in NamedValue for query parameter")
ErrInvalidValueInNamedDateValue = errors.New("invalid value in NamedDateValue for query parameter")
ErrUnsupportedQueryParameter = errors.New("unsupported query parameter type")
hasQueryParamsRe = regexp.MustCompile("{.+:.+}")
)
@@ -44,14 +46,25 @@ func bindQueryOrAppendParameters(paramsProtocolSupport bool, options *QueryOptio
hasQueryParamsRe.MatchString(query) {
options.parameters = make(Parameters, len(args))
for _, a := range args {
if p, ok := a.(driver.NamedValue); ok {
switch p := a.(type) {
case driver.NamedValue:
if str, ok := p.Value.(string); ok {
options.parameters[p.Name] = str
continue
}
}
return "", ErrExpectedStringValueInNamedValueForQueryParameter
return "", ErrExpectedStringValueInNamedValueForQueryParameter
case driver.NamedDateValue:
if !p.Value.IsZero() && p.Name != "" {
formatted := formatTimeWithScale(p.Value, TimeUnit(p.Scale))
options.parameters[p.Name] = formatted
continue
}
return "", ErrInvalidValueInNamedDateValue
default:
return "", ErrUnsupportedQueryParameter
}
}
return query, nil
@@ -59,3 +72,16 @@ func bindQueryOrAppendParameters(paramsProtocolSupport bool, options *QueryOptio
return bind(timezone, query, args...)
}
func formatTimeWithScale(t time.Time, scale TimeUnit) string {
switch scale {
case MicroSeconds:
return t.Format("2006-01-02 15:04:05.000000")
case MilliSeconds:
return t.Format("2006-01-02 15:04:05.000")
case NanoSeconds:
return t.Format("2006-01-02 15:04:05.000000000")
default:
return t.Format("2006-01-02 15:04:05")
}
}
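
With NamedDateValue support, time-valued query parameters are formatted according to their scale (seconds through nanoseconds) before being sent as server-side parameters. A sketch assuming the existing clickhouse.Named / clickhouse.DateNamed parameter helpers and a hypothetical table:

package example

import (
	"context"
	"log"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// queryByTime binds a DateTime64 server-side parameter alongside a plain
// string parameter. Table and column names are hypothetical.
func queryByTime(ctx context.Context, conn driver.Conn, since time.Time) {
	rows, err := conn.Query(ctx,
		"SELECT id FROM events WHERE created_at >= {since:DateTime64(3)} AND kind = {kind:String}",
		clickhouse.DateNamed("since", since, clickhouse.MilliSeconds),
		clickhouse.Named("kind", "click"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var id uint64
		if err := rows.Scan(&id); err != nil {
			log.Fatal(err)
		}
		log.Println(id)
	}
}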


@@ -21,12 +21,15 @@ import (
"context"
"errors"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"reflect"
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
)
func (ch *clickhouse) Select(ctx context.Context, dest any, query string, args ...any) error {
type scanSelectQueryFunc func(ctx context.Context, query string, args ...any) (driver.Rows, error)
func scanSelect(queryFunc scanSelectQueryFunc, ctx context.Context, dest any, query string, args ...any) error {
value := reflect.ValueOf(dest)
if value.Kind() != reflect.Ptr {
return &OpError{
@@ -51,7 +54,7 @@ func (ch *clickhouse) Select(ctx context.Context, dest any, query string, args .
}
var (
base = direct.Type().Elem()
rows, err = ch.Query(ctx, query, args...)
rows, err = queryFunc(ctx, query, args...)
)
if err != nil {
return err
@@ -67,9 +70,14 @@ func (ch *clickhouse) Select(ctx context.Context, dest any, query string, args .
if err := rows.Close(); err != nil {
return err
}
return rows.Err()
}
func (ch *clickhouse) Select(ctx context.Context, dest any, query string, args ...any) error {
return scanSelect(ch.Query, ctx, dest, query, args...)
}
func scan(block *proto.Block, row int, dest ...any) error {
columns := block.Columns
if len(columns) != len(dest) {
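
The Select helper is now built on a shared scanSelect function so the same slice-scanning logic can back other query paths; its public behaviour is unchanged and it still scans an entire result set into a pointer to a slice. A brief sketch with hypothetical table, columns, and ch tags:

package example

import (
	"context"
	"log"

	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

// listEvents scans a whole result set into a slice of structs via Select.
func listEvents(ctx context.Context, conn driver.Conn) {
	var events []struct {
		ID   uint64 `ch:"id"`
		Kind string `ch:"kind"`
	}
	if err := conn.Select(ctx, &events, "SELECT id, kind FROM events LIMIT 10"); err != nil {
		log.Fatal(err)
	}
	log.Printf("fetched %d events", len(events))
}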


@@ -19,7 +19,7 @@ func NewAllowKeysFilter(keys ...Key) Filter {
return func(kv KeyValue) bool { return false }
}
allowed := make(map[Key]struct{})
allowed := make(map[Key]struct{}, len(keys))
for _, k := range keys {
allowed[k] = struct{}{}
}
@@ -38,7 +38,7 @@ func NewDenyKeysFilter(keys ...Key) Filter {
return func(kv KeyValue) bool { return true }
}
forbid := make(map[Key]struct{})
forbid := make(map[Key]struct{}, len(keys))
for _, k := range keys {
forbid[k] = struct{}{}
}


@@ -5,7 +5,7 @@
Package attribute provide several helper functions for some commonly used
logic of processing attributes.
*/
package attribute // import "go.opentelemetry.io/otel/internal/attribute"
package attribute // import "go.opentelemetry.io/otel/attribute/internal"
import (
"reflect"


@@ -0,0 +1,37 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package attribute // import "go.opentelemetry.io/otel/attribute"
import (
"math"
)
func boolToRaw(b bool) uint64 { // nolint:revive // b is not a control flag.
if b {
return 1
}
return 0
}
func rawToBool(r uint64) bool {
return r != 0
}
func int64ToRaw(i int64) uint64 {
// Assumes original was a valid int64 (overflow not checked).
return uint64(i) // nolint: gosec
}
func rawToInt64(r uint64) int64 {
// Assumes original was a valid int64 (overflow not checked).
return int64(r) // nolint: gosec
}
func float64ToRaw(f float64) uint64 {
return math.Float64bits(f)
}
func rawToFloat64(r uint64) float64 {
return math.Float64frombits(r)
}


@@ -9,8 +9,7 @@ import (
"reflect"
"strconv"
"go.opentelemetry.io/otel/internal"
"go.opentelemetry.io/otel/internal/attribute"
attribute "go.opentelemetry.io/otel/attribute/internal"
)
//go:generate stringer -type=Type
@@ -51,7 +50,7 @@ const (
func BoolValue(v bool) Value {
return Value{
vtype: BOOL,
numeric: internal.BoolToRaw(v),
numeric: boolToRaw(v),
}
}
@@ -82,7 +81,7 @@ func IntSliceValue(v []int) Value {
func Int64Value(v int64) Value {
return Value{
vtype: INT64,
numeric: internal.Int64ToRaw(v),
numeric: int64ToRaw(v),
}
}
@@ -95,7 +94,7 @@ func Int64SliceValue(v []int64) Value {
func Float64Value(v float64) Value {
return Value{
vtype: FLOAT64,
numeric: internal.Float64ToRaw(v),
numeric: float64ToRaw(v),
}
}
@@ -125,7 +124,7 @@ func (v Value) Type() Type {
// AsBool returns the bool value. Make sure that the Value's type is
// BOOL.
func (v Value) AsBool() bool {
return internal.RawToBool(v.numeric)
return rawToBool(v.numeric)
}
// AsBoolSlice returns the []bool value. Make sure that the Value's type is
@@ -144,7 +143,7 @@ func (v Value) asBoolSlice() []bool {
// AsInt64 returns the int64 value. Make sure that the Value's type is
// INT64.
func (v Value) AsInt64() int64 {
return internal.RawToInt64(v.numeric)
return rawToInt64(v.numeric)
}
// AsInt64Slice returns the []int64 value. Make sure that the Value's type is
@@ -163,7 +162,7 @@ func (v Value) asInt64Slice() []int64 {
// AsFloat64 returns the float64 value. Make sure that the Value's
// type is FLOAT64.
func (v Value) AsFloat64() float64 {
return internal.RawToFloat64(v.numeric)
return rawToFloat64(v.numeric)
}
// AsFloat64Slice returns the []float64 value. Make sure that the Value's type is
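
These hunks move the raw-conversion helpers out of go.opentelemetry.io/otel/internal into a package-local file, so the public attribute API is unchanged. A tiny reminder of how values round-trip through the raw representation shown in the diff:

package main

import (
	"fmt"

	"go.opentelemetry.io/otel/attribute"
)

func main() {
	// Public API is unaffected by the internal move.
	v := attribute.Int64Value(42)
	fmt.Println(v.Type(), v.AsInt64()) // INT64 42
	kv := attribute.Bool("sampled", true)
	fmt.Println(kv.Key, kv.Value.AsBool())
}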


@@ -1,18 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/otel/internal"
//go:generate gotmpl --body=./shared/matchers/expectation.go.tmpl "--data={}" --out=matchers/expectation.go
//go:generate gotmpl --body=./shared/matchers/expecter.go.tmpl "--data={}" --out=matchers/expecter.go
//go:generate gotmpl --body=./shared/matchers/temporal_matcher.go.tmpl "--data={}" --out=matchers/temporal_matcher.go
//go:generate gotmpl --body=./shared/internaltest/alignment.go.tmpl "--data={}" --out=internaltest/alignment.go
//go:generate gotmpl --body=./shared/internaltest/env.go.tmpl "--data={}" --out=internaltest/env.go
//go:generate gotmpl --body=./shared/internaltest/env_test.go.tmpl "--data={}" --out=internaltest/env_test.go
//go:generate gotmpl --body=./shared/internaltest/errors.go.tmpl "--data={}" --out=internaltest/errors.go
//go:generate gotmpl --body=./shared/internaltest/harness.go.tmpl "--data={\"matchersImportPath\": \"go.opentelemetry.io/otel/internal/matchers\"}" --out=internaltest/harness.go
//go:generate gotmpl --body=./shared/internaltest/text_map_carrier.go.tmpl "--data={}" --out=internaltest/text_map_carrier.go
//go:generate gotmpl --body=./shared/internaltest/text_map_carrier_test.go.tmpl "--data={}" --out=internaltest/text_map_carrier_test.go
//go:generate gotmpl --body=./shared/internaltest/text_map_propagator.go.tmpl "--data={}" --out=internaltest/text_map_propagator.go
//go:generate gotmpl --body=./shared/internaltest/text_map_propagator_test.go.tmpl "--data={}" --out=internaltest/text_map_propagator_test.go


@@ -1,48 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package internal // import "go.opentelemetry.io/otel/internal"
import (
"math"
"unsafe"
)
func BoolToRaw(b bool) uint64 { // nolint:revive // b is not a control flag.
if b {
return 1
}
return 0
}
func RawToBool(r uint64) bool {
return r != 0
}
func Int64ToRaw(i int64) uint64 {
// Assumes original was a valid int64 (overflow not checked).
return uint64(i) // nolint: gosec
}
func RawToInt64(r uint64) int64 {
// Assumes original was a valid int64 (overflow not checked).
return int64(r) // nolint: gosec
}
func Float64ToRaw(f float64) uint64 {
return math.Float64bits(f)
}
func RawToFloat64(r uint64) float64 {
return math.Float64frombits(r)
}
func RawPtrToFloat64Ptr(r *uint64) *float64 {
// Assumes original was a valid *float64 (overflow not checked).
return (*float64)(unsafe.Pointer(r)) // nolint: gosec
}
func RawPtrToInt64Ptr(r *uint64) *int64 {
// Assumes original was a valid *int64 (overflow not checked).
return (*int64)(unsafe.Pointer(r)) // nolint: gosec
}


@@ -57,14 +57,15 @@ type autoTracer struct {
var _ Tracer = autoTracer{}
func (t autoTracer) Start(ctx context.Context, name string, opts ...SpanStartOption) (context.Context, Span) {
var psc SpanContext
var psc, sc SpanContext
sampled := true
span := new(autoSpan)
// Ask eBPF for sampling decision and span context info.
t.start(ctx, span, &psc, &sampled, &span.spanContext)
t.start(ctx, span, &psc, &sampled, &sc)
span.sampled.Store(sampled)
span.spanContext = sc
ctx = ContextWithSpan(ctx, span)


@@ -251,13 +251,20 @@ func (s *Span) UnmarshalJSON(data []byte) error {
type SpanFlags int32
const (
// SpanFlagsTraceFlagsMask is a mask for trace-flags.
//
// Bits 0-7 are used for trace flags.
SpanFlagsTraceFlagsMask SpanFlags = 255
// Bits 8 and 9 are used to indicate that the parent span or link span is remote.
// Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known.
// Bit 9 (`IS_REMOTE`) indicates whether the span or link is remote.
// SpanFlagsContextHasIsRemoteMask is a mask for HAS_IS_REMOTE status.
//
// Bits 8 and 9 are used to indicate that the parent span or link span is
// remote. Bit 8 (`HAS_IS_REMOTE`) indicates whether the value is known.
SpanFlagsContextHasIsRemoteMask SpanFlags = 256
// SpanFlagsContextHasIsRemoteMask indicates the Span is remote.
// SpanFlagsContextIsRemoteMask is a mask for IS_REMOTE status.
//
// Bits 8 and 9 are used to indicate that the parent span or link span is
// remote. Bit 9 (`IS_REMOTE`) indicates whether the span or link is
// remote.
SpanFlagsContextIsRemoteMask SpanFlags = 512
)
@@ -266,27 +273,31 @@ const (
type SpanKind int32
const (
// Indicates that the span represents an internal operation within an application,
// as opposed to an operation happening at the boundaries. Default value.
// SpanKindInternal indicates that the span represents an internal
// operation within an application, as opposed to an operation happening at
// the boundaries.
SpanKindInternal SpanKind = 1
// Indicates that the span covers server-side handling of an RPC or other
// remote network request.
// SpanKindServer indicates that the span covers server-side handling of an
// RPC or other remote network request.
SpanKindServer SpanKind = 2
// Indicates that the span describes a request to some remote service.
// SpanKindClient indicates that the span describes a request to some
// remote service.
SpanKindClient SpanKind = 3
// Indicates that the span describes a producer sending a message to a broker.
// Unlike CLIENT and SERVER, there is often no direct critical path latency relationship
// between producer and consumer spans. A PRODUCER span ends when the message was accepted
// by the broker while the logical processing of the message might span a much longer time.
// SpanKindProducer indicates that the span describes a producer sending a
// message to a broker. Unlike SpanKindClient and SpanKindServer, there is
// often no direct critical path latency relationship between producer and
// consumer spans. A SpanKindProducer span ends when the message was
// accepted by the broker while the logical processing of the message might
// span a much longer time.
SpanKindProducer SpanKind = 4
// Indicates that the span describes consumer receiving a message from a broker.
// Like the PRODUCER kind, there is often no direct critical path latency relationship
// between producer and consumer spans.
// SpanKindConsumer indicates that the span describes a consumer receiving
// a message from a broker. Like SpanKindProducer, there is often no direct
// critical path latency relationship between producer and consumer spans.
SpanKindConsumer SpanKind = 5
)
// Event is a time-stamped annotation of the span, consisting of user-supplied
// text description and key-value pairs.
// SpanEvent is a time-stamped annotation of the span, consisting of
// user-supplied text description and key-value pairs.
type SpanEvent struct {
// time_unix_nano is the time the event occurred.
Time time.Time `json:"timeUnixNano,omitempty"`
@@ -369,10 +380,11 @@ func (se *SpanEvent) UnmarshalJSON(data []byte) error {
return nil
}
// A pointer from the current span to another span in the same trace or in a
// different trace. For example, this can be used in batching operations,
// where a single batch handler processes multiple requests from different
// traces or when the handler receives a request from a different project.
// SpanLink is a reference from the current span to another span in the same
// trace or in a different trace. For example, this can be used in batching
// operations, where a single batch handler processes multiple requests from
// different traces or when the handler receives a request from a different
// project.
type SpanLink struct {
// A unique identifier of a trace that this linked span is part of. The ID is a
// 16-byte array.


@@ -3,17 +3,19 @@
package telemetry // import "go.opentelemetry.io/otel/trace/internal/telemetry"
// StatusCode is the status of a Span.
//
// For the semantics of status codes see
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/api.md#set-status
type StatusCode int32
const (
// The default status.
// StatusCodeUnset is the default status.
StatusCodeUnset StatusCode = 0
// The Span has been validated by an Application developer or Operator to
// have completed successfully.
// StatusCodeOK is used when the Span has been validated by an Application
// developer or Operator to have completed successfully.
StatusCodeOK StatusCode = 1
// The Span contains an error.
// StatusCodeError is used when the Span contains an error.
StatusCodeError StatusCode = 2
)
@@ -30,7 +32,7 @@ func (s StatusCode) String() string {
return "<unknown telemetry.StatusCode>"
}
// The Status type defines a logical error model that is suitable for different
// Status defines a logical error model that is suitable for different
// programming environments, including REST APIs and RPC APIs.
type Status struct {
// A developer-facing human readable error message.


@@ -71,7 +71,7 @@ func (td *Traces) UnmarshalJSON(data []byte) error {
return nil
}
// A collection of ScopeSpans from a Resource.
// ResourceSpans is a collection of ScopeSpans from a Resource.
type ResourceSpans struct {
// The resource for the spans in this message.
// If this field is not set then no resource info is known.
@@ -128,7 +128,7 @@ func (rs *ResourceSpans) UnmarshalJSON(data []byte) error {
return nil
}
// A collection of Spans produced by an InstrumentationScope.
// ScopeSpans is a collection of Spans produced by an InstrumentationScope.
type ScopeSpans struct {
// The instrumentation scope information for the spans in this message.
// Semantically when InstrumentationScope isn't set, it is equivalent with


@@ -316,7 +316,7 @@ func (v Value) String() string {
case ValueKindBool:
return strconv.FormatBool(v.asBool())
case ValueKindBytes:
return fmt.Sprint(v.asBytes())
return string(v.asBytes())
case ValueKindMap:
return fmt.Sprint(v.asMap())
case ValueKindSlice:


@@ -95,6 +95,8 @@ var autoInstEnabled = new(bool)
// tracerProvider return a noopTracerProvider if autoEnabled is false,
// otherwise it will return a TracerProvider from the sdk package used in
// auto-instrumentation.
//
//go:noinline
func (noopSpan) tracerProvider(autoEnabled *bool) TracerProvider {
if *autoEnabled {
return newAutoTracerProvider()

vendor/modules.txt vendored

@@ -2,8 +2,8 @@
## explicit; go 1.23.0
github.com/ClickHouse/ch-go/compress
github.com/ClickHouse/ch-go/proto
# github.com/ClickHouse/clickhouse-go/v2 v2.34.0
## explicit; go 1.22.0
# github.com/ClickHouse/clickhouse-go/v2 v2.37.1
## explicit; go 1.23.0
github.com/ClickHouse/clickhouse-go/v2
github.com/ClickHouse/clickhouse-go/v2/contributors
github.com/ClickHouse/clickhouse-go/v2/ext
@@ -202,15 +202,14 @@ github.com/slok/go-http-metrics/middleware/std
# github.com/tsenart/vegeta/v12 v12.12.0
## explicit; go 1.22
github.com/tsenart/vegeta/v12/lib
# go.opentelemetry.io/otel v1.35.0
## explicit; go 1.22.0
# go.opentelemetry.io/otel v1.36.0
## explicit; go 1.23.0
go.opentelemetry.io/otel/attribute
go.opentelemetry.io/otel/attribute/internal
go.opentelemetry.io/otel/codes
go.opentelemetry.io/otel/internal
go.opentelemetry.io/otel/internal/attribute
go.opentelemetry.io/otel/semconv/v1.26.0
# go.opentelemetry.io/otel/trace v1.35.0
## explicit; go 1.22.0
# go.opentelemetry.io/otel/trace v1.36.0
## explicit; go 1.23.0
go.opentelemetry.io/otel/trace
go.opentelemetry.io/otel/trace/embedded
go.opentelemetry.io/otel/trace/internal/telemetry