mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-28 10:19:56 -06:00
Merge remote-tracking branch 'origin/main' into aaron/undrop-cluster-tests
This commit is contained in:
2
.github/workflows/bump-dependency.yaml
vendored
2
.github/workflows/bump-dependency.yaml
vendored
@@ -98,7 +98,7 @@ jobs:
|
||||
- name: Set up Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
- name: Bump dependency
|
||||
working-directory: go
|
||||
run: |
|
||||
|
||||
2
.github/workflows/cd-release.yaml
vendored
2
.github/workflows/cd-release.yaml
vendored
@@ -48,7 +48,7 @@ jobs:
|
||||
run: |
|
||||
latest=$(git rev-parse HEAD)
|
||||
echo "commitish=$latest" >> $GITHUB_OUTPUT
|
||||
GO_BUILD_VERSION=1.19 go/utils/publishrelease/buildbinaries.sh
|
||||
GO_BUILD_VERSION=1.21 go/utils/publishrelease/buildbinaries.sh
|
||||
- name: Create Release
|
||||
id: create_release
|
||||
uses: dolthub/create-release@v1
|
||||
|
||||
2
.github/workflows/ci-bats-unix-remote.yaml
vendored
2
.github/workflows/ci-bats-unix-remote.yaml
vendored
@@ -46,7 +46,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- name: Setup Python 3.x
|
||||
uses: actions/setup-python@v4
|
||||
|
||||
2
.github/workflows/ci-bats-unix.yaml
vendored
2
.github/workflows/ci-bats-unix.yaml
vendored
@@ -46,7 +46,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- name: Setup Python 3.x
|
||||
uses: actions/setup-python@v4
|
||||
|
||||
2
.github/workflows/ci-bats-windows.yaml
vendored
2
.github/workflows/ci-bats-windows.yaml
vendored
@@ -95,7 +95,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- name: Setup Python 3.x
|
||||
uses: actions/setup-python@v4
|
||||
|
||||
6
.github/workflows/ci-check-repo.yaml
vendored
6
.github/workflows/ci-check-repo.yaml
vendored
@@ -18,7 +18,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
@@ -69,7 +69,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
ref: "main"
|
||||
@@ -98,7 +98,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ github.event.pull_request.head.ref || github.ref }}
|
||||
|
||||
@@ -23,7 +23,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-node@v3
|
||||
|
||||
@@ -23,7 +23,7 @@ jobs:
|
||||
- name: Set up Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- uses: actions/checkout@v3
|
||||
- name: Test All
|
||||
|
||||
4
.github/workflows/ci-go-tests.yaml
vendored
4
.github/workflows/ci-go-tests.yaml
vendored
@@ -26,7 +26,7 @@ jobs:
|
||||
- name: Set up Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- uses: actions/checkout@v3
|
||||
- name: Test All
|
||||
@@ -73,7 +73,7 @@ jobs:
|
||||
- name: Set up Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- uses: actions/checkout@v3
|
||||
- name: Test All
|
||||
|
||||
@@ -26,7 +26,7 @@ jobs:
|
||||
- name: Setup Go 1.x
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
id: go
|
||||
- name: Create CI Bin
|
||||
run: |
|
||||
|
||||
2
.github/workflows/import-perf.yaml
vendored
2
.github/workflows/import-perf.yaml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
id: go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
|
||||
- name: Dolt version
|
||||
id: version
|
||||
|
||||
2
.github/workflows/merge-perf.yaml
vendored
2
.github/workflows/merge-perf.yaml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
id: go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
|
||||
- name: Setup Python 3.x
|
||||
uses: actions/setup-python@v4
|
||||
|
||||
2
.github/workflows/sysbench-perf.yaml
vendored
2
.github/workflows/sysbench-perf.yaml
vendored
@@ -20,7 +20,7 @@ jobs:
|
||||
id: go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: ^1.19
|
||||
go-version: ^1.21
|
||||
|
||||
- name: Dolt version
|
||||
id: version
|
||||
|
||||
@@ -170,7 +170,7 @@ func NewSqlEngine(
|
||||
|
||||
// Load the branch control permissions, if they exist
|
||||
var bcController *branch_control.Controller
|
||||
if bcController, err = branch_control.LoadData(config.BranchCtrlFilePath, config.DoltCfgDirPath); err != nil {
|
||||
if bcController, err = branch_control.LoadData(ctx, config.BranchCtrlFilePath, config.DoltCfgDirPath); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config.ClusterController.HookBranchControlPersistence(bcController, mrEnv.FileSystem())
|
||||
|
||||
@@ -64,7 +64,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "1.18.0"
|
||||
Version = "1.18.1"
|
||||
)
|
||||
|
||||
var dumpDocsCommand = &commands.DumpDocsCmd{}
|
||||
|
||||
@@ -59,7 +59,7 @@ require (
|
||||
github.com/cespare/xxhash v1.1.0
|
||||
github.com/creasty/defaults v1.6.0
|
||||
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
|
||||
github.com/dolthub/go-mysql-server v0.17.1-0.20231005070324-2a440c2c2bf1
|
||||
github.com/dolthub/go-mysql-server v0.17.1-0.20231005225621-4cc2f2ca38ce
|
||||
github.com/dolthub/swiss v0.1.0
|
||||
github.com/goccy/go-json v0.10.2
|
||||
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
|
||||
@@ -160,4 +160,4 @@ require (
|
||||
|
||||
replace github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi => ./gen/proto/dolt/services/eventsapi
|
||||
|
||||
go 1.19
|
||||
go 1.21
|
||||
|
||||
10
go/go.sum
10
go/go.sum
@@ -121,6 +121,7 @@ github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAm
|
||||
github.com/bcicen/jstream v1.0.0 h1:gOi+Sn9mHrpePlENynPKA6Dra/PjLaIpqrTevhfvLAA=
|
||||
github.com/bcicen/jstream v1.0.0/go.mod h1:9ielPxqFry7Y4Tg3j4BfjPocfJ3TbsRtXOAYXYmRuAQ=
|
||||
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
|
||||
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
|
||||
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
|
||||
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
@@ -180,8 +181,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
|
||||
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
|
||||
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
|
||||
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
|
||||
github.com/dolthub/go-mysql-server v0.17.1-0.20231005070324-2a440c2c2bf1 h1:LfafO1oBejVN0UVV2YcQYnPjnXJLW3VnI532Qzf/8e4=
|
||||
github.com/dolthub/go-mysql-server v0.17.1-0.20231005070324-2a440c2c2bf1/go.mod h1:KWMgEn//scUZuT8vHeHdMWrvCvcE7FrizZ0HKB08zrU=
|
||||
github.com/dolthub/go-mysql-server v0.17.1-0.20231005225621-4cc2f2ca38ce h1:x27xw9s1Odf4MVCmjU8bqqOEY5wQY4YL18tOPUtJp1k=
|
||||
github.com/dolthub/go-mysql-server v0.17.1-0.20231005225621-4cc2f2ca38ce/go.mod h1:KWMgEn//scUZuT8vHeHdMWrvCvcE7FrizZ0HKB08zrU=
|
||||
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
|
||||
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
|
||||
github.com/dolthub/jsonpath v0.0.2-0.20230525180605-8dc13778fd72 h1:NfWmngMi1CYUWU4Ix8wM+USEhjc+mhPlT9JUR/anvbQ=
|
||||
@@ -332,6 +333,7 @@ github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPg
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
|
||||
github.com/google/martian/v3 v3.3.2 h1:IqNFLAmvJOgVlpdEBiQbDc2EwKW77amAycfTuWKdfvw=
|
||||
github.com/google/martian/v3 v3.3.2/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
|
||||
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
|
||||
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
|
||||
github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
|
||||
@@ -655,6 +657,7 @@ github.com/tealeg/xlsx v1.0.5/go.mod h1:btRS8dz54TDnvKNosuAqxrM1QgN1udgk9O34bDCn
|
||||
github.com/tetratelabs/wazero v1.1.0 h1:EByoAhC+QcYpwSZJSs/aV0uokxPwBgKxfiokSUwAknQ=
|
||||
github.com/tetratelabs/wazero v1.1.0/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ=
|
||||
github.com/thepudds/swisstable v0.0.0-20221011152303-9c77dc657777 h1:5u+6YWU2faS+Sr/x8j9yalMpSDUkatNOZWXV3wMUCGQ=
|
||||
github.com/thepudds/swisstable v0.0.0-20221011152303-9c77dc657777/go.mod h1:4af3KxEsswy6aTzsTcwa8QZUSh4V+80oHdp1QX9uJHA=
|
||||
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
github.com/tidwall/gjson v1.14.4 h1:uo0p8EbA09J7RQaflQ1aBRffTR7xedD2bcIVSYxLnkM=
|
||||
github.com/tidwall/gjson v1.14.4/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||
@@ -693,6 +696,7 @@ github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPR
|
||||
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
github.com/zeebo/assert v1.1.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
|
||||
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
|
||||
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
|
||||
github.com/zeebo/blake3 v0.2.3 h1:TFoLXsjeXqRNFxSbk35Dk4YtszE/MQQGK10BH4ptoTg=
|
||||
github.com/zeebo/blake3 v0.2.3/go.mod h1:mjJjZpnsyIVtVgTOSpJ9vmRE4wgDeyt2HU3qXvvKCaQ=
|
||||
github.com/zeebo/pcg v1.0.1 h1:lyqfGeWiv4ahac6ttHs+I5hwtH/+1mrhlCtVNQM2kHo=
|
||||
@@ -726,6 +730,7 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
|
||||
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
|
||||
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
|
||||
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
|
||||
@@ -1018,6 +1023,7 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNq
|
||||
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
|
||||
gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0=
|
||||
gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
|
||||
gonum.org/v1/gonum v0.11.0/go.mod h1:fSG4YDCxxUZQJ7rKsQrj0gMOg00Il0Z96/qMA4bVQhA=
|
||||
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
|
||||
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
|
||||
gonum.org/v1/plot v0.11.0 h1:z2ZkgNqW34d0oYUzd80RRlc0L9kWtenqK4kflZG1lGc=
|
||||
|
||||
@@ -62,7 +62,7 @@ type Controller struct {
|
||||
|
||||
// A callback which we call when we successfully save new data.
|
||||
// The new data will be available in |Serialized|.
|
||||
SavedCallback func()
|
||||
SavedCallback func(context.Context)
|
||||
|
||||
branchControlFilePath string
|
||||
doltConfigDirPath string
|
||||
@@ -71,8 +71,8 @@ type Controller struct {
|
||||
// CreateDefaultController returns a default controller, which only has a single entry allowing all users to have write
|
||||
// permissions on all branches (only the super user has admin, if a super user has been set). This is equivalent to
|
||||
// passing empty strings to LoadData.
|
||||
func CreateDefaultController() *Controller {
|
||||
controller, err := LoadData("", "")
|
||||
func CreateDefaultController(ctx context.Context) *Controller {
|
||||
controller, err := LoadData(ctx, "", "")
|
||||
if err != nil {
|
||||
panic(err) // should never happen
|
||||
}
|
||||
@@ -81,7 +81,7 @@ func CreateDefaultController() *Controller {
|
||||
|
||||
// LoadData loads the data from the given location and returns a controller. Returns the default controller if the
|
||||
// `branchControlFilePath` is empty.
|
||||
func LoadData(branchControlFilePath string, doltConfigDirPath string) (*Controller, error) {
|
||||
func LoadData(ctx context.Context, branchControlFilePath string, doltConfigDirPath string) (*Controller, error) {
|
||||
accessTbl := newAccess()
|
||||
controller := &Controller{
|
||||
Access: accessTbl,
|
||||
@@ -102,14 +102,14 @@ func LoadData(branchControlFilePath string, doltConfigDirPath string) (*Controll
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = controller.LoadData(data /* isFirstLoad */, true)
|
||||
err = controller.LoadData(ctx, data /* isFirstLoad */, true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to deserialize config at '%s': %w", branchControlFilePath, err)
|
||||
}
|
||||
return controller, nil
|
||||
}
|
||||
|
||||
func (controller *Controller) LoadData(data []byte, isFirstLoad bool) error {
|
||||
func (controller *Controller) LoadData(ctx context.Context, data []byte, isFirstLoad bool) error {
|
||||
controller.Access.RWMutex.Lock()
|
||||
defer controller.Access.RWMutex.Unlock()
|
||||
|
||||
@@ -119,7 +119,7 @@ func (controller *Controller) LoadData(data []byte, isFirstLoad bool) error {
|
||||
controller.Access.insertDefaultRow()
|
||||
controller.Serialized.Store(&data)
|
||||
if controller.SavedCallback != nil {
|
||||
controller.SavedCallback()
|
||||
controller.SavedCallback(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -148,18 +148,18 @@ func (controller *Controller) LoadData(data []byte, isFirstLoad bool) error {
|
||||
// The Deserialize functions acquire write locks, so we don't acquire them here
|
||||
if err = controller.Access.Deserialize(access); err != nil {
|
||||
// TODO: More principaled rollback. Hopefully this does not fail.
|
||||
controller.LoadData(*rollback, isFirstLoad)
|
||||
controller.LoadData(ctx, *rollback, isFirstLoad)
|
||||
return err
|
||||
}
|
||||
if err = controller.Namespace.Deserialize(namespace); err != nil {
|
||||
// TODO: More principaled rollback. Hopefully this does not fail.
|
||||
controller.LoadData(*rollback, isFirstLoad)
|
||||
controller.LoadData(ctx, *rollback, isFirstLoad)
|
||||
return err
|
||||
}
|
||||
|
||||
controller.Serialized.Store(&data)
|
||||
if controller.SavedCallback != nil {
|
||||
controller.SavedCallback()
|
||||
controller.SavedCallback(ctx)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -178,10 +178,10 @@ func SaveData(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
return controller.SaveData(branchAwareSession.GetFileSystem())
|
||||
return controller.SaveData(ctx, branchAwareSession.GetFileSystem())
|
||||
}
|
||||
|
||||
func (controller *Controller) SaveData(fs filesys.Filesys) error {
|
||||
func (controller *Controller) SaveData(ctx context.Context, fs filesys.Filesys) error {
|
||||
// If we never set a save location then we just return
|
||||
if len(controller.branchControlFilePath) == 0 {
|
||||
return nil
|
||||
@@ -227,7 +227,7 @@ func (controller *Controller) SaveData(fs filesys.Filesys) error {
|
||||
|
||||
controller.Serialized.Store(&data)
|
||||
if controller.SavedCallback != nil {
|
||||
controller.SavedCallback()
|
||||
controller.SavedCallback(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1212,6 +1212,12 @@ func (ddb *DoltDB) NewTagAtCommit(ctx context.Context, tagRef ref.DoltRef, c *Co
|
||||
return err
|
||||
}
|
||||
|
||||
// This should be used as the cancel cause for the context passed to a
|
||||
// ReplicationStatusController Wait function when the wait has been canceled
|
||||
// because it timed out. Seeing this error from a passed in context may be used
|
||||
// by some agents to open circuit breakers or tune timeouts.
|
||||
var ErrReplicationWaitFailed = errors.New("replication wait failed")
|
||||
|
||||
type ReplicationStatusController struct {
|
||||
// A slice of funcs which can be called to wait for the replication
|
||||
// associated with a commithook to complete. Must return if the
|
||||
|
||||
@@ -16,6 +16,8 @@ package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -24,6 +26,7 @@ import (
|
||||
|
||||
replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
)
|
||||
|
||||
type branchControlReplication struct {
|
||||
@@ -52,11 +55,14 @@ type branchControlReplica struct {
|
||||
|
||||
waitNotify func()
|
||||
|
||||
progressNotifier ProgressNotifier
|
||||
fastFailReplicationWait bool
|
||||
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
}
|
||||
|
||||
func (r *branchControlReplica) UpdateContents(contents []byte, version uint32) {
|
||||
func (r *branchControlReplica) UpdateContents(contents []byte, version uint32) func(context.Context) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.contents = contents
|
||||
@@ -64,6 +70,20 @@ func (r *branchControlReplica) UpdateContents(contents []byte, version uint32) {
|
||||
r.nextAttempt = time.Time{}
|
||||
r.backoff.Reset()
|
||||
r.cond.Broadcast()
|
||||
if r.fastFailReplicationWait {
|
||||
remote := r.client.remote
|
||||
return func(ctx context.Context) error {
|
||||
return fmt.Errorf("circuit breaker for replication to %s/dolt_branch_control is open. this branch control update did not necessarily replicate successfully.", remote)
|
||||
}
|
||||
}
|
||||
w := r.progressNotifier.Wait()
|
||||
return func(ctx context.Context) error {
|
||||
err := w(ctx)
|
||||
if err != nil && errors.Is(err, doltdb.ErrReplicationWaitFailed) {
|
||||
r.setFastFailReplicationWait(true)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (r *branchControlReplica) Run() {
|
||||
@@ -94,6 +114,8 @@ func (r *branchControlReplica) Run() {
|
||||
// in order to avoid deadlock.
|
||||
contents := r.contents
|
||||
client := r.client.client
|
||||
version := r.version
|
||||
attempt := r.progressNotifier.BeginAttempt()
|
||||
r.mu.Unlock()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
_, err := client.UpdateBranchControl(ctx, &replicationapi.UpdateBranchControlRequest{
|
||||
@@ -102,6 +124,7 @@ func (r *branchControlReplica) Run() {
|
||||
cancel()
|
||||
r.mu.Lock()
|
||||
if err != nil {
|
||||
r.progressNotifier.RecordFailure(attempt)
|
||||
r.lgr.Warnf("branchControlReplica[%s]: error replicating branch control permissions. backing off. %v", r.client.remote, err)
|
||||
r.nextAttempt = time.Now().Add(r.backoff.NextBackOff())
|
||||
next := r.nextAttempt
|
||||
@@ -116,13 +139,19 @@ func (r *branchControlReplica) Run() {
|
||||
}()
|
||||
continue
|
||||
}
|
||||
r.progressNotifier.RecordSuccess(attempt)
|
||||
r.fastFailReplicationWait = false
|
||||
r.backoff.Reset()
|
||||
r.lgr.Debugf("branchControlReplica[%s]: sucessfully replicated branch control permissions.", r.client.remote)
|
||||
r.replicatedVersion = r.version
|
||||
r.replicatedVersion = version
|
||||
}
|
||||
}
|
||||
|
||||
func (r *branchControlReplica) wait() {
|
||||
if r.isCaughtUp() {
|
||||
attempt := r.progressNotifier.BeginAttempt()
|
||||
r.progressNotifier.RecordSuccess(attempt)
|
||||
}
|
||||
r.cond.Wait()
|
||||
}
|
||||
|
||||
@@ -130,6 +159,12 @@ func (r *branchControlReplica) isCaughtUp() bool {
|
||||
return r.version == r.replicatedVersion || r.role != RolePrimary
|
||||
}
|
||||
|
||||
func (r *branchControlReplica) setFastFailReplicationWait(v bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.fastFailReplicationWait = v
|
||||
}
|
||||
|
||||
func (r *branchControlReplica) setWaitNotify(notify func()) bool {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
@@ -155,6 +190,7 @@ func (r *branchControlReplica) setRole(role Role) {
|
||||
defer r.mu.Unlock()
|
||||
r.role = role
|
||||
r.nextAttempt = time.Time{}
|
||||
r.fastFailReplicationWait = false
|
||||
r.cond.Broadcast()
|
||||
}
|
||||
|
||||
@@ -162,9 +198,9 @@ func (p *branchControlReplication) setRole(role Role) {
|
||||
if role == RolePrimary {
|
||||
cur := p.bcController.Serialized.Load()
|
||||
if cur == nil {
|
||||
p.UpdateBranchControlContents([]byte{})
|
||||
p.UpdateBranchControlContents(context.Background(), []byte{}, nil)
|
||||
} else {
|
||||
p.UpdateBranchControlContents(*cur)
|
||||
p.UpdateBranchControlContents(context.Background(), *cur, nil)
|
||||
}
|
||||
}
|
||||
for _, r := range p.replicas {
|
||||
@@ -191,13 +227,24 @@ func (p *branchControlReplication) GracefulStop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *branchControlReplication) UpdateBranchControlContents(contents []byte) {
|
||||
func (p *branchControlReplication) UpdateBranchControlContents(ctx context.Context, contents []byte, rsc *doltdb.ReplicationStatusController) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.current = contents
|
||||
p.version += 1
|
||||
for _, r := range p.replicas {
|
||||
r.UpdateContents(p.current, p.version)
|
||||
|
||||
var j int
|
||||
if rsc != nil {
|
||||
j = len(rsc.Wait)
|
||||
rsc.Wait = append(rsc.Wait, make([]func(ctx context.Context) error, len(p.replicas))...)
|
||||
rsc.NotifyWaitFailed = append(rsc.NotifyWaitFailed, make([]func(), len(p.replicas))...)
|
||||
}
|
||||
for i, r := range p.replicas {
|
||||
w := r.UpdateContents(p.current, p.version)
|
||||
if rsc != nil {
|
||||
rsc.Wait[i+j] = w
|
||||
rsc.NotifyWaitFailed[i+j] = func() {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -56,13 +56,8 @@ type commithook struct {
|
||||
// commithooks are caught up with replicating to the standby.
|
||||
waitNotify func()
|
||||
|
||||
// This is a slice of notification channels maintained by the
|
||||
// commithook. The semantics are:
|
||||
// 1. All accesses to |successChs| must happen with |mu| held.
|
||||
// 2. There may be |0| or more channels in the slice.
|
||||
// 3. As a reader, if |successChs| is non-empty, you should just read a value, for example, |successChs[0]| and use it. All entries will be closed at the same time. If |successChs| is empty when you need a channel, you should add one to it.
|
||||
// 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then.
|
||||
successChs []chan struct{}
|
||||
// |mu| must be held for all accesses.
|
||||
progressNotifier ProgressNotifier
|
||||
|
||||
// If this is true, the waitF returned by Execute() will fast fail if
|
||||
// we are not already caught up, instead of blocking on a successCh
|
||||
@@ -164,12 +159,23 @@ func (h *commithook) replicate(ctx context.Context) {
|
||||
h.waitNotify()
|
||||
}
|
||||
caughtUp := h.isCaughtUp()
|
||||
if len(h.successChs) != 0 && caughtUp {
|
||||
for _, ch := range h.successChs {
|
||||
close(ch)
|
||||
}
|
||||
h.successChs = nil
|
||||
if caughtUp {
|
||||
h.fastFailReplicationWait = false
|
||||
|
||||
// If we ABA on h.nextHead, so that it gets set
|
||||
// to one value, then another, then back to the
|
||||
// first, then the setter for B can make an
|
||||
// outstanding wait while we are replicating
|
||||
// the first set to A. We can be back to
|
||||
// nextHead == A by the time we complete
|
||||
// replicating the first A and we will have the
|
||||
// outstanding waiter for the work for B but we
|
||||
// will be fully quiesced. We make sure to
|
||||
// notify B of success here.
|
||||
if h.progressNotifier.HasWaiters() {
|
||||
a := h.progressNotifier.BeginAttempt()
|
||||
h.progressNotifier.RecordSuccess(a)
|
||||
}
|
||||
}
|
||||
if shouldHeartbeat {
|
||||
h.attemptHeartbeat(ctx)
|
||||
@@ -256,13 +262,8 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
|
||||
}
|
||||
h.cancelReplicate = nil
|
||||
}()
|
||||
successChs := h.successChs
|
||||
h.successChs = nil
|
||||
defer func() {
|
||||
if len(successChs) != 0 {
|
||||
h.successChs = append(h.successChs, successChs...)
|
||||
}
|
||||
}()
|
||||
attempt := h.progressNotifier.BeginAttempt()
|
||||
defer h.progressNotifier.RecordFailure(attempt)
|
||||
h.mu.Unlock()
|
||||
|
||||
if destDB == nil {
|
||||
@@ -313,12 +314,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
|
||||
h.lastPushedHead = toPush
|
||||
h.lastSuccess = incomingTime
|
||||
h.nextPushAttempt = time.Time{}
|
||||
if len(successChs) != 0 {
|
||||
for _, ch := range successChs {
|
||||
close(ch)
|
||||
}
|
||||
successChs = nil
|
||||
}
|
||||
h.progressNotifier.RecordSuccess(attempt)
|
||||
} else {
|
||||
h.currentError = new(string)
|
||||
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
|
||||
@@ -466,18 +462,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
|
||||
return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname)
|
||||
}
|
||||
} else {
|
||||
if len(h.successChs) == 0 {
|
||||
h.successChs = append(h.successChs, make(chan struct{}))
|
||||
}
|
||||
successCh := h.successChs[0]
|
||||
waitF = func(ctx context.Context) error {
|
||||
select {
|
||||
case <-successCh:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
waitF = h.progressNotifier.Wait()
|
||||
}
|
||||
}
|
||||
return waitF, nil
|
||||
|
||||
@@ -735,10 +735,14 @@ func (c *Controller) HookBranchControlPersistence(controller *branch_control.Con
|
||||
}
|
||||
c.bcReplication.setRole(c.role)
|
||||
|
||||
controller.SavedCallback = func() {
|
||||
controller.SavedCallback = func(ctx context.Context) {
|
||||
contents := controller.Serialized.Load()
|
||||
if contents != nil {
|
||||
c.bcReplication.UpdateBranchControlContents(*contents)
|
||||
var rsc doltdb.ReplicationStatusController
|
||||
c.bcReplication.UpdateBranchControlContents(ctx, *contents, &rsc)
|
||||
if sqlCtx, ok := ctx.(*sql.Context); ok {
|
||||
dsess.WaitForReplicationController(sqlCtx, rsc)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -25,6 +27,8 @@ import (
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
)
|
||||
|
||||
type MySQLDbPersister interface {
|
||||
@@ -58,11 +62,14 @@ type mysqlDbReplica struct {
|
||||
|
||||
waitNotify func()
|
||||
|
||||
progressNotifier ProgressNotifier
|
||||
fastFailReplicationWait bool
|
||||
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
}
|
||||
|
||||
func (r *mysqlDbReplica) UpdateMySQLDb(ctx context.Context, contents []byte, version uint32) error {
|
||||
func (r *mysqlDbReplica) UpdateMySQLDb(ctx context.Context, contents []byte, version uint32) func(context.Context) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.lgr.Infof("mysqlDbReplica got new contents at version %d", version)
|
||||
@@ -71,7 +78,28 @@ func (r *mysqlDbReplica) UpdateMySQLDb(ctx context.Context, contents []byte, ver
|
||||
r.nextAttempt = time.Time{}
|
||||
r.backoff.Reset()
|
||||
r.cond.Broadcast()
|
||||
return nil
|
||||
|
||||
if r.fastFailReplicationWait {
|
||||
remote := r.client.remote
|
||||
return func(ctx context.Context) error {
|
||||
return fmt.Errorf("circuit breaker for replication to %s/mysql is open. this update to users and grants did not necessarily replicate successfully.", remote)
|
||||
}
|
||||
} else {
|
||||
w := r.progressNotifier.Wait()
|
||||
return func(ctx context.Context) error {
|
||||
err := w(ctx)
|
||||
if err != nil && errors.Is(err, doltdb.ErrReplicationWaitFailed) {
|
||||
r.setFastFailReplicationWait(true)
|
||||
}
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *mysqlDbReplica) setFastFailReplicationWait(v bool) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.fastFailReplicationWait = v
|
||||
}
|
||||
|
||||
func (r *mysqlDbReplica) Run() {
|
||||
@@ -103,6 +131,8 @@ func (r *mysqlDbReplica) Run() {
|
||||
// release this lock in order to avoid deadlock.
|
||||
contents := r.contents
|
||||
client := r.client.client
|
||||
version := r.version
|
||||
attempt := r.progressNotifier.BeginAttempt()
|
||||
r.mu.Unlock()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
_, err := client.UpdateUsersAndGrants(ctx, &replicationapi.UpdateUsersAndGrantsRequest{
|
||||
@@ -111,6 +141,7 @@ func (r *mysqlDbReplica) Run() {
|
||||
cancel()
|
||||
r.mu.Lock()
|
||||
if err != nil {
|
||||
r.progressNotifier.RecordFailure(attempt)
|
||||
r.lgr.Warnf("mysqlDbReplica[%s]: error replicating users and grants. backing off. %v", r.client.remote, err)
|
||||
r.nextAttempt = time.Now().Add(r.backoff.NextBackOff())
|
||||
next := r.nextAttempt
|
||||
@@ -125,12 +156,15 @@ func (r *mysqlDbReplica) Run() {
|
||||
}()
|
||||
continue
|
||||
}
|
||||
r.progressNotifier.RecordSuccess(attempt)
|
||||
r.fastFailReplicationWait = false
|
||||
r.backoff.Reset()
|
||||
r.lgr.Debugf("mysqlDbReplica[%s]: sucessfully replicated users and grants at version %d.", r.client.remote, r.version)
|
||||
r.lgr.Debugf("mysqlDbReplica[%s]: sucessfully replicated users and grants at version %d.", r.client.remote, version)
|
||||
r.replicatedVersion = version
|
||||
} else {
|
||||
r.lgr.Debugf("mysqlDbReplica[%s]: not replicating empty users and grants at version %d.", r.client.remote, r.version)
|
||||
r.replicatedVersion = r.version
|
||||
}
|
||||
r.replicatedVersion = r.version
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,6 +190,10 @@ func (r *mysqlDbReplica) wait() {
|
||||
r.waitNotify()
|
||||
}
|
||||
r.lgr.Infof("mysqlDbReplica waiting...")
|
||||
if r.isCaughtUp() {
|
||||
attempt := r.progressNotifier.BeginAttempt()
|
||||
r.progressNotifier.RecordSuccess(attempt)
|
||||
}
|
||||
r.cond.Wait()
|
||||
}
|
||||
|
||||
@@ -210,26 +248,33 @@ func (p *replicatingMySQLDbPersister) GracefulStop() {
|
||||
}
|
||||
|
||||
func (p *replicatingMySQLDbPersister) Persist(ctx *sql.Context, data []byte) error {
|
||||
p.mu.Lock()
|
||||
err := p.base.Persist(ctx, data)
|
||||
if err == nil {
|
||||
p.mu.Lock()
|
||||
p.current = data
|
||||
p.version += 1
|
||||
defer p.mu.Unlock()
|
||||
for _, r := range p.replicas {
|
||||
r.UpdateMySQLDb(ctx, p.current, p.version)
|
||||
var rsc doltdb.ReplicationStatusController
|
||||
rsc.Wait = make([]func(context.Context) error, len(p.replicas))
|
||||
rsc.NotifyWaitFailed = make([]func(), len(p.replicas))
|
||||
for i, r := range p.replicas {
|
||||
rsc.Wait[i] = r.UpdateMySQLDb(ctx, p.current, p.version)
|
||||
rsc.NotifyWaitFailed[i] = func() {}
|
||||
}
|
||||
p.mu.Unlock()
|
||||
dsess.WaitForReplicationController(ctx, rsc)
|
||||
} else {
|
||||
p.mu.Unlock()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *replicatingMySQLDbPersister) LoadData(ctx context.Context) ([]byte, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
ret, err := p.base.LoadData(ctx)
|
||||
if err == nil {
|
||||
p.mu.Lock()
|
||||
p.current = ret
|
||||
p.version += 1
|
||||
defer p.mu.Unlock()
|
||||
for _, r := range p.replicas {
|
||||
r.UpdateMySQLDb(ctx, p.current, p.version)
|
||||
}
|
||||
|
||||
86
go/libraries/doltcore/sqle/cluster/progress_notifier.go
Normal file
86
go/libraries/doltcore/sqle/cluster/progress_notifier.go
Normal file
@@ -0,0 +1,86 @@
|
||||
// Copyright 2023 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
// ProgressNotifier is a way for clients to be notified of successful progress
|
||||
// which a monotonic agent makes after they register to receive notification by
|
||||
// taking a callback with |Wait|.
|
||||
//
|
||||
// As a monotonic agent implementation, you should call |BeginAttempt()|
|
||||
// anytime you begin attempting to do work. If that work succeeds, you can call
|
||||
// |RecordSuccess|. If the work fails, you must call |RecordFailure|.
|
||||
// |RecordSuccess| makes a later call to |RecordFailure| with the same
|
||||
// |*Attempt| a no-op, so that the call to |RecordFailure| can be safely placed
|
||||
// in a defer block.
|
||||
//
|
||||
// As a client of the agent, you can call |Wait|, which will return a function
|
||||
// you can call to block until either progress was made since the call to
|
||||
// |Wait| or the provided |context.Context| is |Done|. If progress is made, the
|
||||
// function returned from |Wait| returns |nil|. If the context was canceled, it
|
||||
// returns |context.Cause(ctx)|.
|
||||
//
|
||||
// All accesses to ProgressNotifier should be externally synchronized except
|
||||
// for calling into the functions returned by |Wait|.
|
||||
type ProgressNotifier struct {
|
||||
chs []chan struct{}
|
||||
}
|
||||
|
||||
type Attempt struct {
|
||||
chs []chan struct{}
|
||||
}
|
||||
|
||||
func (p *ProgressNotifier) HasWaiters() bool {
|
||||
return len(p.chs) > 0
|
||||
}
|
||||
|
||||
func (p *ProgressNotifier) BeginAttempt() *Attempt {
|
||||
chs := p.chs
|
||||
p.chs = nil
|
||||
return &Attempt{chs: chs}
|
||||
}
|
||||
|
||||
func (*ProgressNotifier) RecordSuccess(a *Attempt) {
|
||||
if a.chs != nil {
|
||||
for i := range a.chs {
|
||||
close(a.chs[i])
|
||||
}
|
||||
a.chs = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ProgressNotifier) RecordFailure(a *Attempt) {
|
||||
if a.chs != nil {
|
||||
p.chs = append(p.chs, a.chs...)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ProgressNotifier) Wait() func(context.Context) error {
|
||||
if len(p.chs) == 0 {
|
||||
p.chs = append(p.chs, make(chan struct{}))
|
||||
}
|
||||
ch := p.chs[0]
|
||||
return func(ctx context.Context) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return context.Cause(ctx)
|
||||
case <-ch:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
67
go/libraries/doltcore/sqle/cluster/progress_notifier_test.go
Normal file
67
go/libraries/doltcore/sqle/cluster/progress_notifier_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
// Copyright 2023 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestProgressNotifier(t *testing.T) {
|
||||
t.Run("WaitBeforeBeginAttempt", func(t *testing.T) {
|
||||
p := new(ProgressNotifier)
|
||||
f := p.Wait()
|
||||
a := p.BeginAttempt()
|
||||
p.RecordSuccess(a)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
t.Cleanup(cancel)
|
||||
assert.NoError(t, f(ctx))
|
||||
})
|
||||
|
||||
t.Run("WaitAfterBeginAttempt", func(t *testing.T) {
|
||||
p := new(ProgressNotifier)
|
||||
a := p.BeginAttempt()
|
||||
f := p.Wait()
|
||||
p.RecordSuccess(a)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
t.Cleanup(cancel)
|
||||
assert.ErrorIs(t, f(ctx), context.DeadlineExceeded)
|
||||
|
||||
a = p.BeginAttempt()
|
||||
p.RecordSuccess(a)
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
t.Cleanup(cancel)
|
||||
assert.NoError(t, f(ctx))
|
||||
})
|
||||
|
||||
t.Run("WaitBeforeAttemptFailure", func(t *testing.T) {
|
||||
p := new(ProgressNotifier)
|
||||
f := p.Wait()
|
||||
a := p.BeginAttempt()
|
||||
p.RecordFailure(a)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
t.Cleanup(cancel)
|
||||
assert.ErrorIs(t, f(ctx), context.DeadlineExceeded)
|
||||
|
||||
a = p.BeginAttempt()
|
||||
p.RecordSuccess(a)
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
t.Cleanup(cancel)
|
||||
assert.NoError(t, f(ctx))
|
||||
})
|
||||
}
|
||||
@@ -28,8 +28,8 @@ import (
|
||||
)
|
||||
|
||||
type BranchControlPersistence interface {
|
||||
LoadData([]byte, bool) error
|
||||
SaveData(filesys.Filesys) error
|
||||
LoadData(context.Context, []byte, bool) error
|
||||
SaveData(context.Context, filesys.Filesys) error
|
||||
}
|
||||
|
||||
type replicationServiceServer struct {
|
||||
@@ -66,11 +66,16 @@ func (s *replicationServiceServer) UpdateUsersAndGrants(ctx context.Context, req
|
||||
}
|
||||
|
||||
func (s *replicationServiceServer) UpdateBranchControl(ctx context.Context, req *replicationapi.UpdateBranchControlRequest) (*replicationapi.UpdateBranchControlResponse, error) {
|
||||
err := s.branchControl.LoadData(req.SerializedContents /* isFirstLoad */, false)
|
||||
sqlCtx, err := s.ctxFactory(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = s.branchControl.SaveData(s.branchControlFilesys)
|
||||
|
||||
err = s.branchControl.LoadData(sqlCtx, req.SerializedContents /* isFirstLoad */, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = s.branchControl.SaveData(sqlCtx, s.branchControlFilesys)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ func DefaultSession(pro DoltDatabaseProvider) *DoltSession {
|
||||
provider: pro,
|
||||
tempTables: make(map[string][]sql.Table),
|
||||
globalsConf: config.NewMapConfig(make(map[string]string)),
|
||||
branchController: branch_control.CreateDefaultController(), // Default sessions are fine with the default controller
|
||||
branchController: branch_control.CreateDefaultController(context.TODO()), // Default sessions are fine with the default controller
|
||||
mu: &sync.Mutex{},
|
||||
fs: pro.FileSystem(),
|
||||
}
|
||||
|
||||
@@ -300,7 +300,7 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus
|
||||
return
|
||||
}
|
||||
|
||||
cCtx, cancel := context.WithCancel(ctx)
|
||||
cCtx, cancel := context.WithCancelCause(ctx)
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(rsc.Wait))
|
||||
for i, f := range rsc.Wait {
|
||||
@@ -326,11 +326,11 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus
|
||||
case <-time.After(time.Duration(timeoutI) * time.Second):
|
||||
// We timed out before all the waiters were done.
|
||||
// First we make certain to finalize everything.
|
||||
cancel()
|
||||
cancel(doltdb.ErrReplicationWaitFailed)
|
||||
<-done
|
||||
waitFailed = true
|
||||
case <-done:
|
||||
cancel()
|
||||
cancel(context.Canceled)
|
||||
}
|
||||
|
||||
// Just because our waiters all completed does not mean they all
|
||||
@@ -345,11 +345,13 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus
|
||||
}
|
||||
}
|
||||
}
|
||||
ctx.Session.Warn(&sql.Warning{
|
||||
Level: "Warning",
|
||||
Code: mysql.ERQueryTimeout,
|
||||
Message: fmt.Sprintf("Timed out replication of commit to %d out of %d replicas.", numFailed, len(rsc.Wait)),
|
||||
})
|
||||
if numFailed > 0 {
|
||||
ctx.Session.Warn(&sql.Warning{
|
||||
Level: "Warning",
|
||||
Code: mysql.ERQueryTimeout,
|
||||
Message: fmt.Sprintf("Timed out replication of commit to %d out of %d replicas.", numFailed, len(rsc.Wait)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit
|
||||
|
||||
@@ -165,7 +165,7 @@ func commitScripts(dbs []string) []setup.SetupScript {
|
||||
func (d *DoltHarness) NewEngine(t *testing.T) (enginetest.QueryEngine, error) {
|
||||
initializeEngine := d.engine == nil
|
||||
if initializeEngine {
|
||||
d.branchControl = branch_control.CreateDefaultController()
|
||||
d.branchControl = branch_control.CreateDefaultController(context.Background())
|
||||
|
||||
pro := d.newProvider()
|
||||
doltProvider, ok := pro.(*sqle.DoltDatabaseProvider)
|
||||
@@ -300,7 +300,7 @@ func (d *DoltHarness) NewDatabases(names ...string) []sql.Database {
|
||||
d.engine = nil
|
||||
d.provider = nil
|
||||
|
||||
d.branchControl = branch_control.CreateDefaultController()
|
||||
d.branchControl = branch_control.CreateDefaultController(context.Background())
|
||||
|
||||
pro := d.newProvider()
|
||||
doltProvider, ok := pro.(*sqle.DoltDatabaseProvider)
|
||||
|
||||
@@ -111,7 +111,7 @@ func ExecuteSql(dEnv *env.DoltEnv, root *doltdb.RootValue, statements string) (*
|
||||
}
|
||||
|
||||
func NewTestSQLCtxWithProvider(ctx context.Context, pro dsess.DoltDatabaseProvider) *sql.Context {
|
||||
s, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, config2.NewMapConfig(make(map[string]string)), branch_control.CreateDefaultController())
|
||||
s, err := dsess.NewDoltSession(sql.NewBaseSession(), pro, config2.NewMapConfig(make(map[string]string)), branch_control.CreateDefaultController(ctx))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
FROM --platform=linux/amd64 golang:1.19-bullseye
|
||||
FROM --platform=linux/amd64 golang:1.21.2-bullseye
|
||||
|
||||
ENV DEBIAN_FRONTEND=noninteractive
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ cd $script_dir/../..
|
||||
|
||||
[ ! -z "$GO_BUILD_VERSION" ] || (echo "Must supply GO_BUILD_VERSION"; exit 1)
|
||||
|
||||
docker run --rm -v `pwd`:/src golang:"$GO_BUILD_VERSION"-buster /bin/bash -c '
|
||||
docker run --rm -v `pwd`:/src golang:"$GO_BUILD_VERSION"-bookworm /bin/bash -c '
|
||||
set -e
|
||||
set -o pipefail
|
||||
apt-get update && apt-get install -y p7zip-full pigz
|
||||
|
||||
@@ -19,7 +19,7 @@ RUN apt update -y && \
|
||||
|
||||
# install go
|
||||
WORKDIR /root
|
||||
ENV GO_VERSION=1.19
|
||||
ENV GO_VERSION=1.21.2
|
||||
ENV GOPATH=$HOME/go
|
||||
ENV PATH=$PATH:$GOPATH/bin
|
||||
ENV PATH=$PATH:$GOPATH/bin:/usr/local/go/bin
|
||||
|
||||
@@ -59,7 +59,7 @@ RUN apt update -y && \
|
||||
|
||||
# install go
|
||||
WORKDIR /root
|
||||
ENV GO_VERSION=1.19
|
||||
ENV GO_VERSION=1.21.2
|
||||
ENV GOPATH=$HOME/go
|
||||
ENV PATH=$PATH:$GOPATH/bin
|
||||
ENV PATH=$PATH:$GOPATH/bin:/usr/local/go/bin
|
||||
|
||||
@@ -25,7 +25,7 @@ RUN apt update -y && \
|
||||
|
||||
# install go
|
||||
WORKDIR /root
|
||||
ENV GO_VERSION=1.19
|
||||
ENV GO_VERSION=1.21.2
|
||||
ENV GOPATH=$HOME/go
|
||||
ENV PATH=$PATH:$GOPATH/bin
|
||||
ENV PATH=$PATH:$GOPATH/bin:/usr/local/go/bin
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
module github.com/dolthub/dolt/integration-tests/go-sql-server-driver
|
||||
|
||||
go 1.19
|
||||
go 1.21
|
||||
|
||||
require (
|
||||
github.com/dolthub/dolt/go v0.40.4
|
||||
|
||||
@@ -188,3 +188,154 @@ tests:
|
||||
queries:
|
||||
- exec: "use repo1"
|
||||
- exec: 'insert into vals values (31)'
|
||||
- name: branch control block on write replication
|
||||
multi_repos:
|
||||
- name: server1
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3309
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3852/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3851
|
||||
server:
|
||||
args: ["--config", "server.yaml"]
|
||||
port: 3309
|
||||
- name: server2
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3310
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3851/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3852
|
||||
- name: nocluster.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3310
|
||||
server:
|
||||
args: ["--config", "nocluster.yaml"]
|
||||
port: 3310
|
||||
connections:
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: 'create user "aaron"@"%" IDENTIFIED BY "aaronspassword"'
|
||||
- exec: 'create database repo1'
|
||||
- exec: 'use repo1'
|
||||
- exec: 'SET @@PERSIST.dolt_cluster_ack_writes_timeout_secs = 2'
|
||||
- exec: 'delete from dolt_branch_control'
|
||||
- query: 'show warnings'
|
||||
result:
|
||||
columns: ["Level", "Code", "Message"]
|
||||
rows: [["Warning", "3024", "Timed out replication of commit to 1 out of 1 replicas."]]
|
||||
- exec: 'insert into dolt_branch_control values ("repo1", "main", "aaron", "%", "admin")'
|
||||
- query: 'show warnings'
|
||||
result:
|
||||
columns: ["Level", "Code", "Message"]
|
||||
rows: [["Warning", "3024", "Timed out replication of commit to 1 out of 1 replicas."]]
|
||||
- on: server2
|
||||
restart_server:
|
||||
args: ["--config", "server.yaml"]
|
||||
- on: server1
|
||||
restart_server: {}
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: 'use repo1'
|
||||
- exec: 'delete from dolt_branch_control'
|
||||
- query: 'show warnings'
|
||||
result:
|
||||
columns: ["Level", "Code", "Message"]
|
||||
rows: []
|
||||
- name: users and grants block on write replication
|
||||
multi_repos:
|
||||
- name: server1
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3309
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3852/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3851
|
||||
server:
|
||||
args: ["--config", "server.yaml"]
|
||||
port: 3309
|
||||
- name: server2
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3310
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3851/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3852
|
||||
- name: nocluster.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3310
|
||||
server:
|
||||
args: ["--config", "nocluster.yaml"]
|
||||
port: 3310
|
||||
connections:
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: 'SET @@PERSIST.dolt_cluster_ack_writes_timeout_secs = 2'
|
||||
- exec: 'create user "aaron"@"%" IDENTIFIED BY "aaronspassword"'
|
||||
- query: 'show warnings'
|
||||
result:
|
||||
columns: ["Level", "Code", "Message"]
|
||||
rows: [["Warning", "3024", "Timed out replication of commit to 1 out of 1 replicas."]]
|
||||
- on: server2
|
||||
restart_server:
|
||||
args: ["--config", "server.yaml"]
|
||||
- on: server1
|
||||
restart_server: {}
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: 'create user "brian"@"%" IDENTIFIED BY "brianspassword"'
|
||||
- query: 'show warnings'
|
||||
result:
|
||||
columns: ["Level", "Code", "Message"]
|
||||
rows: []
|
||||
- on: server2
|
||||
user: 'brian'
|
||||
password: 'brianspassword'
|
||||
queries:
|
||||
- query: "show warnings"
|
||||
result:
|
||||
columns: ["Level", "Code", "Message"]
|
||||
rows: []
|
||||
|
||||
Reference in New Issue
Block a user