Merge branch 'main' into tim/remove-server-query

This commit is contained in:
Tim Sehn
2022-10-26 17:56:44 -07:00
45 changed files with 936 additions and 551 deletions
-1
View File
@@ -1 +0,0 @@
README.md
@@ -28,7 +28,7 @@ if [ -z "$MODE" ]; then
fi
nomsFormat="ldnbf"
if [ "$NOMS_BIN_FORMAT" == "__DOLT__"]; then
if [ "$NOMS_BIN_FORMAT" == "__DOLT__" ]; then
nomsFormat="doltnbf"
fi
@@ -38,6 +38,11 @@ if [ -z "$MODE" ]; then
exit 1
fi
nomsFormat="ldnbf"
if [ "$NOMS_BIN_FORMAT" == "__DOLT__" ]; then
nomsFormat="doltnbf"
fi
# use first 8 characters of TO_VERSION to differentiate
# jobs
short=${TO_VERSION:0:8}
@@ -49,7 +54,7 @@ sleep 0.$[ ( $RANDOM % 10 ) + 1 ]s
timesuffix=`date +%s%N`
jobname="$actorShort-$timesuffix"
jobname="$actorShort-$nomsFormat-$timesuffix"
timeprefix=$(date +%Y/%m/%d)
+1 -9
View File
@@ -37,18 +37,10 @@ jobs:
env:
FILE: ${{ format('{0}/go/cmd/dolt/dolt.go', github.workspace) }}
NEW_VERSION: ${{ needs.format-version.outputs.version }}
- name: Update Dockerfile
run: sed -i -e 's/ARG DOLT_VERSION=.*/ARG DOLT_VERSION='"$NEW_VERSION"'/' "$AMD64" "$ARM64" "$SERVERAMD64" "$SERVERARM64"
env:
AMD64: ${{ format('{0}/docker/Dockerfile', github.workspace) }}
ARM64: ${{ format('{0}/docker/Dockerfile.arm64', github.workspace) }}
SERVERAMD64: ${{ format('{0}/docker/serverDockerfile', github.workspace) }}
SERVERARM64: ${{ format('{0}/docker/serverDockerfile.arm64', github.workspace) }}
NEW_VERSION: ${{ needs.format-version.outputs.version }}
- uses: EndBug/add-and-commit@v9.1.1
with:
message: ${{ format('[ga-bump-release] Update Dolt version to {0} and release v{0}', needs.format-version.outputs.version) }}
add: ${{ format('[{0}/go/cmd/dolt/dolt.go,{0}/docker/Dockerfile,{0}/docker/Dockerfile.arm64,{0}/docker/serverDockerfile,{0}/docker/serverDockerfile.arm64]', github.workspace) }}
add: ${{ format('{0}/go/cmd/dolt/dolt.go', github.workspace) }}
cwd: "."
pull: "--ff"
- name: Build Binaries
+9 -6
View File
@@ -1,6 +1,6 @@
# Dolt is Git for Data!
Dolt is a SQL database that you can fork, clone, branch, merge, push
[Dolt](https://doltdb.com) is a SQL database that you can fork, clone, branch, merge, push
and pull just like a Git repository. Connect to Dolt just like any
MySQL database to run queries or update the data using SQL
commands. Use the command line interface to import CSV files, commit
@@ -24,12 +24,15 @@ Lots of things! Dolt is a generally useful tool with countless
applications. But if you want some ideas, [here's how people are using
it so far](https://www.dolthub.com/blog/2022-07-11-dolt-case-studies/).
# Dolt CLI
Learn more about Dolt use cases, configuration and guides to use dolt on our [documentation page](https://docs.dolthub.com/introduction/what-is-dolt).
The `dolt` CLI has the same commands as `git`, with some extras.
# How to use this image
```
$ dolt
This image is for the Dolt CLI, which has the same commands as `git`, with some extras. Running this image without any
arguments is equivalent to running the `dolt` command locally.
```shell
$ docker run dolthub/dolt:latest
Valid commands for dolt are
init - Create an empty Dolt data repository.
status - Show the working tree status.
@@ -72,4 +75,4 @@ Valid commands for dolt are
dump - Export all tables in the working set into a file.
```
Learn more about Dolt use cases, configuration and guides to use dolt on our [documentation page](https://docs.dolthub.com/introduction/what-is-dolt).
This image is useful for creating custom Docker Image using this image as base image.
+2 -3
View File
@@ -32,7 +32,6 @@ _create_dir() {
}
check_for_dolt() {
mysql_log "Verifying dolt executable..."
local dolt_bin=$(which dolt)
if [ ! -x "$dolt_bin" ]; then
mysql_error "dolt binary executable not found"
@@ -60,14 +59,14 @@ get_config_file_path_if_exists() {
CONFIG_DIR=$1
FILE_TYPE=$2
if [ -d "$CONFIG_DIR" ]; then
mysql_log "Checking for config provided in $CONFIG_DIR"
mysql_note "Checking for config provided in $CONFIG_DIR"
number_of_files_found=( `find .$CONFIG_DIR -type f -name "*.$FILE_TYPE" | wc -l` )
if [ $number_of_files_found -gt 1 ]; then
CONFIG_PROVIDED=
mysql_warn "multiple config file found in $CONFIG_DIR, using default config"
elif [ $number_of_files_found -eq 1 ]; then
files_found=( `ls $CONFIG_DIR/*$FILE_TYPE` )
mysql_log "$files_found file is found"
mysql_note "$files_found file is found"
CONFIG_PROVIDED=$files_found
else
CONFIG_PROVIDED=
+65 -1
View File
@@ -1,6 +1,6 @@
# Dolt is Git for Data!
Dolt is a SQL database that you can fork, clone, branch, merge, push
[Dolt](https://doltdb.com) is a SQL database that you can fork, clone, branch, merge, push
and pull just like a Git repository. Connect to Dolt just like any
MySQL database to run queries or update the data using SQL
commands. Use the command line interface to import CSV files, commit
@@ -73,3 +73,67 @@ Valid commands for dolt are
```
Learn more about Dolt use cases, configuration and guides to use dolt on our [documentation page](https://docs.dolthub.com/introduction/what-is-dolt).
# How to use this image
This image is for Dolt SQL Server, which is similar to MySQL Docker Image. Running this image without any arguments
is equivalent to running the `dolt sql-server --host 0.0.0.0 --port 3306` command locally. The host and port are fixed
so that, via port mapping, a user on the local host system can connect to the server running inside the
container.
To check out supported options for `dolt sql-server`, you can run the image with `--help` flag.
```shell
$ docker run dolthub/dolt-sql-server:latest --help
```
### Connect to the server in the container from the host system
To be able to connect to the server running in the container, we need to set up a port to connect to locally that
maps to the port in the container. The host is set to `0.0.0.0` for accepting connections to any available network
interface.
```shell
$ docker run -p 3307:3306 dolthub/dolt-sql-server:latest
```
Now, you have a running server in the container, and we can connect to it by specifying our host, 3307 for the port, and root for the user,
since that's the default user and we didn't provide any configuration when running the server.
For example, you can run mysql client to connect to the server like this:
```shell
$ mysql --host 0.0.0.0 -P 3307 -u root
```
### Define configuration for the server
You can either define server configuration as commandline arguments, or you can use yaml configuration file.
To use command-line arguments, simply define them after the whole docker command.
```shell
$ docker run -p 3307:3306 dolthub/dolt-sql-server:latest -l debug --no-auto-commit
```
Or, we can mount a local directory to specific directories in the container.
The special directory for server configuration is `/etc/dolt/servercfg.d/`. You can only have one `.yaml` configuration
file in this directory. If there are multiple, the default configuration will be used. If the location of
configuration file was `/Users/jennifer/docker/server/config.yaml`, this is how to use `-v` flag which mounts
`/Users/jennifer/docker/server/` local directory to `/etc/dolt/servercfg.d/` directory in the container.
```shell
$ docker run -p 3307:3306 -v /Users/jennifer/docker/server/:/etc/dolt/servercfg.d/ dolthub/dolt-sql-server:latest
```
The Dolt configuration and data directories can be configured similarly:
- The dolt configuration directory is `/etc/dolt/doltcfg.d/`
There should be one `.json` dolt configuration file. It will replace the global dolt configuration file in the
container.
- We set the location of where data to be stored to default location at `/var/lib/dolt/` in the container.
The data directory does not need to be defined in server configuration for container, but to store the data
on the host system, it can also be mounted to this default location.
```shell
$ docker run -p 3307:3306 -v /Users/jennifer/docker/databases/:/var/lib/dolt/ dolthub/dolt-sql-server:latest
```
+12 -2
View File
@@ -232,7 +232,12 @@ func Serve(
HttpPort: *serverConfig.RemotesapiPort(),
GrpcPort: *serverConfig.RemotesapiPort(),
})
remoteSrv = remotesrv.NewServer(args)
remoteSrv, err = remotesrv.NewServer(args)
if err != nil {
lgr.Errorf("error creating remotesapi server on port %d: %v", *serverConfig.RemotesapiPort(), err)
startError = err
return
}
listeners, err := remoteSrv.Listeners()
if err != nil {
lgr.Errorf("error starting remotesapi server listeners on port %d: %v", *serverConfig.RemotesapiPort(), err)
@@ -256,7 +261,12 @@ func Serve(
args := clusterController.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
})
clusterRemoteSrv = remotesrv.NewServer(args)
clusterRemoteSrv, err = remotesrv.NewServer(args)
if err != nil {
lgr.Errorf("error creating remotesapi server on port %d: %v", *serverConfig.RemotesapiPort(), err)
startError = err
return
}
listeners, err := clusterRemoteSrv.Listeners()
if err != nil {
lgr.Errorf("error starting remotesapi server listeners for cluster config on port %d: %v", clusterController.RemoteSrvPort(), err)
+1 -1
View File
@@ -57,7 +57,7 @@ import (
)
const (
Version = "0.50.8"
Version = "0.50.10"
)
var dumpDocsCommand = &commands.DumpDocsCmd{}
+25 -39
View File
@@ -16,6 +16,7 @@ package actions
import (
"context"
"errors"
"io"
"math"
"strconv"
@@ -61,7 +62,6 @@ func InferColumnTypesFromTableReader(ctx context.Context, rd table.ReadCloser, a
var curr, prev row.Row
i := newInferrer(rd.GetSchema(), args)
OUTER:
for j := 0; true; j++ {
var err error
@@ -130,10 +130,8 @@ func (inf *inferrer) inferColumnTypes() (*schema.ColCollection, error) {
col.TypeInfo = inferredTypes[tag]
col.Tag = schema.ReservedTagMin + tag
col.Constraints = []schema.ColConstraint{schema.NotNullConstraint{}}
if inf.nullable.Contains(tag) {
col.Constraints = []schema.ColConstraint(nil)
}
// for large imports, it is possible to miss all the null values, so we cannot accurately add not null constraint
col.Constraints = []schema.ColConstraint(nil)
cols = append(cols, col)
return false, nil
@@ -218,32 +216,27 @@ func leastPermissiveNumericType(strVal string, floatThreshold float64) (ti typei
return ti
}
if strings.Contains(strVal, "-") {
i, err := strconv.ParseInt(strVal, 10, 64)
if err != nil {
return typeinfo.UnknownType
}
if i >= math.MinInt32 && i <= math.MaxInt32 {
return typeinfo.Int32Type
} else {
return typeinfo.Int64Type
}
// always parse as signed int
i, err := strconv.ParseInt(strVal, 10, 64)
// use string for out of range
if errors.Is(err, strconv.ErrRange) {
return typeinfo.StringDefaultType
}
if err != nil {
return typeinfo.UnknownType
}
// handle leading zero case
if len(strVal) > 1 && strVal[0] == '0' {
return typeinfo.StringDefaultType
}
if i >= math.MinInt32 && i <= math.MaxInt32 {
return typeinfo.Int32Type
} else {
ui, err := strconv.ParseUint(strVal, 10, 64)
if err != nil {
return typeinfo.UnknownType
}
// handle leading zero case
if len(strVal) > 1 && strVal[0] == '0' {
return typeinfo.StringDefaultType
}
if ui <= math.MaxUint32 {
return typeinfo.Uint32Type
} else {
return typeinfo.Uint64Type
}
return typeinfo.Int64Type
}
}
@@ -286,14 +279,13 @@ func chronoTypes() []typeinfo.TypeInfo {
func numericTypes() []typeinfo.TypeInfo {
// prefer:
// ints over floats
// unsigned over signed
// smaller over larger
return []typeinfo.TypeInfo{
//typeinfo.Uint8Type,
//typeinfo.Uint16Type,
//typeinfo.Uint24Type,
typeinfo.Uint32Type,
typeinfo.Uint64Type,
//typeinfo.Uint32Type,
//typeinfo.Uint64Type,
//typeinfo.Int8Type,
//typeinfo.Int16Type,
@@ -398,12 +390,6 @@ func findCommonNumericType(nums typeInfoSet) typeinfo.TypeInfo {
typeinfo.Int24Type,
typeinfo.Int16Type,
typeinfo.Int8Type,
typeinfo.Uint64Type,
typeinfo.Uint32Type,
typeinfo.Uint24Type,
typeinfo.Uint16Type,
typeinfo.Uint8Type,
}
for _, numType := range mostToLeast {
if setHasType(nums, numType) {
+14 -31
View File
@@ -49,14 +49,14 @@ func TestLeastPermissiveType(t *testing.T) {
{"lower bool", "true", 0.0, typeinfo.BoolType},
{"upper bool", "FALSE", 0.0, typeinfo.BoolType},
{"yes", "yes", 0.0, typeinfo.StringDefaultType},
{"one", "1", 0.0, typeinfo.Uint32Type},
{"one", "1", 0.0, typeinfo.Int32Type},
{"negative one", "-1", 0.0, typeinfo.Int32Type},
{"negative one point 0", "-1.0", 0.0, typeinfo.Float32Type},
{"negative one point 0 with FT of 0.1", "-1.0", 0.1, typeinfo.Int32Type},
{"negative one point one with FT of 0.1", "-1.1", 0.1, typeinfo.Float32Type},
{"negative one point 999 with FT of 1.0", "-1.999", 1.0, typeinfo.Int32Type},
{"zero point zero zero zero zero", "0.0000", 0.0, typeinfo.Float32Type},
{"max int", strconv.FormatUint(math.MaxInt64, 10), 0.0, typeinfo.Uint64Type},
{"max int", strconv.FormatUint(math.MaxInt64, 10), 0.0, typeinfo.Int64Type},
{"bigger than max int", strconv.FormatUint(math.MaxUint64, 10) + "0", 0.0, typeinfo.StringDefaultType},
}
@@ -75,7 +75,7 @@ func TestLeastPermissiveNumericType(t *testing.T) {
floatThreshold float64
expType typeinfo.TypeInfo
}{
{"zero", "0", 0.0, typeinfo.Uint32Type},
{"zero", "0", 0.0, typeinfo.Int32Type},
{"zero float", "0.0", 0.0, typeinfo.Float32Type},
{"zero float with floatThreshold of 0.1", "0.0", 0.1, typeinfo.Int32Type},
{"negative float", "-1.3451234", 0.0, typeinfo.Float32Type},
@@ -85,8 +85,8 @@ func TestLeastPermissiveNumericType(t *testing.T) {
{"all zeroes", "0000", 0.0, typeinfo.StringDefaultType},
{"leading zeroes", "01", 0.0, typeinfo.StringDefaultType},
{"negative int", "-1234", 0.0, typeinfo.Int32Type},
{"fits in uint64 but not int64", strconv.FormatUint(math.MaxUint64, 10), 0.0, typeinfo.Uint64Type},
{"negative less than math.MinInt64", "-" + strconv.FormatUint(math.MaxUint64, 10), 0.0, typeinfo.UnknownType},
{"fits in uint64 but not int64", strconv.FormatUint(math.MaxUint64, 10), 0.0, typeinfo.StringDefaultType},
{"negative less than math.MinInt64", "-" + strconv.FormatUint(math.MaxUint64, 10), 0.0, typeinfo.StringDefaultType},
{"math.MinInt64", strconv.FormatInt(math.MinInt64, 10), 0.0, typeinfo.Int64Type},
}
@@ -142,14 +142,6 @@ func testFindCommonType(t *testing.T) {
},
expType: typeinfo.Int64Type,
},
{
name: "all unsigned ints",
inferSet: typeInfoSet{
typeinfo.Uint32Type: {},
typeinfo.Uint64Type: {},
},
expType: typeinfo.Uint64Type,
},
{
name: "all floats",
inferSet: typeInfoSet{
@@ -159,35 +151,31 @@ func testFindCommonType(t *testing.T) {
expType: typeinfo.Float64Type,
},
{
name: "32 bit ints and uints",
name: "32 bit ints",
inferSet: typeInfoSet{
typeinfo.Int32Type: {},
typeinfo.Uint32Type: {},
typeinfo.Int32Type: {},
},
expType: typeinfo.Int32Type,
},
{
name: "64 bit ints and uints",
name: "64 bit ints",
inferSet: typeInfoSet{
typeinfo.Int64Type: {},
typeinfo.Uint64Type: {},
typeinfo.Int64Type: {},
},
expType: typeinfo.Int64Type,
},
{
name: "32 bit ints, uints, and floats",
name: "32 bit ints and floats",
inferSet: typeInfoSet{
typeinfo.Int32Type: {},
typeinfo.Uint32Type: {},
typeinfo.Float32Type: {},
},
expType: typeinfo.Float32Type,
},
{
name: "64 bit ints, uints, and floats",
name: "64 bit ints and floats",
inferSet: typeInfoSet{
typeinfo.Int64Type: {},
typeinfo.Uint64Type: {},
typeinfo.Float64Type: {},
},
expType: typeinfo.Float64Type,
@@ -228,11 +216,6 @@ func testFindCommonType(t *testing.T) {
func testFindCommonTypeFromSingleType(t *testing.T) {
allTypes := []typeinfo.TypeInfo{
typeinfo.Uint8Type,
typeinfo.Uint16Type,
typeinfo.Uint24Type,
typeinfo.Uint32Type,
typeinfo.Uint64Type,
typeinfo.Int8Type,
typeinfo.Int16Type,
typeinfo.Int24Type,
@@ -388,7 +371,7 @@ func TestInferSchema(t *testing.T) {
},
map[string]typeinfo.TypeInfo{
"int": typeinfo.Int32Type,
"uint": typeinfo.Uint64Type,
"uint": typeinfo.StringDefaultType,
"uuid": typeinfo.UuidType,
"float": typeinfo.Float32Type,
"bool": typeinfo.BoolType,
@@ -404,7 +387,7 @@ func TestInferSchema(t *testing.T) {
floatThreshold: 0,
},
map[string]typeinfo.TypeInfo{
"mix": typeinfo.Uint64Type,
"mix": typeinfo.StringDefaultType,
"uuid": typeinfo.UuidType,
},
nil,
@@ -500,7 +483,7 @@ func TestInferSchema(t *testing.T) {
err = allCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
idx := schema.IndexOfConstraint(col.Constraints, schema.NotNullConstraintType)
assert.True(t, idx == -1 == test.nullableCols.Contains(col.Name), "%s unexpected nullability", col.Name)
assert.True(t, idx == -1, "%s unexpected not null constraint", col.Name)
return false, nil
})
require.NoError(t, err)
+23 -4
View File
@@ -74,6 +74,16 @@ func migrateWorkingSet(ctx context.Context, brRef ref.BranchRef, wsRef ref.Worki
return err
}
err = validateRootValue(ctx, oldHeadRoot, oldWs.WorkingRoot(), wr)
if err != nil {
return err
}
err = validateRootValue(ctx, oldHeadRoot, oldWs.StagedRoot(), sr)
if err != nil {
return err
}
newWs := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(wr).WithStagedRoot(sr)
return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta())
@@ -179,7 +189,7 @@ func migrateCommit(ctx context.Context, oldCm *doltdb.Commit, new *doltdb.DoltDB
// validate root after we flush the ChunkStore to facilitate
// investigating failed migrations
if err = validateRootValue(ctx, oldRoot, mRoot); err != nil {
if err = validateRootValue(ctx, oldParentRoot, oldRoot, mRoot); err != nil {
return err
}
@@ -481,10 +491,19 @@ func migrateSchema(ctx context.Context, tableName string, existing schema.Schema
}
}
if patched {
return schema.SchemaFromCols(schema.NewColCollection(cols...))
if !patched {
return existing, nil
}
return existing, nil
sch, err := schema.SchemaFromCols(schema.NewColCollection(cols...))
if err != nil {
return nil, err
}
if err = sch.SetPkOrdinals(existing.GetPkOrdinals()); err != nil {
return nil, err
}
return sch, nil
}
func migrateIndexSet(
+28 -3
View File
@@ -52,7 +52,7 @@ func validateBranchMapping(ctx context.Context, old, new *doltdb.DoltDB) error {
return nil
}
func validateRootValue(ctx context.Context, old, new *doltdb.RootValue) error {
func validateRootValue(ctx context.Context, oldParent, old, new *doltdb.RootValue) error {
names, err := old.GetTableNames(ctx)
if err != nil {
return err
@@ -67,6 +67,25 @@ func validateRootValue(ctx context.Context, old, new *doltdb.RootValue) error {
return fmt.Errorf("expected to find table %s in root value (%s)", name, h.String())
}
// Skip tables that haven't changed
op, ok, err := oldParent.GetTable(ctx, name)
if err != nil {
return err
}
if ok {
oldHash, err := o.HashOf()
if err != nil {
return err
}
oldParentHash, err := op.HashOf()
if err != nil {
return err
}
if oldHash.Equal(oldParentHash) {
continue
}
}
n, ok, err := new.GetTable(ctx, name)
if err != nil {
return err
@@ -197,11 +216,17 @@ func validateSchema(existing schema.Schema) error {
func nomsKindsFromQueryTypes(qt query.Type) []types.NomsKind {
switch qt {
case query.Type_UINT8, query.Type_UINT16, query.Type_UINT24,
case query.Type_UINT8:
return []types.NomsKind{types.UintKind, types.BoolKind}
case query.Type_UINT16, query.Type_UINT24,
query.Type_UINT32, query.Type_UINT64:
return []types.NomsKind{types.UintKind}
case query.Type_INT8, query.Type_INT16, query.Type_INT24,
case query.Type_INT8:
return []types.NomsKind{types.IntKind, types.BoolKind}
case query.Type_INT16, query.Type_INT24,
query.Type_INT32, query.Type_INT64:
return []types.NomsKind{types.IntKind}
+36 -17
View File
@@ -47,10 +47,11 @@ type RemoteChunkStore struct {
bucket string
fs filesys.Filesys
lgr *logrus.Entry
sealer Sealer
remotesapi.UnimplementedChunkStoreServiceServer
}
func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCache, fs filesys.Filesys) *RemoteChunkStore {
func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCache, fs filesys.Filesys, sealer Sealer) *RemoteChunkStore {
return &RemoteChunkStore{
HttpHost: httpHost,
csCache: csCache,
@@ -59,6 +60,7 @@ func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCac
lgr: lgr.WithFields(logrus.Fields{
"service": "dolt.services.remotesapi.v1alpha1.ChunkStoreServiceServer",
}),
sealer: sealer,
}
}
@@ -177,10 +179,15 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
logger.Println("Failed to sign request", err)
return nil, err
}
preurl := url.String()
url, err = rs.sealer.Seal(url)
if err != nil {
logger.Println("Failed to seal request", err)
return nil, err
}
logger.Println("The URL is", preurl, "the ranges are", ranges, "sealed url", url.String())
logger.Println("The URL is", url)
getRange := &remotesapi.HttpGetRange{Url: url, Ranges: ranges}
getRange := &remotesapi.HttpGetRange{Url: url.String(), Ranges: ranges}
locs = append(locs, &remotesapi.DownloadLoc{Location: &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: getRange}})
}
@@ -242,10 +249,15 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
logger.Println("Failed to sign request", err)
return err
}
preurl := url.String()
url, err = rs.sealer.Seal(url)
if err != nil {
logger.Println("Failed to seal request", err)
return err
}
logger.Println("The URL is", preurl, "the ranges are", ranges, "sealed url", url.String())
logger.Println("The URL is", url)
getRange := &remotesapi.HttpGetRange{Url: url, Ranges: ranges}
getRange := &remotesapi.HttpGetRange{Url: url.String(), Ranges: ranges}
locs = append(locs, &remotesapi.DownloadLoc{Location: &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: getRange}})
}
@@ -271,13 +283,13 @@ func (rs *RemoteChunkStore) getHost(md metadata.MD) string {
return host
}
func (rs *RemoteChunkStore) getDownloadUrl(logger *logrus.Entry, md metadata.MD, path string) (string, error) {
func (rs *RemoteChunkStore) getDownloadUrl(logger *logrus.Entry, md metadata.MD, path string) (*url.URL, error) {
host := rs.getHost(md)
return (&url.URL{
return &url.URL{
Scheme: "http",
Host: host,
Path: path,
}).String(), nil
}, nil
}
func parseTableFileDetails(req *remotesapi.GetUploadLocsRequest) []*remotesapi.TableFileDetails {
@@ -323,32 +335,35 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
for _, tfd := range tfds {
h := hash.New(tfd.Id)
url, err := rs.getUploadUrl(logger, md, repoPath, tfd)
if err != nil {
return nil, status.Error(codes.Internal, "Failed to get upload Url.")
}
url, err = rs.sealer.Seal(url)
if err != nil {
return nil, status.Error(codes.Internal, "Failed to seal upload Url.")
}
loc := &remotesapi.UploadLoc_HttpPost{HttpPost: &remotesapi.HttpPostTableFile{Url: url}}
loc := &remotesapi.UploadLoc_HttpPost{HttpPost: &remotesapi.HttpPostTableFile{Url: url.String()}}
locs = append(locs, &remotesapi.UploadLoc{TableFileHash: h[:], Location: loc})
logger.Printf("sending upload location for chunk %s: %s", h.String(), url)
logger.Printf("sending upload location for chunk %s: %s", h.String(), url.String())
}
return &remotesapi.GetUploadLocsResponse{Locs: locs}, nil
}
func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, repoPath string, tfd *remotesapi.TableFileDetails) (string, error) {
func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, repoPath string, tfd *remotesapi.TableFileDetails) (*url.URL, error) {
fileID := hash.New(tfd.Id).String()
params := url.Values{}
params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks)))
params.Add("content_length", strconv.Itoa(int(tfd.ContentLength)))
params.Add("content_hash", base64.RawURLEncoding.EncodeToString(tfd.ContentHash))
return (&url.URL{
return &url.URL{
Scheme: "http",
Host: rs.getHost(md),
Path: fmt.Sprintf("%s/%s", repoPath, fileID),
RawQuery: params.Encode(),
}).String(), nil
}, nil
}
func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRequest) (*remotesapi.RebaseResponse, error) {
@@ -536,11 +551,15 @@ func getTableFileInfo(
if err != nil {
return nil, status.Error(codes.Internal, "failed to get download url for "+t.FileID())
}
url, err = rs.sealer.Seal(url)
if err != nil {
return nil, status.Error(codes.Internal, "failed to get seal download url for "+t.FileID())
}
appendixTableFileInfo = append(appendixTableFileInfo, &remotesapi.TableFileInfo{
FileId: t.FileID(),
NumChunks: uint32(t.NumChunks()),
Url: url,
Url: url.String(),
})
}
return appendixTableFileInfo, nil
+16 -7
View File
@@ -46,9 +46,10 @@ type filehandler struct {
fs filesys.Filesys
readOnly bool
lgr *logrus.Entry
sealer Sealer
}
func newFileHandler(lgr *logrus.Entry, dbCache DBCache, fs filesys.Filesys, readOnly bool) filehandler {
func newFileHandler(lgr *logrus.Entry, dbCache DBCache, fs filesys.Filesys, readOnly bool, sealer Sealer) filehandler {
return filehandler{
dbCache,
fs,
@@ -56,6 +57,7 @@ func newFileHandler(lgr *logrus.Entry, dbCache DBCache, fs filesys.Filesys, read
lgr.WithFields(logrus.Fields{
"service": "dolt.services.remotesapi.v1alpha1.HttpFileServer",
}),
sealer,
}
}
@@ -63,6 +65,15 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
logger := getReqLogger(fh.lgr, req.Method+"_"+req.RequestURI)
defer func() { logger.Println("finished") }()
var err error
req.URL, err = fh.sealer.Unseal(req.URL)
if err != nil {
logger.Printf("could not unseal incoming request URL: %s", err.Error())
respWr.WriteHeader(http.StatusBadRequest)
return
}
logger.Printf("unsealed url %s", req.URL.String())
path := strings.TrimLeft(req.URL.Path, "/")
statusCode := http.StatusMethodNotAllowed
@@ -92,7 +103,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
respWr.WriteHeader(http.StatusInternalServerError)
return
}
statusCode = readTableFile(logger, abs, respWr, req)
statusCode = readTableFile(logger, abs, respWr, req.Header.Get("Range"))
case http.MethodPost, http.MethodPut:
if fh.readOnly {
@@ -157,15 +168,13 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
}
}
func readTableFile(logger *logrus.Entry, path string, respWr http.ResponseWriter, req *http.Request) int {
rangeStr := req.Header.Get("Range")
func readTableFile(logger *logrus.Entry, path string, respWr http.ResponseWriter, rangeStr string) int {
var r io.ReadCloser
var readSize int64
var fileErr error
{
if rangeStr == "" {
logger.Println("going to read entire file")
logger.Println("going to read entire file", path)
r, readSize, fileErr = getFileReader(path)
} else {
offset, length, err := offsetAndLenFromRange(rangeStr)
@@ -173,7 +182,7 @@ func readTableFile(logger *logrus.Entry, path string, respWr http.ResponseWriter
logger.Println(err.Error())
return http.StatusBadRequest
}
logger.Printf("going to read file at offset %d, length %d", offset, length)
logger.Printf("going to read file %s at offset %d, length %d", path, offset, length)
readSize = length
r, fileErr = getFileReaderAt(path, offset, length)
}
+179
View File
@@ -0,0 +1,179 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotesrv
import (
"crypto/aes"
"crypto/cipher"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
"net/url"
"strconv"
"strings"
"time"
)
// Sealer seals requests to the HTTP file server so that they cannot be
// forged: the gRPC server Seals the URLs it hands out, and the HTTP
// server Unseals them before serving the request.
type Sealer interface {
	Seal(*url.URL) (*url.URL, error)
	Unseal(*url.URL) (*url.URL, error)
}
// identitySealer is a no-op Sealer implementation: Seal and Unseal both
// return the URL unchanged. Used when request sealing is not required.
var _ Sealer = identitySealer{}

type identitySealer struct {
}

// Seal returns u unmodified.
func (identitySealer) Seal(u *url.URL) (*url.URL, error) {
	return u, nil
}

// Unseal returns u unmodified.
func (identitySealer) Unseal(u *url.URL) (*url.URL, error) {
	return u, nil
}
// singleSymmetricKeySealer seals a URL by encrypting its Path and Query
// components with AES-256-GCM and passing the ciphertext in a base64
// encoded query parameter. A not-before timestamp (nbf) and an
// expiration timestamp (exp) are added as query parameters and bound
// into the ciphertext as GCM authenticated data, so they cannot be
// altered without failing authentication.
type singleSymmetricKeySealer struct {
	// privateKeyBytes is the 32-byte AES-256 key shared by Seal and Unseal.
	privateKeyBytes []byte
}
// NewSingleSymmetricKeySealer returns a Sealer backed by a freshly
// generated, random 32-byte (AES-256) symmetric key. The key never
// leaves the process; URLs sealed by one instance can only be unsealed
// by that same instance.
func NewSingleSymmetricKeySealer() (Sealer, error) {
	keyBytes := make([]byte, 32)
	if _, err := rand.Read(keyBytes); err != nil {
		return nil, err
	}
	return singleSymmetricKeySealer{privateKeyBytes: keyBytes}, nil
}
// Seal encrypts u's path and query with AES-256-GCM and returns a copy
// of u whose path is prefixed with /single_symmetric_key_sealed_request/
// and whose query carries the ciphertext (req), validity window
// (nbf, exp — milliseconds since the Unix epoch) and the GCM nonce, all
// base64/URL encoded. nbf and exp are bound in as authenticated data.
func (s singleSymmetricKeySealer) Seal(u *url.URL) (*url.URL, error) {
	// The plaintext we protect is just the request path plus query.
	plaintext := (&url.URL{
		Path:     u.EscapedPath(),
		RawQuery: u.RawQuery,
	}).String()

	// Validity window: backdate slightly for clock skew, expire in 15m.
	now := time.Now()
	nbfStr := strconv.FormatInt(now.Add(-10*time.Second).UnixMilli(), 10)
	expStr := strconv.FormatInt(now.Add(15*time.Minute).UnixMilli(), 10)

	var nonce [12]byte
	if _, err := rand.Read(nonce[:]); err != nil {
		return nil, err
	}

	blk, err := aes.NewCipher(s.privateKeyBytes)
	if err != nil {
		return nil, fmt.Errorf("internal error: error making aes cipher with key: %w", err)
	}
	gcm, err := cipher.NewGCM(blk)
	if err != nil {
		return nil, fmt.Errorf("internal error: error making gcm mode opener with key: %w", err)
	}

	// nbf:exp is the additional authenticated data, so tampering with the
	// cleartext timestamps makes authentication fail in Unseal.
	sealedBytes := gcm.Seal(nil, nonce[:], []byte(plaintext), []byte(nbfStr+":"+expStr))

	sealedURL := *u
	sealedURL.Path = "/single_symmetric_key_sealed_request/" + u.EscapedPath()
	sealedURL.RawQuery = url.Values{
		"req":   {base64.RawURLEncoding.EncodeToString(sealedBytes)},
		"nbf":   {nbfStr},
		"exp":   {expStr},
		"nonce": {base64.RawURLEncoding.EncodeToString(nonce[:])},
	}.Encode()
	return &sealedURL, nil
}
// Unseal validates and decrypts a URL previously produced by Seal. It
// checks the sealed-path prefix, the presence of the nbf/exp/nonce/req
// query parameters, the validity window, and the GCM authentication tag
// (which also covers nbf and exp), then returns the reconstructed
// original URL. Errors prefixed "bad request" indicate problems with
// client-supplied data; "internal error" indicates cipher setup failure.
func (s singleSymmetricKeySealer) Unseal(u *url.URL) (*url.URL, error) {
	if !strings.HasPrefix(u.Path, "/single_symmetric_key_sealed_request/") {
		return nil, errors.New("bad request: cannot unseal URL whose path does not start with /single_symmetric_key_sealed_request/")
	}
	q := u.Query()
	// All four parameters are required; fail early with a specific message.
	if !q.Has("nbf") {
		return nil, errors.New("bad request: cannot unseal URL which does not include an nbf")
	}
	if !q.Has("exp") {
		return nil, errors.New("bad request: cannot unseal URL which does not include an exp")
	}
	if !q.Has("nonce") {
		return nil, errors.New("bad request: cannot unseal URL which does not include a nonce")
	}
	if !q.Has("req") {
		return nil, errors.New("bad request: cannot unseal URL which does not include a req")
	}
	nbfStr := q.Get("nbf")
	expStr := q.Get("exp")
	nonceStr := q.Get("nonce")
	nbf, err := strconv.ParseInt(nbfStr, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("bad request: error parsing nbf as int64: %w", err)
	}
	exp, err := strconv.ParseInt(expStr, 10, 64)
	if err != nil {
		return nil, fmt.Errorf("bad request: error parsing exp as int64: %w", err)
	}
	nonce, err := base64.RawURLEncoding.DecodeString(nonceStr)
	if err != nil {
		return nil, fmt.Errorf("bad request: error parsing nonce as base64 URL encoded: %w", err)
	}
	// Enforce the validity window before doing any crypto work.
	// nbf/exp are milliseconds since the Unix epoch.
	if time.Now().Before(time.UnixMilli(nbf)) {
		return nil, fmt.Errorf("bad request: nbf is invalid")
	}
	if time.Now().After(time.UnixMilli(exp)) {
		return nil, fmt.Errorf("bad request: exp is invalid")
	}
	block, err := aes.NewCipher(s.privateKeyBytes)
	if err != nil {
		return nil, fmt.Errorf("internal error: error making aes cipher with key: %w", err)
	}
	aesgcm, err := cipher.NewGCM(block)
	if err != nil {
		return nil, fmt.Errorf("internal error: error making gcm mode opener with key: %w", err)
	}
	reqStr := q.Get("req")
	reqBytes, err := base64.RawURLEncoding.DecodeString(reqStr)
	if err != nil {
		return nil, fmt.Errorf("bad request: error parsing req as base64 URL encoded: %w", err)
	}
	// The nbf:exp string is the GCM additional authenticated data, so a
	// tampered timestamp fails authentication here even though the
	// timestamps themselves travel in cleartext.
	requestURI, err := aesgcm.Open(nil, nonce, reqBytes, []byte(nbfStr+":"+expStr))
	if err != nil {
		return nil, fmt.Errorf("bad request: error opening sealed url: %w", err)
	}
	requestURL, err := url.Parse(string(requestURI))
	if err != nil {
		return nil, fmt.Errorf("bad request: error parsing unsealed request uri: %w", err)
	}
	// The outer (sealed) path must match the path recovered from the
	// ciphertext; this ties the outer URL to the sealed payload.
	if strings.TrimPrefix(u.Path, "/single_symmetric_key_sealed_request/") != requestURL.EscapedPath() {
		return nil, fmt.Errorf("bad request: unsealed request path did not equal request path in sealed request")
	}
	ret := *u
	ret.Path = requestURL.Path
	ret.RawQuery = requestURL.RawQuery
	return &ret, nil
}
@@ -0,0 +1,88 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotesrv
import (
"crypto/rand"
"encoding/base64"
"fmt"
"net/url"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestSingleSymmetricKeySealer(t *testing.T) {
s, err := NewSingleSymmetricKeySealer()
assert.NoError(t, err)
assert.NotNil(t, s)
u := &url.URL{
Scheme: "https",
Host: "remotesapi.dolthub.com:443",
Path: "somedatabasename/sometablefilename",
}
sealed, err := s.Seal(u)
assert.NoError(t, err)
unsealed, err := s.Unseal(sealed)
assert.NoError(t, err)
assert.Equal(t, u, unsealed)
corruptednbf := &(*sealed)
ps := corruptednbf.Query()
ps.Set("nbf", fmt.Sprintf("%v", time.Now()))
corruptednbf.RawQuery = ps.Encode()
unsealed, err = s.Unseal(corruptednbf)
assert.Error(t, err)
nonbf := &(*sealed)
ps = nonbf.Query()
ps.Del("nbf")
nonbf.RawQuery = ps.Encode()
unsealed, err = s.Unseal(nonbf)
assert.Error(t, err)
corruptedexp := &(*sealed)
ps = corruptedexp.Query()
ps.Set("exp", fmt.Sprintf("%v", time.Now()))
corruptedexp.RawQuery = ps.Encode()
unsealed, err = s.Unseal(corruptedexp)
assert.Error(t, err)
noexp := &(*sealed)
ps = noexp.Query()
ps.Del("exp")
noexp.RawQuery = ps.Encode()
unsealed, err = s.Unseal(noexp)
assert.Error(t, err)
corruptednonce := &(*sealed)
ps = corruptednonce.Query()
var differentnonce [12]byte
_, err = rand.Read(differentnonce[:])
assert.NoError(t, err)
ps.Set("nonce", base64.RawURLEncoding.EncodeToString(differentnonce[:]))
corruptednonce.RawQuery = ps.Encode()
unsealed, err = s.Unseal(corruptednonce)
assert.Error(t, err)
nononce := &(*sealed)
ps = nononce.Query()
ps.Del("nonce")
nononce.RawQuery = ps.Encode()
unsealed, err = s.Unseal(nononce)
assert.Error(t, err)
}
+9 -4
View File
@@ -57,7 +57,7 @@ type ServerArgs struct {
Options []grpc.ServerOption
}
func NewServer(args ServerArgs) *Server {
func NewServer(args ServerArgs) (*Server, error) {
if args.Logger == nil {
args.Logger = logrus.NewEntry(logrus.StandardLogger())
}
@@ -65,16 +65,21 @@ func NewServer(args ServerArgs) *Server {
s := new(Server)
s.stopChan = make(chan struct{})
sealer, err := NewSingleSymmetricKeySealer()
if err != nil {
return nil, err
}
s.wg.Add(2)
s.grpcPort = args.GrpcPort
s.grpcSrv = grpc.NewServer(append([]grpc.ServerOption{grpc.MaxRecvMsgSize(128 * 1024 * 1024)}, args.Options...)...)
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(args.Logger, args.HttpHost, args.DBCache, args.FS)
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(args.Logger, args.HttpHost, args.DBCache, args.FS, sealer)
if args.ReadOnly {
chnkSt = ReadOnlyChunkStore{chnkSt}
}
remotesapi.RegisterChunkStoreServiceServer(s.grpcSrv, chnkSt)
var handler http.Handler = newFileHandler(args.Logger, args.DBCache, args.FS, args.ReadOnly)
var handler http.Handler = newFileHandler(args.Logger, args.DBCache, args.FS, args.ReadOnly, sealer)
if args.HttpPort == args.GrpcPort {
handler = grpcMultiplexHandler(s.grpcSrv, handler)
} else {
@@ -87,7 +92,7 @@ func NewServer(args ServerArgs) *Server {
Handler: handler,
}
return s
return s, nil
}
func grpcMultiplexHandler(grpcSrv *grpc.Server, handler http.Handler) http.Handler {
+48 -20
View File
@@ -79,20 +79,21 @@ func newRowIterator(ctx context.Context, tbl *doltdb.Table, sqlSch sql.Schema, p
return ProllyRowIterFromPartition(ctx, sch, sqlSch, projCols, partition)
}
if schema.IsKeyless(sch) {
// would be more optimal to project columns into keyless tables also
return newKeylessRowIterator(ctx, tbl, projCols, partition)
} else {
return newKeyedRowIter(ctx, tbl, projCols, partition)
}
}
func newKeylessRowIterator(ctx context.Context, tbl *doltdb.Table, projectedCols []uint64, partition doltTablePartition) (sql.RowIter, error) {
mapIter, err := iterForPartition(ctx, partition)
if err != nil {
return nil, err
}
if schema.IsKeyless(sch) {
// would be more optimal to project columns into keyless tables also
return newKeylessRowIterator(ctx, tbl, projCols, mapIter)
} else {
return newKeyedRowIter(ctx, tbl, projCols, mapIter)
}
}
func newKeylessRowIterator(ctx context.Context, tbl *doltdb.Table, projectedCols []uint64, mapIter types.MapTupleIterator) (sql.RowIter, error) {
cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols)
if err != nil {
return nil, err
@@ -118,11 +119,7 @@ func newKeylessRowIterator(ctx context.Context, tbl *doltdb.Table, projectedCols
}, nil
}
func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []uint64, partition doltTablePartition) (sql.RowIter, error) {
mapIter, err := iterForPartition(ctx, partition)
if err != nil {
return nil, err
}
func newKeyedRowIter(ctx context.Context, tbl *doltdb.Table, projectedCols []uint64, mapIter types.MapTupleIterator) (sql.RowIter, error) {
cols, tagToSqlColIdx, err := getTagToResColIdx(ctx, tbl, projectedCols)
if err != nil {
@@ -247,15 +244,46 @@ func DoltTablePartitionToRowIter(ctx *sql.Context, name string, table *doltdb.Ta
return nil, nil, err
}
p := doltTablePartition{
start: start,
end: end,
rowData: data,
if types.IsFormat_DOLT(data.Format()) {
idx := durable.ProllyMapFromIndex(data)
c, err := idx.Count()
if err != nil {
return nil, nil, err
}
if end > uint64(c) {
end = uint64(c)
}
iter, err := idx.IterOrdinalRange(ctx, start, end)
if err != nil {
return nil, nil, err
}
rowIter, err := index.NewProllyRowIter(sch, pkSch.Schema, idx, iter, nil)
if err != nil {
return nil, nil, err
}
return pkSch.Schema, rowIter, nil
}
iter, err := newRowIterator(ctx, table, pkSch.Schema, nil, p)
idx := durable.NomsMapFromIndex(data)
iterAt, err := idx.IteratorAt(ctx, start)
if err != nil {
return nil, nil, err
}
return pkSch.Schema, iter, nil
iter := types.NewLimitingMapIterator(iterAt, end-start)
var rowIter sql.RowIter
if schema.IsKeyless(sch) {
rowIter, err = newKeylessRowIterator(ctx, table, nil, iter)
if err != nil {
return nil, nil, err
}
} else {
rowIter, err = newKeyedRowIter(ctx, table, nil, iter)
if err != nil {
return nil, nil, err
}
}
return pkSch.Schema, rowIter, nil
}
+11 -18
View File
@@ -194,7 +194,7 @@ func (suite *BlockStoreSuite) TestChunkStorePutMoreThanMemTable() {
if suite.putCountFn != nil {
suite.Equal(2, suite.putCountFn())
}
specs, err := suite.store.tables.ToSpecs()
specs, err := suite.store.tables.toSpecs()
suite.NoError(err)
suite.Len(specs, 2)
}
@@ -415,22 +415,15 @@ func (suite *BlockStoreSuite) TestChunkStorePutWithRebase() {
func TestBlockStoreConjoinOnCommit(t *testing.T) {
stats := &Stats{}
assertContainAll := func(t *testing.T, store chunks.ChunkStore, srcs ...chunkSource) {
rdrs := make(chunkReaderGroup, len(srcs))
for i, src := range srcs {
c, err := src.Clone()
assertContainAll := func(t *testing.T, store chunks.ChunkStore, sources ...chunkSource) {
ctx := context.Background()
for _, src := range sources {
err := extractAllChunks(ctx, src, func(rec extractRecord) {
ok, err := store.Has(context.Background(), hash.Hash(rec.a))
require.NoError(t, err)
assert.True(t, ok)
})
require.NoError(t, err)
rdrs[i] = c
}
chunkChan := make(chan extractRecord, mustUint32(rdrs.count()))
err := rdrs.extract(context.Background(), chunkChan)
require.NoError(t, err)
close(chunkChan)
for rec := range chunkChan {
ok, err := store.Has(context.Background(), hash.Hash(rec.a))
require.NoError(t, err)
assert.True(t, ok)
}
}
@@ -509,7 +502,7 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
assert.True(t, ok)
assertContainAll(t, smallTableStore, srcs...)
for _, src := range srcs {
err := src.Close()
err := src.close()
require.NoError(t, err)
}
})
@@ -546,7 +539,7 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
assert.True(t, ok)
assertContainAll(t, smallTableStore, srcs...)
for _, src := range srcs {
err := src.Close()
err := src.close()
require.NoError(t, err)
}
})
+4 -4
View File
@@ -36,12 +36,12 @@ func newReaderFromIndexData(q MemoryQuotaProvider, idxData []byte, name addr, tr
return &chunkSourceAdapter{tr, name}, nil
}
func (csa chunkSourceAdapter) Close() error {
return csa.tableReader.Close()
func (csa chunkSourceAdapter) close() error {
return csa.tableReader.close()
}
func (csa chunkSourceAdapter) Clone() (chunkSource, error) {
tr, err := csa.tableReader.Clone()
func (csa chunkSourceAdapter) clone() (chunkSource, error) {
tr, err := csa.tableReader.clone()
if err != nil {
return &chunkSourceAdapter{}, err
}
+20 -14
View File
@@ -64,7 +64,7 @@ func makeTestSrcs(t *testing.T, tableSizes []uint32, p tablePersister) (srcs chu
}
cs, err := p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
c, err := cs.Clone()
c, err := cs.clone()
require.NoError(t, err)
srcs = append(srcs, c)
}
@@ -76,7 +76,7 @@ func TestConjoin(t *testing.T) {
makeTestTableSpecs := func(tableSizes []uint32, p tablePersister) (specs []tableSpec) {
for _, src := range makeTestSrcs(t, tableSizes, p) {
specs = append(specs, tableSpec{mustAddr(src.hash()), mustUint32(src.count())})
err := src.Close()
err := src.close()
require.NoError(t, err)
}
return
@@ -93,28 +93,34 @@ func TestConjoin(t *testing.T) {
}
assertContainAll := func(t *testing.T, p tablePersister, expect, actual []tableSpec) {
open := func(specs []tableSpec) (srcs chunkReaderGroup) {
open := func(specs []tableSpec) (sources chunkSources) {
for _, sp := range specs {
cs, err := p.Open(context.Background(), sp.name, sp.chunkCount, nil)
if err != nil {
require.NoError(t, err)
}
srcs = append(srcs, cs)
sources = append(sources, cs)
}
return
}
expectSrcs, actualSrcs := open(expect), open(actual)
chunkChan := make(chan extractRecord, mustUint32(expectSrcs.count()))
err := expectSrcs.extract(context.Background(), chunkChan)
require.NoError(t, err)
close(chunkChan)
for rec := range chunkChan {
has, err := actualSrcs.has(rec.a)
expectSrcs, actualSrcs := open(expect), open(actual)
ctx := context.Background()
for _, src := range expectSrcs {
err := extractAllChunks(ctx, src, func(rec extractRecord) {
var ok bool
for _, src := range actualSrcs {
var err error
ok, err = src.has(rec.a)
require.NoError(t, err)
if ok {
break
}
}
assert.True(t, ok)
})
require.NoError(t, err)
assert.True(t, has)
}
}
+4 -4
View File
@@ -118,12 +118,12 @@ func (mmtr *fileTableReader) hash() (addr, error) {
return mmtr.h, nil
}
func (mmtr *fileTableReader) Close() error {
return mmtr.tableReader.Close()
func (mmtr *fileTableReader) close() error {
return mmtr.tableReader.close()
}
func (mmtr *fileTableReader) Clone() (chunkSource, error) {
tr, err := mmtr.tableReader.Clone()
func (mmtr *fileTableReader) clone() (chunkSource, error) {
tr, err := mmtr.tableReader.clone()
if err != nil {
return &fileTableReader{}, err
}
+2 -2
View File
@@ -153,14 +153,14 @@ func main() {
if i+1 == numGroups { // last group
go func(i int) {
defer wg.Done()
reads[i], _, err = store.CalcReads(orderedChildren[i*branchFactor:].HashSet(), 0)
reads[i], _, err = nbs.CalcReads(store, orderedChildren[i*branchFactor:].HashSet(), 0)
d.PanicIfError(err)
}(i)
continue
}
go func(i int) {
defer wg.Done()
reads[i], _, err = store.CalcReads(orderedChildren[i*branchFactor:(i+1)*branchFactor].HashSet(), 0)
reads[i], _, err = nbs.CalcReads(store, orderedChildren[i*branchFactor:(i+1)*branchFactor].HashSet(), 0)
d.PanicIfError(err)
}(i)
}
+1 -1
View File
@@ -218,6 +218,6 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by
return name, buff[:tableSize], count, nil
}
func (mt *memTable) Close() error {
func (mt *memTable) close() error {
return nil
}
+2 -14
View File
@@ -307,22 +307,10 @@ func (crg chunkReaderGroup) uncompressedLen() (data uint64, err error) {
return
}
func (crg chunkReaderGroup) extract(ctx context.Context, chunks chan<- extractRecord) error {
for _, haver := range crg {
err := haver.extract(ctx, chunks)
if err != nil {
return err
}
}
return nil
}
func (crg chunkReaderGroup) Close() error {
func (crg chunkReaderGroup) close() error {
var firstErr error
for _, c := range crg {
err := c.Close()
err := c.close()
if err != nil && firstErr == nil {
firstErr = err
}
+4 -36
View File
@@ -95,12 +95,12 @@ func (ccs *persistingChunkSource) getReader() chunkReader {
return ccs.cs
}
func (ccs *persistingChunkSource) Close() error {
func (ccs *persistingChunkSource) close() error {
// persistingChunkSource does not own |cs| or |mt|. No need to close them.
return nil
}
func (ccs *persistingChunkSource) Clone() (chunkSource, error) {
func (ccs *persistingChunkSource) clone() (chunkSource, error) {
// persistingChunkSource does not own |cs| or |mt|. No need to Clone.
return ccs, nil
}
@@ -240,34 +240,6 @@ func (ccs *persistingChunkSource) size() (uint64, error) {
return ccs.cs.size()
}
func (ccs *persistingChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining bool, err error) {
err = ccs.wait()
if err != nil {
return 0, false, err
}
if ccs.cs == nil {
return 0, false, ErrNoChunkSource
}
return ccs.cs.calcReads(reqs, blockSize)
}
func (ccs *persistingChunkSource) extract(ctx context.Context, chunks chan<- extractRecord) error {
err := ccs.wait()
if err != nil {
return err
}
if ccs.cs == nil {
return ErrNoChunkSource
}
return ccs.cs.extract(ctx, chunks)
}
type emptyChunkSource struct{}
func (ecs emptyChunkSource) has(h addr) (bool, error) {
@@ -318,14 +290,10 @@ func (ecs emptyChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads
return 0, true, nil
}
func (ecs emptyChunkSource) extract(ctx context.Context, chunks chan<- extractRecord) error {
func (ecs emptyChunkSource) close() error {
return nil
}
func (ecs emptyChunkSource) Close() error {
return nil
}
func (ecs emptyChunkSource) Clone() (chunkSource, error) {
func (ecs emptyChunkSource) clone() (chunkSource, error) {
return ecs, nil
}
+30 -6
View File
@@ -579,18 +579,20 @@ func compactSourcesToBuffer(sources chunkSources) (name addr, data []byte, chunk
tw := newTableWriter(buff, nil)
errString := ""
ctx := context.Background()
for _, src := range sources {
chunks := make(chan extractRecord)
ch := make(chan extractRecord)
go func() {
defer close(chunks)
err := src.extract(context.Background(), chunks)
defer close(ch)
err = extractAllChunks(ctx, src, func(rec extractRecord) {
ch <- rec
})
if err != nil {
chunks <- extractRecord{a: mustAddr(src.hash()), err: err}
ch <- extractRecord{a: mustAddr(src.hash()), err: err}
}
}()
for rec := range chunks {
for rec := range ch {
if rec.err != nil {
errString += fmt.Sprintf("Failed to extract %s:\n %v\n******\n\n", rec.a, rec.err)
continue
@@ -625,3 +627,25 @@ func (ftp fakeTablePersister) Open(ctx context.Context, name addr, chunkCount ui
func (ftp fakeTablePersister) PruneTableFiles(_ context.Context, _ manifestContents) error {
return chunks.ErrUnsupportedOperation
}
func extractAllChunks(ctx context.Context, src chunkSource, cb func(rec extractRecord)) (err error) {
var index tableIndex
if index, err = src.index(); err != nil {
return err
}
var a addr
for i := uint32(0); i < index.ChunkCount(); i++ {
_, err = index.IndexEntry(i, &a)
if err != nil {
return err
}
data, err := src.get(ctx, a, nil)
if err != nil {
return err
}
cb(extractRecord{a: a, data: data})
}
return
}
+46 -47
View File
@@ -67,8 +67,6 @@ const (
defaultManifestCacheSize = 1 << 23 // 8MB
preflushChunkCount = 8
copyTableFileBufferSize = 128 * 1024 * 1024
)
var (
@@ -296,7 +294,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
}
}
newTables, err := nbs.tables.Rebase(ctx, updatedContents.specs, nbs.stats)
newTables, err := nbs.tables.rebase(ctx, updatedContents.specs, nbs.stats)
if err != nil {
return manifestContents{}, err
}
@@ -304,7 +302,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
nbs.upstream = updatedContents
oldTables := nbs.tables
nbs.tables = newTables
err = oldTables.Close()
err = oldTables.close()
if err != nil {
return manifestContents{}, err
}
@@ -373,7 +371,7 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
}
}
newTables, err := nbs.tables.Rebase(ctx, updatedContents.specs, nbs.stats)
newTables, err := nbs.tables.rebase(ctx, updatedContents.specs, nbs.stats)
if err != nil {
return manifestContents{}, err
}
@@ -381,7 +379,7 @@ func (nbs *NomsBlockStore) UpdateManifestWithAppendix(ctx context.Context, updat
nbs.upstream = updatedContents
oldTables := nbs.tables
nbs.tables = newTables
err = oldTables.Close()
err = oldTables.close()
if err != nil {
return manifestContents{}, err
}
@@ -589,7 +587,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager
}
if exists {
newTables, err := nbs.tables.Rebase(ctx, contents.specs, nbs.stats)
newTables, err := nbs.tables.rebase(ctx, contents.specs, nbs.stats)
if err != nil {
return nil, err
@@ -598,7 +596,7 @@ func newNomsBlockStore(ctx context.Context, nbfVerStr string, mm manifestManager
nbs.upstream = contents
oldTables := nbs.tables
nbs.tables = newTables
err = oldTables.Close()
err = oldTables.close()
if err != nil {
return nil, err
}
@@ -649,7 +647,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, h addr, data []byte) bo
nbs.mt = newMemTable(nbs.mtSize)
}
if !nbs.mt.addChunk(h, data) {
nbs.tables = nbs.tables.Prepend(ctx, nbs.mt, nbs.stats)
nbs.tables = nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
nbs.mt = newMemTable(nbs.mtSize)
return nbs.mt.addChunk(h, data)
}
@@ -777,29 +775,6 @@ func toGetRecords(hashes hash.HashSet) []getRecord {
return reqs
}
func (nbs *NomsBlockStore) CalcReads(hashes hash.HashSet, blockSize uint64) (reads int, split bool, err error) {
reqs := toGetRecords(hashes)
tables := func() (tables tableSet) {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
tables = nbs.tables
return
}()
reads, split, remaining, err := tables.calcReads(reqs, blockSize)
if err != nil {
return 0, false, err
}
if remaining {
return 0, false, errors.New("failed to find all chunks")
}
return
}
func (nbs *NomsBlockStore) Count() (uint32, error) {
count, tables, err := func() (count uint32, tables chunkReader, err error) {
nbs.mu.RLock()
@@ -947,7 +922,7 @@ func (nbs *NomsBlockStore) Rebase(ctx context.Context) error {
return nil
}
newTables, err := nbs.tables.Rebase(ctx, contents.specs, nbs.stats)
newTables, err := nbs.tables.rebase(ctx, contents.specs, nbs.stats)
if err != nil {
return err
}
@@ -955,7 +930,7 @@ func (nbs *NomsBlockStore) Rebase(ctx context.Context) error {
nbs.upstream = contents
oldTables := nbs.tables
nbs.tables = newTables
err = oldTables.Close()
err = oldTables.close()
if err != nil {
return err
}
@@ -977,7 +952,7 @@ func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash)
anyPossiblyNovelChunks := func() bool {
nbs.mu.Lock()
defer nbs.mu.Unlock()
return nbs.mt != nil || nbs.tables.Novel() > 0
return nbs.mt != nil || len(nbs.tables.novel) > 0
}
if !anyPossiblyNovelChunks() && current == last {
@@ -1009,7 +984,7 @@ func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash)
}
if cnt > preflushChunkCount {
nbs.tables = nbs.tables.Prepend(ctx, nbs.mt, nbs.stats)
nbs.tables = nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
nbs.mt = nil
}
}
@@ -1058,7 +1033,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
}
handleOptimisticLockFailure := func(upstream manifestContents) error {
newTables, err := nbs.tables.Rebase(ctx, upstream.specs, nbs.stats)
newTables, err := nbs.tables.rebase(ctx, upstream.specs, nbs.stats)
if err != nil {
return err
}
@@ -1066,7 +1041,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
nbs.upstream = upstream
oldTables := nbs.tables
nbs.tables = newTables
err = oldTables.Close()
err = oldTables.close()
if last != upstream.root {
return errOptimisticLockFailedRoot
@@ -1092,7 +1067,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
}
if cnt > 0 {
nbs.tables = nbs.tables.Prepend(ctx, nbs.mt, nbs.stats)
nbs.tables = nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
nbs.mt = nil
}
}
@@ -1106,7 +1081,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
return err
}
newTables, err := nbs.tables.Rebase(ctx, newUpstream.specs, nbs.stats)
newTables, err := nbs.tables.rebase(ctx, newUpstream.specs, nbs.stats)
if err != nil {
return err
@@ -1115,7 +1090,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
nbs.upstream = newUpstream
oldTables := nbs.tables
nbs.tables = newTables
err = oldTables.Close()
err = oldTables.close()
if err != nil {
return err
}
@@ -1123,7 +1098,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
return errOptimisticLockFailedTables
}
specs, err := nbs.tables.ToSpecs()
specs, err := nbs.tables.toSpecs()
if err != nil {
return err
}
@@ -1164,7 +1139,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
return handleOptimisticLockFailure(upstream)
}
newTables, err := nbs.tables.Flatten(ctx)
newTables, err := nbs.tables.flatten(ctx)
if err != nil {
return nil
@@ -1183,7 +1158,7 @@ func (nbs *NomsBlockStore) Version() string {
}
func (nbs *NomsBlockStore) Close() error {
return nbs.tables.Close()
return nbs.tables.close()
}
func (nbs *NomsBlockStore) Stats() interface{} {
@@ -1599,7 +1574,7 @@ func (nbs *NomsBlockStore) gcTableSize() (uint64, error) {
return 0, err
}
avgTableSize := total / uint64(nbs.tables.Upstream()+nbs.tables.Novel()+1)
avgTableSize := total / uint64(nbs.tables.Size()+1)
// max(avgTableSize, defaultMemTableSize)
if avgTableSize > nbs.mtSize {
@@ -1647,14 +1622,14 @@ func (nbs *NomsBlockStore) swapTables(ctx context.Context, specs []tableSpec) (e
nbs.mt = newMemTable(nbs.mtSize)
// clear nbs.tables.novel
nbs.tables, err = nbs.tables.Flatten(ctx)
nbs.tables, err = nbs.tables.flatten(ctx)
if err != nil {
return err
}
// replace nbs.tables.upstream with gc compacted tables
nbs.upstream = upstream
nbs.tables, err = nbs.tables.Rebase(ctx, upstream.specs, nbs.stats)
nbs.tables, err = nbs.tables.rebase(ctx, upstream.specs, nbs.stats)
if err != nil {
return err
}
@@ -1681,3 +1656,27 @@ func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash
// I guess this thing infinitely retries without backoff in the case off errOptimisticLockFailedTables
}
}
// CalcReads computes the number of IO operations necessary to fetch |hashes|.
func CalcReads(nbs *NomsBlockStore, hashes hash.HashSet, blockSize uint64) (reads int, split bool, err error) {
reqs := toGetRecords(hashes)
tables := func() (tables tableSet) {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
tables = nbs.tables
return
}()
reads, split, remaining, err := tableSetCalcReads(tables, reqs, blockSize)
if err != nil {
return 0, false, err
}
if remaining {
return 0, false, errors.New("failed to find all chunks")
}
return
}
+9 -24
View File
@@ -230,49 +230,34 @@ type chunkReader interface {
get(ctx context.Context, h addr, stats *Stats) ([]byte, error)
getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error)
getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error)
extract(ctx context.Context, chunks chan<- extractRecord) error
count() (uint32, error)
uncompressedLen() (uint64, error)
// Close releases resources retained by the |chunkReader|.
Close() error
}
type chunkReadPlanner interface {
findOffsets(reqs []getRecord) (ors offsetRecSlice, remaining bool, err error)
getManyAtOffsets(
ctx context.Context,
eg *errgroup.Group,
offsetRecords offsetRecSlice,
found func(context.Context, *chunks.Chunk),
stats *Stats,
) error
getManyCompressedAtOffsets(
ctx context.Context,
eg *errgroup.Group,
offsetRecords offsetRecSlice,
found func(context.Context, CompressedChunk),
stats *Stats,
) error
// close releases resources retained by the |chunkReader|.
close() error
}
type chunkSource interface {
chunkReader
// hash returns the hash address of this chunkSource.
hash() (addr, error)
calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining bool, err error)
// opens a Reader to the first byte of the chunkData segment of this table.
reader(context.Context) (io.Reader, error)
// size returns the total size of the chunkSource: chunks, index, and footer
size() (uint64, error)
// index returns the tableIndex of this chunkSource.
index() (tableIndex, error)
// Clone returns a |chunkSource| with the same contents as the
// clone returns a |chunkSource| with the same contents as the
// original, but with independent |Close| behavior. A |chunkSource|
// cannot be |Close|d more than once, so if a |chunkSource| is being
// retained in two objects with independent life-cycle, it should be
// |Clone|d first.
Clone() (chunkSource, error)
clone() (chunkSource, error)
}
type chunkSources []chunkSource
+12 -10
View File
@@ -154,12 +154,12 @@ func removeFooter(p []byte, chunkCount uint32) (out []byte, err error) {
// Useful to create an onHeapTableIndex without retaining the entire underlying array of data.
func parseTableIndexByCopy(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) {
r := bytes.NewReader(buff)
return ReadTableIndexByCopy(r, q)
return readTableIndexByCopy(r, q)
}
// ReadTableIndexByCopy loads an index into memory from an io.ReadSeeker
// readTableIndexByCopy loads an index into memory from an io.ReadSeeker
// Caution: Allocates new memory for entire index
func ReadTableIndexByCopy(rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) {
func readTableIndexByCopy(rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) {
chunkCount, totalUncompressedData, err := ReadTableFooter(rd)
if err != nil {
return onHeapTableIndex{}, err
@@ -304,7 +304,7 @@ func (ti onHeapTableIndex) Lookup(h *addr) (indexEntry, bool, error) {
func (ti onHeapTableIndex) lookupOrdinal(h *addr) (uint32, error) {
prefix := h.Prefix()
for idx := ti.prefixIdx(prefix); idx < ti.chunkCount && ti.prefixAt(idx) == prefix; idx++ {
for idx := ti.findPrefix(prefix); idx < ti.chunkCount && ti.prefixAt(idx) == prefix; idx++ {
m, err := ti.EntrySuffixMatches(idx, h)
if err != nil {
return ti.chunkCount, err
@@ -317,22 +317,24 @@ func (ti onHeapTableIndex) lookupOrdinal(h *addr) (uint32, error) {
return ti.chunkCount, nil
}
// prefixIdx returns the first position in |tr.prefixes| whose value ==
// |prefix|. Returns |tr.chunkCount| if absent
func (ti onHeapTableIndex) prefixIdx(prefix uint64) (idx uint32) {
// findPrefix returns the first position in |tr.prefixes| whose value == |prefix|.
// Returns |tr.chunkCount| if absent
func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) {
query := make([]byte, addrPrefixSize)
binary.BigEndian.PutUint64(query, prefix)
// NOTE: The golang impl of sort.Search is basically inlined here. This method can be called in
// an extremely tight loop and inlining the code was a significant perf improvement.
idx, j := 0, ti.chunkCount
for idx < j {
h := idx + (j-idx)/2 // avoid overflow when computing h
// i ≤ h < j
if ti.prefixAt(h) < prefix {
o := int64(prefixTupleSize * h)
if bytes.Compare(ti.tupleB[o:o+addrPrefixSize], query) < 0 {
idx = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
return
}
@@ -475,7 +477,7 @@ func (ti onHeapTableIndex) ResolveShortHash(short []byte) ([]string, error) {
sPrefix := ti.padStringAndDecode(shortHash, "0")
// Binary Search for prefix
pIdxL = ti.prefixIdx(sPrefix)
pIdxL = ti.findPrefix(sPrefix)
// Prefix doesn't exist
if pIdxL == ti.chunkCount {
+47
View File
@@ -50,6 +50,53 @@ func TestParseTableIndex(t *testing.T) {
}
}
func BenchmarkFindPrefix(b *testing.B) {
f, err := os.Open("testdata/0oa7mch34jg1rvghrnhr4shrp2fm4ftd.idx")
require.NoError(b, err)
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(b, err)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
require.NoError(b, err)
defer idx.Close()
assert.Equal(b, uint32(596), idx.ChunkCount())
prefixes, err := idx.Prefixes()
require.NoError(b, err)
b.Run("benchmark prefixIdx()", func(b *testing.B) {
var ord uint32
for i := 0; i < b.N; i++ {
ord = prefixIdx(idx, prefixes[uint(i)&uint(512)])
}
assert.True(b, ord < 596)
})
b.Run("benchmark findPrefix", func(b *testing.B) {
var ord uint32
for i := 0; i < b.N; i++ {
ord = idx.findPrefix(prefixes[uint(i)&uint(512)])
}
assert.True(b, ord < 596)
})
}
// previous implementation for findIndex().
func prefixIdx(ti onHeapTableIndex, prefix uint64) (idx uint32) {
// NOTE: The golang impl of sort.Search is basically inlined here. This method can be called in
// an extremely tight loop and inlining the code was a significant perf improvement.
idx, j := 0, ti.chunkCount
for idx < j {
h := idx + (j-idx)/2 // avoid overflow when computing h
// i ≤ h < j
if ti.prefixAt(h) < prefix {
idx = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
return
}
func TestMMapIndex(t *testing.T) {
f, err := os.Open("testdata/0oa7mch34jg1rvghrnhr4shrp2fm4ftd.idx")
require.NoError(t, err)
-4
View File
@@ -105,10 +105,6 @@ type chunkSourcesByDescendingDataSize struct {
err error
}
func newChunkSourcesByDescendingDataSize(sws []sourceWithSize) chunkSourcesByDescendingDataSize {
return chunkSourcesByDescendingDataSize{sws, nil}
}
func (csbds chunkSourcesByDescendingDataSize) Len() int { return len(csbds.sws) }
func (csbds chunkSourcesByDescendingDataSize) Less(i, j int) bool {
swsI, swsJ := csbds.sws[i], csbds.sws[j]
+2 -3
View File
@@ -277,7 +277,6 @@ func (hs offsetRecSlice) Len() int { return len(hs) }
func (hs offsetRecSlice) Less(i, j int) bool { return hs[i].offset < hs[j].offset }
func (hs offsetRecSlice) Swap(i, j int) { hs[i], hs[j] = hs[j], hs[i] }
var _ chunkReadPlanner = tableReader{}
var _ chunkReader = tableReader{}
func (tr tableReader) readCompressedAtOffsets(
@@ -655,11 +654,11 @@ func (tr tableReader) size() (uint64, error) {
return i.TableFileSize(), nil
}
func (tr tableReader) Close() error {
func (tr tableReader) close() error {
return tr.tableIndex.Close()
}
func (tr tableReader) Clone() (tableReader, error) {
func (tr tableReader) clone() (tableReader, error) {
ti, err := tr.tableIndex.Clone()
if err != nil {
return tableReader{}, err
+40 -115
View File
@@ -24,6 +24,7 @@ package nbs
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
@@ -136,20 +137,6 @@ func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error
func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (remaining bool, err error) {
f := func(css chunkSources) bool {
for _, haver := range css {
if rp, ok := haver.(chunkReadPlanner); ok {
offsets, remaining, err := rp.findOffsets(reqs)
if err != nil {
return true
}
err = rp.getManyAtOffsets(ctx, eg, offsets, found, stats)
if err != nil {
return true
}
if !remaining {
return false
}
continue
}
remaining, err = haver.getMany(ctx, eg, reqs, found, stats)
if err != nil {
return true
@@ -167,25 +154,6 @@ func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRe
func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (remaining bool, err error) {
f := func(css chunkSources) bool {
for _, haver := range css {
if rp, ok := haver.(chunkReadPlanner); ok {
offsets, remaining, err := rp.findOffsets(reqs)
if err != nil {
return true
}
if len(offsets) > 0 {
err = rp.getManyCompressedAtOffsets(ctx, eg, offsets, found, stats)
if err != nil {
return true
}
}
if !remaining {
return false
}
continue
}
remaining, err = haver.getManyCompressed(ctx, eg, reqs, found, stats)
if err != nil {
return true
@@ -201,44 +169,6 @@ func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, re
return f(ts.novel) && err == nil && f(ts.upstream), err
}
func (ts tableSet) calcReads(reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) {
f := func(css chunkSources) (int, bool, bool, error) {
reads, split := 0, false
for _, haver := range css {
rds, rmn, err := haver.calcReads(reqs, blockSize)
if err != nil {
return 0, false, false, err
}
reads += rds
if !rmn {
return reads, split, false, nil
}
split = true
}
return reads, split, true, nil
}
reads, split, remaining, err = f(ts.novel)
if err != nil {
return 0, false, false, err
}
if remaining {
var rds int
rds, split, remaining, err = f(ts.upstream)
if err != nil {
return 0, false, false, err
}
reads += rds
}
return reads, split, remaining, nil
}
func (ts tableSet) count() (uint32, error) {
f := func(css chunkSources) (count uint32, err error) {
for _, haver := range css {
@@ -322,7 +252,7 @@ func (ts tableSet) physicalLen() (uint64, error) {
return lenNovel + lenUp, nil
}
func (ts tableSet) Close() error {
func (ts tableSet) close() error {
var firstErr error
setErr := func(err error) {
if err != nil && firstErr == nil {
@@ -331,11 +261,11 @@ func (ts tableSet) Close() error {
}
for _, t := range ts.novel {
err := t.Close()
err := t.close()
setErr(err)
}
for _, t := range ts.upstream {
err := t.Close()
err := t.close()
setErr(err)
}
return firstErr
@@ -346,20 +276,9 @@ func (ts tableSet) Size() int {
return len(ts.novel) + len(ts.upstream)
}
// Novel returns the number of tables containing novel chunks in this
// tableSet.
func (ts tableSet) Novel() int {
return len(ts.novel)
}
// Upstream returns the number of known-persisted tables in this tableSet.
func (ts tableSet) Upstream() int {
return len(ts.upstream)
}
// Prepend adds a memTable to an existing tableSet, compacting |mt| and
// prepend adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with newly compacted table added.
func (ts tableSet) Prepend(ctx context.Context, mt *memTable, stats *Stats) tableSet {
func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) tableSet {
newTs := tableSet{
novel: make(chunkSources, len(ts.novel)+1),
upstream: make(chunkSources, len(ts.upstream)),
@@ -373,29 +292,9 @@ func (ts tableSet) Prepend(ctx context.Context, mt *memTable, stats *Stats) tabl
return newTs
}
func (ts tableSet) extract(ctx context.Context, chunks chan<- extractRecord) error {
// Since new tables are _prepended_ to a tableSet, extracting chunks in insertOrder requires iterating ts.upstream back to front, followed by ts.novel.
for i := len(ts.upstream) - 1; i >= 0; i-- {
err := ts.upstream[i].extract(ctx, chunks)
if err != nil {
return err
}
}
for i := len(ts.novel) - 1; i >= 0; i-- {
err := ts.novel[i].extract(ctx, chunks)
if err != nil {
return err
}
}
return nil
}
// Flatten returns a new tableSet with |upstream| set to the union of ts.novel
// flatten returns a new tableSet with |upstream| set to the union of ts.novel
// and ts.upstream.
func (ts tableSet) Flatten(ctx context.Context) (tableSet, error) {
func (ts tableSet) flatten(ctx context.Context) (tableSet, error) {
flattened := tableSet{
upstream: make(chunkSources, 0, ts.Size()),
p: ts.p,
@@ -419,9 +318,9 @@ func (ts tableSet) Flatten(ctx context.Context) (tableSet, error) {
return flattened, nil
}
// Rebase returns a new tableSet holding the novel tables managed by |ts| and
// rebase returns a new tableSet holding the novel tables managed by |ts| and
// those specified by |specs|.
func (ts tableSet) Rebase(ctx context.Context, specs []tableSpec, stats *Stats) (tableSet, error) {
func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats) (tableSet, error) {
merged := tableSet{
novel: make(chunkSources, 0, len(ts.novel)),
p: ts.p,
@@ -438,7 +337,7 @@ func (ts tableSet) Rebase(ctx context.Context, specs []tableSpec, stats *Stats)
}
if cnt > 0 {
t2, err := t.Clone()
t2, err := t.clone()
if err != nil {
return tableSet{}, err
}
@@ -474,7 +373,7 @@ OUTER:
return tableSet{}, err
}
if spec.name == h {
c, err := existing.Clone()
c, err := existing.clone()
if err != nil {
return tableSet{}, err
}
@@ -524,7 +423,7 @@ OUTER:
if err != nil {
// Close any opened chunkSources
for _, cs := range opened {
_ = cs.Close()
_ = cs.close()
}
if r := rp.Load(); r != nil {
@@ -536,7 +435,7 @@ OUTER:
return merged, nil
}
func (ts tableSet) ToSpecs() ([]tableSpec, error) {
func (ts tableSet) toSpecs() ([]tableSpec, error) {
tableSpecs := make([]tableSpec, 0, ts.Size())
for _, src := range ts.novel {
cnt, err := src.count()
@@ -576,3 +475,29 @@ func (ts tableSet) ToSpecs() ([]tableSpec, error) {
}
return tableSpecs, nil
}
func tableSetCalcReads(ts tableSet, reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) {
all := append(ts.novel, ts.upstream...)
for _, tbl := range all {
rdr, ok := tbl.(*fileTableReader)
if !ok {
h, _ := tbl.hash()
err = fmt.Errorf("chunkSource %s is not a fileTableReader", h.String())
return
}
var n int
var more bool
n, more, err = rdr.calcReads(reqs, blockSize)
if err != nil {
return 0, false, false, err
}
reads += n
if !more {
break
}
split = true
}
return
}
+30 -65
View File
@@ -33,8 +33,8 @@ import (
var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
func TestTableSetPrependEmpty(t *testing.T) {
ts := newFakeTableSet(&noopQuotaProvider{}).Prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
specs, err := ts.ToSpecs()
ts := newFakeTableSet(&noopQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(t, specs)
}
@@ -42,23 +42,23 @@ func TestTableSetPrependEmpty(t *testing.T) {
func TestTableSetPrepend(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
firstSpecs, err := ts.ToSpecs()
firstSpecs, err := ts.toSpecs()
require.NoError(t, err)
assert.Len(firstSpecs, 1)
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
secondSpecs, err := ts.ToSpecs()
secondSpecs, err := ts.toSpecs()
require.NoError(t, err)
assert.Len(secondSpecs, 2)
assert.Equal(firstSpecs, secondSpecs[1:])
@@ -67,22 +67,22 @@ func TestTableSetPrepend(t *testing.T) {
func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
mt = newMemTable(testMemTableSize)
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
specs, err = ts.ToSpecs()
specs, err = ts.toSpecs()
require.NoError(t, err)
assert.Len(specs, 2)
}
@@ -90,61 +90,26 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
func TestTableSetFlattenExcludesEmptyTable(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
mt = newMemTable(testMemTableSize)
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.Flatten(context.Background())
ts, err = ts.flatten(context.Background())
require.NoError(t, err)
assert.EqualValues(ts.Size(), 2)
}
func TestTableSetExtract(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
require.NoError(t, err)
assert.Empty(specs)
// Put in one table
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts = ts.Prepend(context.Background(), mt, &Stats{})
// Put in a second
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts = ts.Prepend(context.Background(), mt, &Stats{})
chunkChan := make(chan extractRecord)
go func() {
defer close(chunkChan)
err := ts.extract(context.Background(), chunkChan)
require.NoError(t, err)
}()
i := 0
for rec := range chunkChan {
a := computeAddr(testChunks[i])
assert.NotNil(rec.data, "Nothing for", a)
assert.Equal(testChunks[i], rec.data, "Item %d: %s != %s", i, string(testChunks[i]), string(rec.data))
assert.Equal(a, rec.a)
i++
}
}
func persist(t *testing.T, p tablePersister, chunks ...[]byte) {
for _, c := range chunks {
mt := newMemTable(testMemTableSize)
@@ -166,37 +131,37 @@ func TestTableSetRebase(t *testing.T) {
for _, c := range chunks {
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(c), c)
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
}
return ts
}
fullTS := newTableSet(persister, q)
defer func() {
require.NoError(t, fullTS.Close())
require.NoError(t, fullTS.close())
}()
specs, err := fullTS.ToSpecs()
specs, err := fullTS.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
fullTS = insert(fullTS, testChunks...)
fullTS, err = fullTS.Flatten(context.Background())
fullTS, err = fullTS.flatten(context.Background())
require.NoError(t, err)
ts := newTableSet(persister, q)
ts = insert(ts, testChunks[0])
assert.Equal(1, ts.Size())
ts, err = ts.Flatten(context.Background())
ts, err = ts.flatten(context.Background())
require.NoError(t, err)
ts = insert(ts, []byte("novel"))
specs, err = fullTS.ToSpecs()
specs, err = fullTS.toSpecs()
require.NoError(t, err)
ts2, err := ts.Rebase(context.Background(), specs, nil)
ts2, err := ts.rebase(context.Background(), specs, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, ts2.Close())
require.NoError(t, ts2.close())
}()
err = ts.Close()
err = ts.close()
require.NoError(t, err)
assert.Equal(4, ts2.Size())
}
@@ -204,17 +169,17 @@ func TestTableSetRebase(t *testing.T) {
func TestTableSetPhysicalLen(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts = ts.Prepend(context.Background(), mt, &Stats{})
ts = ts.prepend(context.Background(), mt, &Stats{})
assert.True(mustUint64(ts.physicalLen()) > indexSize(mustUint32(ts.count())))
}
@@ -241,7 +206,7 @@ func TestTableSetClosesOpenedChunkSourcesOnErr(t *testing.T) {
}
ts := tableSet{p: p, q: q, rl: make(chan struct{}, 1)}
_, err := ts.Rebase(context.Background(), specs, &Stats{})
_, err := ts.rebase(context.Background(), specs, &Stats{})
require.Error(t, err)
for _ = range p.opened {
+2 -2
View File
@@ -25,7 +25,7 @@ import (
)
func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err error)) error {
idx, err := ReadTableIndexByCopy(rd, &noopQuotaProvider{})
idx, err := readTableIndexByCopy(rd, &noopQuotaProvider{})
if err != nil {
return err
}
@@ -69,7 +69,7 @@ func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err er
}
func GetTableIndexPrefixes(rd io.ReadSeeker) (prefixes []uint64, err error) {
idx, err := ReadTableIndexByCopy(rd, &noopQuotaProvider{})
idx, err := readTableIndexByCopy(rd, &noopQuotaProvider{})
if err != nil {
return nil, err
}
+46
View File
@@ -136,3 +136,49 @@ func (m Map) RangeIterator(ctx context.Context, startIdx, endIdx uint64) (MapTup
return &mapRangeIter{collItr: collItr}, nil
}
// LimitingMapIterator iterates |iter| only returning up to |limit| results.
type LimitingMapIterator struct {
limit uint64
cnt uint64
iter MapIterator
}
var _ MapIterator = (*LimitingMapIterator)(nil)
// NewLimitingMapIterator returns a *LimitingMapIterator.
func NewLimitingMapIterator(iter MapIterator, limit uint64) *LimitingMapIterator {
return &LimitingMapIterator{
iter: iter,
limit: limit,
}
}
// Next implements MapIterator.
func (l *LimitingMapIterator) Next(ctx context.Context) (k, v Value, err error) {
if l.cnt == l.limit {
return nil, nil, nil
}
k, v, err = l.iter.Next(ctx)
if err != nil {
return nil, nil, err
}
if k == nil {
return nil, nil, nil
}
l.cnt++
return
}
// NextTuple implements MapIterator.
func (l *LimitingMapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) {
if l.cnt == l.limit {
return Tuple{}, Tuple{}, io.EOF
}
k, v, err = l.iter.NextTuple(ctx)
if err != nil {
return Tuple{}, Tuple{}, err
}
l.cnt++
return
}
+4 -1
View File
@@ -81,7 +81,7 @@ func main() {
dbCache = NewLocalCSCache(fs)
}
server := remotesrv.NewServer(remotesrv.ServerArgs{
server, err := remotesrv.NewServer(remotesrv.ServerArgs{
HttpHost: *httpHostParam,
HttpPort: *httpPortParam,
GrpcPort: *grpcPortParam,
@@ -89,6 +89,9 @@ func main() {
DBCache: dbCache,
ReadOnly: *readOnlyParam,
})
if err != nil {
log.Fatalf("error creating remotesrv Server: %v\n", err)
}
listeners, err := server.Listeners()
if err != nil {
log.Fatalf("error starting remotesrv Server listeners: %v\n", err)
@@ -114,7 +114,7 @@ wait_for_connection(port=int(port_str), timeout_ms=int(timeout_ms), database=dat
start_sql_server() {
DEFAULT_DB="$1"
PORT=$( definePORT )
dolt sql-server --host 0.0.0.0 --port=$PORT --user dolt &
dolt sql-server --host 0.0.0.0 --port=$PORT --user dolt --socket "dolt.$PORT.sock" &
SERVER_PID=$!
wait_for_connection $PORT 5000
}
@@ -125,7 +125,7 @@ start_sql_server() {
start_sql_server_with_args() {
DEFAULT_DB=""
PORT=$( definePORT )
dolt sql-server "$@" --port=$PORT &
dolt sql-server "$@" --port=$PORT --socket "dolt.$PORT.sock" &
SERVER_PID=$!
wait_for_connection $PORT 5000
}
@@ -148,7 +148,7 @@ behavior:
autocommit: false
" > .cliconfig.yaml
cat "$2" >> .cliconfig.yaml
dolt sql-server --config .cliconfig.yaml &
dolt sql-server --config .cliconfig.yaml --socket "dolt.$PORT.sock" &
SERVER_PID=$!
wait_for_connection $PORT 5000
}
@@ -170,7 +170,7 @@ listener:
behavior:
autocommit: false
" > .cliconfig.yaml
dolt sql-server --config .cliconfig.yaml &
dolt sql-server --config .cliconfig.yaml --socket "dolt.$PORT.sock" &
SERVER_PID=$!
wait_for_connection $PORT 5000
}
@@ -178,7 +178,7 @@ behavior:
start_multi_db_server() {
DEFAULT_DB="$1"
PORT=$( definePORT )
dolt sql-server --host 0.0.0.0 --port=$PORT --user dolt --data-dir ./ &
dolt sql-server --host 0.0.0.0 --port=$PORT --user dolt --data-dir ./ --socket "dolt.$PORT.sock" &
SERVER_PID=$!
wait_for_connection $PORT 5000
}
@@ -188,6 +188,9 @@ start_multi_db_server() {
# for an async replication push), pass 1.
# kill the process if it's still running
stop_sql_server() {
# Clean up any mysql.sock file in the default, global location
rm -f /tmp/mysql.sock
wait=$1
if [ ! -z "$SERVER_PID" ]; then
serverpidinuse=$(lsof -i -P -n | grep LISTEN | grep $SERVER_PID | wc -l)
@@ -237,7 +240,6 @@ stop_sql_server() {
# * param7: Expected exception value of 1. Mutually exclusive with param6.
#
server_query() {
PORT=$( definePORT )
server_query_with_port "$PORT" "$@"
}
@@ -256,7 +258,7 @@ definePORT() {
for i in {0..9}
do
let getPORT="($$ + $i) % (65536-1024) + 1024"
portinuse=$(lsof -i -P -n | grep LISTEN | grep $attemptedPORT | wc -l)
portinuse=$(lsof -i -P -n | grep LISTEN | grep $getPORT | wc -l)
if [ $portinuse -eq 0 ]; then
echo "$getPORT"
break
@@ -169,7 +169,7 @@ pk,c1,c2,c3,c4,c5
9,1,2,3,4,5
DELIM
dolt table import -c --pk=pk test 1pk5col-ints.csv
run dolt sql -q "create table fktest(id int not null, tpk int unsigned, c2 int, primary key(id), foreign key (tpk) references test(pk))"
run dolt sql -q "create table fktest(id int not null, tpk int, c2 int, primary key(id), foreign key (tpk) references test(pk))"
[ "$status" -eq 0 ]
run dolt sql -q "insert into fktest values (1, 0, 1)"
[ "$status" -eq 0 ]
@@ -567,7 +567,7 @@ DELIM
[[ "$output" =~ "CREATE TABLE \`test\`" ]]
[[ "$output" =~ "\`pk\` int" ]]
[[ "$output" =~ "\`str\` varchar(16383)" ]]
[[ "$output" =~ "\`int\` int unsigned" ]]
[[ "$output" =~ "\`int\` int" ]]
[[ "$output" =~ "\`bool\` tinyint" ]]
[[ "$output" =~ "\`float\` float" ]]
[[ "$output" =~ "\`date\` date" ]]
+19
View File
@@ -229,3 +229,22 @@ SQL
run dolt sql -q "SELECT * FROM t ORDER BY a LIMIT 1" -r csv
[[ "$output" =~ "h&a,B" ]] || false
}
@test "migrate: database with inverted primary key order" {
dolt sql <<SQL
CREATE TABLE t (
pk2 varchar(20) NOT NULL,
pk1 varchar(20) NOT NULL,
PRIMARY KEY (pk1, pk2));
INSERT INTO t (pk2, pk1) VALUES ("z","a"),("y","b"),("x","c");
SQL
dolt commit -Am "added table t"
run dolt schema show t
[[ "$output" =~ "PRIMARY KEY (\`pk1\`,\`pk2\`)" ]] || false
dolt migrate
run dolt schema show t
[[ "$output" =~ "PRIMARY KEY (\`pk1\`,\`pk2\`)" ]] || false
}
+10 -10
View File
@@ -82,7 +82,7 @@ teardown() {
[[ "$output" =~ "\`string\` varchar(16383)" ]] || false
[[ "$output" =~ "\`boolean\` tinyint" ]] || false
[[ "$output" =~ "\`float\` float" ]] || false
[[ "$output" =~ "\`uint\` int unsigned" ]] || false
[[ "$output" =~ "\`uint\` int" ]] || false
[[ "$output" =~ "\`uuid\` char(36) CHARACTER SET ascii COLLATE ascii_bin" ]] || false
}
@@ -259,9 +259,9 @@ DELIM
run dolt diff --schema
[ "$status" -eq 0 ]
[[ "$output" =~ '+ `x` varchar(16383) NOT NULL,' ]] || false
[[ "$output" =~ '+ `y` float NOT NULL,' ]] || false
[[ "$output" =~ '+ `z` int NOT NULL,' ]] || false
[[ "$output" =~ '+ `x` varchar(16383),' ]] || false
[[ "$output" =~ '+ `y` float,' ]] || false
[[ "$output" =~ '+ `z` int,' ]] || false
# assert no columns were deleted/replaced
[[ ! "$output" = "- \`" ]] || false
@@ -282,9 +282,9 @@ DELIM
run dolt diff --schema
[ "$status" -eq 0 ]
[[ "$output" =~ '+ `x` varchar(16383) NOT NULL,' ]] || false
[[ "$output" =~ '+ `y` float NOT NULL,' ]] || false
[[ "$output" =~ '+ `z` int NOT NULL,' ]] || false
[[ "$output" =~ '+ `x` varchar(16383),' ]] || false
[[ "$output" =~ '+ `y` float,' ]] || false
[[ "$output" =~ '+ `z` int,' ]] || false
# assert no columns were deleted/replaced
[[ ! "$output" = "- \`" ]] || false
@@ -308,9 +308,9 @@ DELIM
run dolt diff --schema
[ "$status" -eq 0 ]
[[ "$output" =~ '- `a` varchar(16383) NOT NULL,' ]] || false
[[ "$output" =~ '- `b` float NOT NULL,' ]] || false
[[ "$output" =~ '- `c` tinyint NOT NULL,' ]] || false
[[ "$output" =~ '- `a` varchar(16383),' ]] || false
[[ "$output" =~ '- `b` float,' ]] || false
[[ "$output" =~ '- `c` tinyint,' ]] || false
# assert no columns were added
[[ ! "$output" = "+ \`" ]] || false
}
@@ -49,6 +49,7 @@ call dolt_commit('-am', 'add some vals');
SQL
dolt pull
run dolt sql -q 'select count(*) from vals;'
[[ "$output" =~ "10" ]] || false
}
+16 -12
View File
@@ -59,7 +59,7 @@ teardown() {
dolt sql -q "create user dolt@'%' identified by '123'"
PORT=$( definePORT )
dolt sql-server --port=$PORT --user dolt > log.txt 2>&1 &
dolt sql-server --port=$PORT --user dolt --socket "dolt.$PORT.sock" > log.txt 2>&1 &
SERVER_PID=$!
sleep 5
@@ -836,7 +836,7 @@ listener:
host: "0.0.0.0"
port: $PORT
EOF
dolt sql-server --config ./config.yml &
dolt sql-server --config ./config.yml --socket "dolt.$PORT.sock" &
SERVER_PID=$!
# We do things manually here because we need to control CLIENT_MULTI_STATEMENTS.
python3 -c '
@@ -881,7 +881,7 @@ listener:
host: "0.0.0.0"
port: $PORT
EOF
dolt sql-server --config ./config.yml &
dolt sql-server --config ./config.yml --socket "dolt.$PORT.sock" &
SERVER_PID=$!
# We do things manually here because we need to control CLIENT_MULTI_STATEMENTS.
python3 -c '
@@ -1298,15 +1298,15 @@ databases:
cd repo1
start_sql_server
PORT=$( definePORT )
run dolt sql-server -P $PORT
run dolt sql-server -P $PORT --socket "dolt.$PORT.sock"
[ "$status" -eq 1 ]
}
@test "sql-server: multi dir sql-server locks out childen" {
@test "sql-server: multi dir sql-server locks out children" {
start_sql_server
cd repo2
PORT=$( definePORT )
run dolt sql-server -P $PORT
run dolt sql-server -P $PORT --socket "dolt.$PORT.sock"
[ "$status" -eq 1 ]
}
@@ -1315,7 +1315,7 @@ databases:
start_sql_server
cd ..
PORT=$( definePORT )
run dolt sql-server -P $PORT
run dolt sql-server -P $PORT --socket "dolt.$PORT.sock"
[ "$status" -eq 1 ]
}
@@ -1325,7 +1325,7 @@ databases:
server_query repo1 1 dolt "" "create database newdb" ""
cd newdb
PORT=$( definePORT )
run dolt sql-server -P $PORT
run dolt sql-server -P $PORT --socket "dolt.$PORT.sock"
[ "$status" -eq 1 ]
}
@@ -1363,6 +1363,8 @@ databases:
run dolt sql-client --host=0.0.0.0 --port=$PORT --user=dolt <<< "exit;"
[ "$status" -eq 0 ]
[[ "$output" =~ "# Welcome to the Dolt MySQL client." ]] || false
rm /tmp/mysql.sock
}
@test "sql-server: start server with socket option undefined should set default socket path" {
@@ -1380,6 +1382,8 @@ databases:
run grep '\"/tmp/mysql.sock\"' log.txt
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 1 ]
rm /tmp/mysql.sock
}
@test "sql-server: server fails to start up if there is already a file in the socket file path" {
@@ -1417,7 +1421,7 @@ listener:
host: localhost
port: $PORT
max_connections: 10
socket: /tmp/mysql.sock
socket: dolt.$PORT.sock
behavior:
autocommit: true" > server.yaml
@@ -1428,7 +1432,7 @@ behavior:
server_query repo2 1 dolt "" "select 1 as col1" "col1\n1"
run grep '\"/tmp/mysql.sock\"' log.txt
run grep "dolt.$PORT.sock" log.txt
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 1 ]
}
@@ -1470,7 +1474,7 @@ s.close()
" > port_finder.py
PORT=$(python3 port_finder.py)
run dolt sql-server --port=$PORT
run dolt sql-server --port=$PORT --socket "dolt.$PORT.sock"
[ "$status" -eq 1 ]
[[ "$output" =~ "database locked by another sql-server; either clone the database to run a second server" ]] || false
@@ -1524,7 +1528,7 @@ s.close()
[ $status -eq 0 ]
PORT=$( definePORT )
dolt sql-server --host 0.0.0.0 --port=$PORT --user dolt &
dolt sql-server --host 0.0.0.0 --port=$PORT --user dolt --socket "dolt.$PORT.sock" &
SERVER_PID=$! # will get killed by teardown_common
sleep 5 # not using python wait so this works on windows