Merge remote-tracking branch 'origin/dhruv/push-timeout-bug' into aaron/walk-addrs

This commit is contained in:
Aaron Son
2022-03-28 15:46:40 -07:00
40 changed files with 1872 additions and 564 deletions
+1 -1
View File
@@ -111,7 +111,7 @@ func pullHelper(ctx context.Context, dEnv *env.DoltEnv, pullSpec *env.PullSpec)
if remoteTrackRef != nil {
srcDBCommit, err := actions.FetchRemoteBranch(ctx, dEnv.TempTableFilesDir(), pullSpec.Remote, srcDB, dEnv.DoltDB, pullSpec.Branch, remoteTrackRef, buildProgStarter(downloadLanguage), stopProgFuncs)
srcDBCommit, err := actions.FetchRemoteBranch(ctx, dEnv.TempTableFilesDir(), pullSpec.Remote, srcDB, dEnv.DoltDB, pullSpec.Branch, buildProgStarter(downloadLanguage), stopProgFuncs)
if err != nil {
return err
}
+1 -1
View File
@@ -54,7 +54,7 @@ import (
)
const (
Version = "0.37.6"
Version = "0.37.7"
)
var dumpDocsCommand = &commands.DumpDocsCmd{}
+2 -2
View File
@@ -19,7 +19,7 @@ require (
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
github.com/dolthub/vitess v0.0.0-20220317184555-442cf0a796cc
github.com/dolthub/vitess v0.0.0-20220323175412-7e0381fb7c3f
github.com/dustin/go-humanize v1.0.0
github.com/fatih/color v1.9.0
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
@@ -68,7 +68,7 @@ require (
)
require (
github.com/dolthub/go-mysql-server v0.11.1-0.20220318174955-93a6edd8d585
github.com/dolthub/go-mysql-server v0.11.1-0.20220324183628-b0a3bc9c2c2f
github.com/google/flatbuffers v2.0.5+incompatible
github.com/gosuri/uilive v0.0.4
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6
+4 -4
View File
@@ -170,8 +170,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-mysql-server v0.11.1-0.20220318174955-93a6edd8d585 h1:B/9vfwwCndGlyoX2bPo1xZoTTij9YIRUG0PcOYH/1G4=
github.com/dolthub/go-mysql-server v0.11.1-0.20220318174955-93a6edd8d585/go.mod h1:tctGAo2umfCywx3/3S6f8rvyUu2FpkdO2H2Bx4E9PJY=
github.com/dolthub/go-mysql-server v0.11.1-0.20220324183628-b0a3bc9c2c2f h1:F1hhtWcel9an0/Ohbdg0gw6gSlgwWBxlnrSl7Jyi/2M=
github.com/dolthub/go-mysql-server v0.11.1-0.20220324183628-b0a3bc9c2c2f/go.mod h1:1Sq4rszjP6AW7AJaF9xfycWexkNKIwkkOuYnoS5XcPg=
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371 h1:oyPHJlzumKta1vnOQqUnfdz+pk3EmnHS3Nd0cCT0I2g=
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=
@@ -180,8 +180,8 @@ github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxP
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66/go.mod h1:N5ZIbMGuDUpTpOFQ7HcsN6WSIpTGQjHP+Mz27AfmAgk=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/vitess v0.0.0-20220317184555-442cf0a796cc h1:p+9ivR8aqWgO3CkL61xxNSAUK6D01BScK7/fqaynyWA=
github.com/dolthub/vitess v0.0.0-20220317184555-442cf0a796cc/go.mod h1:qpZ4j0dval04OgZJ5fyKnlniSFUosTH280pdzUjUJig=
github.com/dolthub/vitess v0.0.0-20220323175412-7e0381fb7c3f h1:5kW1g4mscnLfJJR0C871vqQhQVifLm8mkhKBU131Nt4=
github.com/dolthub/vitess v0.0.0-20220323175412-7e0381fb7c3f/go.mod h1:jxgvpEvrTNw2i4BKlwT75E775eUXBeMv5MPeQkIb9zI=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@@ -31,6 +31,14 @@ const (
To = "to"
)
// ToColNamer returns the given column name prefixed with the "to"-side diff prefix.
func ToColNamer(name string) string {
	prefixed := To + "_" + name
	return prefixed
}
// FromColNamer returns the given column name prefixed with the "from"-side diff prefix.
func FromColNamer(name string) string {
	prefixed := From + "_" + name
	return prefixed
}
type RowDiffSource struct {
ad RowDiffer
joiner *rowconv.Joiner
+1 -1
View File
@@ -66,7 +66,7 @@ func CommitItrForAllBranches(ctx context.Context, ddb *DoltDB) (CommitItr, error
return cmItr, nil
}
// CommitItrForRoots will return a CommitItr which will iterate over all descendant commits of the provided rootCommits.
// CommitItrForRoots will return a CommitItr which will iterate over all ancestor commits of the provided rootCommits.
func CommitItrForRoots(ddb *DoltDB, rootCommits ...*Commit) CommitItr {
return &commitItr{
ddb: ddb,
+16
View File
@@ -659,6 +659,22 @@ func (ddb *DoltDB) GetBranches(ctx context.Context) ([]ref.DoltRef, error) {
return ddb.GetRefsOfType(ctx, branchRefFilter)
}
// HasBranch returns whether the DB has a branch with the name given.
func (ddb *DoltDB) HasBranch(ctx context.Context, branchName string) (bool, error) {
	refs, err := ddb.GetRefsOfType(ctx, branchRefFilter)
	if err != nil {
		return false, err
	}
	// Linear scan is fine here: branch counts are expected to be small.
	for _, r := range refs {
		if r.GetPath() == branchName {
			return true, nil
		}
	}
	return false, nil
}
type BranchWithHash struct {
Ref ref.DoltRef
Hash hash.Hash
@@ -123,7 +123,6 @@ func TestIsValidTableName(t *testing.T) {
assert.False(t, IsValidTableName("-"))
assert.False(t, IsValidTableName("-a"))
assert.False(t, IsValidTableName(""))
assert.False(t, IsValidTableName("1a"))
assert.False(t, IsValidTableName("a1-"))
assert.False(t, IsValidTableName("ab!!c"))
}
+4
View File
@@ -19,6 +19,8 @@ import (
"errors"
"fmt"
"regexp"
"strings"
"unicode"
"github.com/dolthub/go-mysql-server/sql"
@@ -54,6 +56,8 @@ var (
// IsValidTableName returns true if the name matches the regular expression TableNameRegexStr.
// Any leading digits are ignored before matching; after stripping them, a valid name is composed
// of 1 or more letters and non-initial numerals, as well as the characters _ and -.
func IsValidTableName(name string) bool {
	// The regex only rejects digits in the initial position, so drop any leading
	// run of digits before matching.
	stripped := strings.TrimLeftFunc(name, unicode.IsDigit)
	return tableNameRegex.MatchString(stripped)
}
+12 -2
View File
@@ -334,7 +334,17 @@ func FetchFollowTags(ctx context.Context, tempTableDir string, srcDB, destDB *do
return nil
}
func FetchRemoteBranch(ctx context.Context, tempTablesDir string, rem env.Remote, srcDB, destDB *doltdb.DoltDB, srcRef, destRef ref.DoltRef, progStarter ProgStarter, progStopper ProgStopper) (*doltdb.Commit, error) {
// FetchRemoteBranch fetches and returns the |Commit| corresponding to the remote ref given. Returns an error if the
// remote reference doesn't exist or can't be fetched. Blocks until the fetch is complete.
func FetchRemoteBranch(
ctx context.Context,
tempTablesDir string,
rem env.Remote,
srcDB, destDB *doltdb.DoltDB,
srcRef ref.DoltRef,
progStarter ProgStarter,
progStopper ProgStopper,
) (*doltdb.Commit, error) {
evt := events.GetEventFromContext(ctx)
u, err := earl.Parse(rem.Url)
@@ -387,7 +397,7 @@ func FetchRefSpecs(ctx context.Context, dbData env.DbData, refSpecs []ref.Remote
if remoteTrackRef != nil {
rsSeen = true
srcDBCommit, err := FetchRemoteBranch(ctx, dbData.Rsw.TempTableFilesDir(), remote, srcDB, dbData.Ddb, branchRef, remoteTrackRef, progStarter, progStopper)
srcDBCommit, err := FetchRemoteBranch(ctx, dbData.Rsw.TempTableFilesDir(), remote, srcDB, dbData.Ddb, branchRef, progStarter, progStopper)
if err != nil {
return err
}
-11
View File
@@ -58,17 +58,6 @@ type Remote struct {
dialer dbfactory.GRPCDialProvider
}
// GetRemote constructs a Remote for the given name and url and opens its remote database.
// Returns NoRemote along with the error if the remote DB cannot be opened.
func GetRemote(ctx context.Context, remoteName, remoteUrl string, params map[string]string, dialer dbfactory.GRPCDialProvider) (Remote, *doltdb.DoltDB, error) {
	rem := NewRemote(remoteName, remoteUrl, params, dialer)
	remoteDB, err := rem.GetRemoteDB(ctx, types.Format_Default)
	if err != nil {
		return NoRemote, nil, err
	}
	return rem, remoteDB, nil
}
// NewRemote returns a Remote for the given name and url, configured with the default
// fetch refspec refs/heads/*:refs/remotes/<name>/*.
func NewRemote(name, url string, params map[string]string, dialer dbfactory.GRPCDialProvider) Remote {
	defaultFetchSpec := "refs/heads/*:refs/remotes/" + name + "/*"
	return Remote{name, url, []string{defaultFetchSpec}, params, dialer}
}
@@ -31,7 +31,6 @@ import (
"time"
"github.com/cenkalti/backoff"
"github.com/dustin/go-humanize"
"github.com/opentracing/opentracing-go"
"golang.org/x/sync/errgroup"
@@ -882,7 +881,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
hashToCount := make(map[hash.Hash]int)
hashToData := make(map[hash.Hash][]byte)
hashToDetails := make(map[hash.Hash]*remotesapi.TableFileDetails)
hashToContentHash := make(map[hash.Hash][]byte)
// structuring so this can be done as multiple files in the future.
{
@@ -897,37 +896,15 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
hashToCount[h] = len(chnks)
md5Bytes := md5.Sum(data)
hashToDetails[h] = &remotesapi.TableFileDetails{
Id: h[:],
ContentLength: uint64(len(data)),
ContentHash: md5Bytes[:],
}
hashToContentHash[h] = md5Bytes[:]
}
tfds := make([]*remotesapi.TableFileDetails, 0, len(hashToDetails))
for _, v := range hashToDetails {
tfds = append(tfds, v)
}
req := &remotesapi.GetUploadLocsRequest{RepoId: dcs.getRepoId(), TableFileDetails: tfds}
resp, err := dcs.csClient.GetUploadLocations(ctx, req)
if err != nil {
return map[hash.Hash]int{}, NewRpcError(err, "GetUploadLocations", dcs.host, req)
}
for _, loc := range resp.Locs {
var err error
h := hash.New(loc.TableFileHash)
data := hashToData[h]
details := hashToDetails[h]
switch typedLoc := loc.Location.(type) {
case *remotesapi.UploadLoc_HttpPost:
err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, bytes.NewBuffer(data), details.ContentHash)
default:
break
}
for h, contentHash := range hashToContentHash {
// Can parallelize this in the future if needed
err := dcs.uploadTableFileWithRetries(ctx, h, contentHash, func() (io.ReadCloser, uint64, error) {
data := hashToData[h]
return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
})
if err != nil {
return map[hash.Hash]int{}, err
}
@@ -936,55 +913,97 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
return hashToCount, nil
}
func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error {
op := func() error {
body, contentLength, err := getContent()
if err != nil {
return err
}
tbfd := &remotesapi.TableFileDetails{
Id: tableFileId[:],
ContentLength: contentLength,
ContentHash: tableFileContentHash,
}
dcs.logf("getting upload location for file %s", tableFileId.String())
req := &remotesapi.GetUploadLocsRequest{RepoId: dcs.getRepoId(), TableFileDetails: []*remotesapi.TableFileDetails{tbfd}}
resp, err := dcs.csClient.GetUploadLocations(ctx, req)
if err != nil {
if err != nil {
return NewRpcError(err, "GetUploadLocations", dcs.host, req)
}
}
if len(resp.Locs) != 1 {
return NewRpcError(errors.New("unexpected upload location count"), "GetUploadLocations", dcs.host, req)
}
loc := resp.Locs[0]
switch typedLoc := loc.Location.(type) {
case *remotesapi.UploadLoc_HttpPost:
// strip off the query parameters as they clutter the logs. We only
// really care about being able to verify the table files are being
// uploaded to the correct places on S3.
urlStr := typedLoc.HttpPost.Url
qmIdx := strings.IndexRune(urlStr, '?')
if qmIdx != -1 {
urlStr = urlStr[:qmIdx]
}
dcs.logf("uploading file %s to %s", tableFileId.String(), urlStr)
err = dcs.httpPostUpload(ctx, typedLoc.HttpPost, tableFileContentHash, int64(contentLength), body)
if err != nil {
dcs.logf("failed to upload file %s to %s, err: %v", tableFileId.String(), urlStr, err)
return err
}
dcs.logf("successfully uploaded file %s to %s", tableFileId.String(), urlStr)
default:
break
}
return nil
}
return backoff.Retry(op, backoff.WithMaxRetries(uploadRetryParams, uploadRetryCount))
}
type Sizer interface {
Size() int64
}
func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, hashBytes []byte, post *remotesapi.HttpPostTableFile, rd io.Reader, contentHash []byte) error {
return HttpPostUpload(ctx, dcs.httpFetcher, post, rd, contentHash)
func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, post *remotesapi.HttpPostTableFile, contentHash []byte, contentLength int64, body io.ReadCloser) error {
return HttpPostUpload(ctx, dcs.httpFetcher, post, contentHash, contentLength, body)
}
func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, post *remotesapi.HttpPostTableFile, rd io.Reader, contentHash []byte) error {
req, err := http.NewRequest(http.MethodPut, post.Url, rd)
func HttpPostUpload(ctx context.Context, httpFetcher HTTPFetcher, post *remotesapi.HttpPostTableFile, contentHash []byte, contentLength int64, body io.ReadCloser) error {
fetcher := globalHttpFetcher
if httpFetcher != nil {
fetcher = httpFetcher
}
req, err := http.NewRequest(http.MethodPut, post.Url, body)
if err != nil {
return err
}
if sizer, ok := rd.(Sizer); ok {
req.ContentLength = sizer.Size()
}
req.ContentLength = contentLength
if len(contentHash) > 0 {
md5s := base64.StdEncoding.EncodeToString(contentHash)
req.Header.Set("Content-MD5", md5s)
}
fetcher := globalHttpFetcher
if httpFetcher != nil {
fetcher = httpFetcher
resp, err := fetcher.Do(req.WithContext(ctx))
if err == nil {
defer func() {
_ = resp.Body.Close()
}()
}
var resp *http.Response
op := func() error {
var err error
resp, err = fetcher.Do(req.WithContext(ctx))
if err == nil {
defer func() {
_ = resp.Body.Close()
}()
}
return processHttpResp(resp, err)
}
err = backoff.Retry(op, backoff.WithMaxRetries(uploadRetryParams, uploadRetryCount))
if err != nil {
return err
}
return nil
return processHttpResp(resp, err)
}
// aggregateDownloads looks for byte ranges that need to be downloaded, and tries to aggregate them into a smaller number
@@ -1175,58 +1194,12 @@ func (dcs *DoltChunkStore) SupportedOperations() nbs.TableFileStoreOps {
}
// WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
dcs.logf("getting upload location for file %s with %d chunks and size %s", fileId, numChunks, humanize.Bytes(contentLength))
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
fileIdBytes := hash.Parse(fileId)
tfd := &remotesapi.TableFileDetails{
Id: fileIdBytes[:],
ContentLength: contentLength,
ContentHash: contentHash,
}
req := &remotesapi.GetUploadLocsRequest{
RepoId: dcs.getRepoId(),
TableFileDetails: []*remotesapi.TableFileDetails{tfd},
// redundant and deprecated. Still setting for compatibility, but will remove "promptly".
TableFileHashes: [][]byte{fileIdBytes[:]},
}
resp, err := dcs.csClient.GetUploadLocations(ctx, req)
err := dcs.uploadTableFileWithRetries(ctx, fileIdBytes, contentHash, getRd)
if err != nil {
return NewRpcError(err, "GetUploadLocations", dcs.host, req)
return err
}
if len(resp.Locs) != 1 {
return errors.New("unexpected upload location count")
}
loc := resp.Locs[0]
switch typedLoc := loc.Location.(type) {
case *remotesapi.UploadLoc_HttpPost:
urlStr := typedLoc.HttpPost.Url
// strip off the query parameters as they clutter the logs. We only really care about being able to verify the table
// files are being uploaded to the correct places on S3.
qmIdx := strings.IndexRune(urlStr, '?')
if qmIdx != -1 {
urlStr = urlStr[:qmIdx]
}
dcs.logf("uploading %s to %s", fileId, urlStr)
err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, rd, contentHash)
if err != nil {
dcs.logf("failed to upload %s to %s. err: %s", fileId, urlStr, err.Error())
return err
}
dcs.logf("successfully uploaded %s to %s", fileId, urlStr)
default:
return errors.New("unsupported upload location")
}
return nil
}
@@ -48,6 +48,7 @@ const (
const (
DiffCommitTag = iota + SystemTableReservedMin + uint64(2000)
DiffCommitDateTag
DiffTypeTag
)
// Tags for dolt_query_catalog table
+1 -2
View File
@@ -201,10 +201,9 @@ func (ss *SuperSchema) nameColumns() map[uint64]string {
// GenerateColCollection creates a ColCollection from all the columns in the SuperSchema.
// Each column is assigned its latest name from its name history.
func (ss *SuperSchema) GenerateColCollection() (*ColCollection, error) {
uniqNames := ss.nameColumns()
cc := NewColCollection()
err := ss.Iter(func(tag uint64, col Column) (stop bool, err error) {
col.Name = uniqNames[tag]
col.Name = ss.LatestColumnName(tag)
cc = cc.Append(col)
stop = err != nil
return stop, err
@@ -125,25 +125,6 @@ var SuperSchemaTests = []SuperSchemaTest{
strCol("c", 3, false),
}),
},
{
Name: "SuperSchema appends tag to disambiguate name collisions",
Schemas: []Schema{sch1, nameCollisionWithSch1},
ExpectedSuperSchema: SuperSchema{
allCols: mustColColl([]Column{
strCol("", 1, true),
strCol("", 2, false),
strCol("", 3, false),
strCol("", 22, false),
}),
tagNames: map[uint64][]string{1: {"a"}, 2: {"b"}, 3: {"c"}, 22: {"b"}},
},
ExpectedGeneratedSchema: mustSchema([]Column{
strCol("a", 1, true),
strCol("b_2", 2, false),
strCol("c", 3, false),
strCol("b_22", 22, false),
}),
},
{
Name: "SuperSchema errors on tag collision",
Schemas: []Schema{sch1, tagCollisionWithSch1},
+63 -27
View File
@@ -239,23 +239,36 @@ func (p DoltDatabaseProvider) databaseForRevision(ctx context.Context, revDB str
return nil, dsess.InitialDbState{}, false, nil
}
if replicaDb, ok := srcDb.(ReadReplicaDatabase); ok {
// TODO move this out of analysis phase, should only happen at read time
err := switchAndFetchReplicaHead(ctx, revSpec, replicaDb)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
isBranch, err := isBranch(ctx, srcDb, revSpec)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
if isBranch(ctx, srcDb.DbData().Ddb, revSpec) {
if isBranch {
// fetch the upstream head if this is a replicated db
if replicaDb, ok := srcDb.(ReadReplicaDatabase); ok {
// TODO move this out of analysis phase, should only happen at read time
err := switchAndFetchReplicaHead(ctx, revSpec, replicaDb)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
}
db, init, err := dbRevisionForBranch(ctx, srcDb, revSpec)
if err != nil {
return nil, dsess.InitialDbState{}, false, err
}
return db, init, true, nil
}
if doltdb.IsValidCommitHash(revSpec) {
// TODO: this should be an interface, not a struct
replicaDb, ok := srcDb.(ReadReplicaDatabase)
if ok {
srcDb = replicaDb.Database
}
srcDb, ok = srcDb.(Database)
if !ok {
return nil, dsess.InitialDbState{}, false, nil
@@ -281,7 +294,8 @@ func (p DoltDatabaseProvider) RevisionDbState(ctx context.Context, revDB string)
return init, nil
}
func (p DoltDatabaseProvider) Function(ctx *sql.Context, name string) (sql.Function, error) {
// Function implements the FunctionProvider interface
func (p DoltDatabaseProvider) Function(_ *sql.Context, name string) (sql.Function, error) {
fn, ok := p.functions[strings.ToLower(name)]
if !ok {
return nil, sql.ErrFunctionNotFound.New(name)
@@ -289,19 +303,26 @@ func (p DoltDatabaseProvider) Function(ctx *sql.Context, name string) (sql.Funct
return fn, nil
}
// TableFunction implements the TableFunctionProvider interface
func (p DoltDatabaseProvider) TableFunction(ctx *sql.Context, name string) (sql.TableFunction, error) {
// currently, only one table function is supported, if we extend this, we should clean this up
// and store table functions in a map, similar to regular functions.
if strings.ToLower(name) == "dolt_diff" {
dtf := &DiffTableFunction{}
return dtf, nil
}
return nil, sql.ErrTableFunctionNotFound.New(name)
}
// switchAndFetchReplicaHead tries to pull the latest version of a branch. Will fail if the branch
// does not exist on the ReadReplicaDatabase's remote. If the target branch is not a replication
// head, the new branch will not be continuously fetched.
func switchAndFetchReplicaHead(ctx context.Context, branch string, db sql.Database) error {
destDb, ok := db.(ReadReplicaDatabase)
if !ok {
return ErrFailedToCastToReplicaDb
}
func switchAndFetchReplicaHead(ctx context.Context, branch string, db ReadReplicaDatabase) error {
branchRef := ref.NewBranchRef(branch)
var branchExists bool
branches, err := destDb.ddb.GetBranches(ctx)
branches, err := db.ddb.GetBranches(ctx)
if err != nil {
return err
}
@@ -314,14 +335,14 @@ func switchAndFetchReplicaHead(ctx context.Context, branch string, db sql.Databa
}
// check whether branch is on remote before creating local tracking branch
cm, err := actions.FetchRemoteBranch(ctx, destDb.tmpDir, destDb.remote, destDb.srcDB, destDb.DbData().Ddb, branchRef, nil, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
cm, err := actions.FetchRemoteBranch(ctx, db.tmpDir, db.remote, db.srcDB, db.DbData().Ddb, branchRef, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
if err != nil {
return err
}
// create refs/heads/branch dataset
if !branchExists {
err = destDb.ddb.NewBranchAtCommit(ctx, branchRef, cm)
err = db.ddb.NewBranchAtCommit(ctx, branchRef, cm)
if err != nil {
return err
}
@@ -329,13 +350,13 @@ func switchAndFetchReplicaHead(ctx context.Context, branch string, db sql.Databa
// update ReadReplicaRemote with new HEAD
// dolt_replicate_heads configuration remains unchanged
destDb, err = destDb.SetHeadRef(branchRef)
db, err = db.SetHeadRef(branchRef)
if err != nil {
return err
}
// create workingSets/heads/branch and update the working set
err = pullBranches(ctx, destDb, []string{branch})
err = pullBranches(ctx, db, []string{branch})
if err != nil {
return err
}
@@ -343,19 +364,34 @@ func switchAndFetchReplicaHead(ctx context.Context, branch string, db sql.Databa
return nil
}
func isBranch(ctx context.Context, ddb *doltdb.DoltDB, revSpec string) bool {
branches, err := ddb.GetBranches(ctx)
if err != nil {
return false
// isBranch returns whether a branch with the given name is in scope for the database given
func isBranch(ctx context.Context, db SqlDatabase, branchName string) (bool, error) {
var ddbs []*doltdb.DoltDB
if rdb, ok := db.(ReadReplicaDatabase); ok {
remoteDB, err := rdb.remote.GetRemoteDB(ctx, rdb.ddb.Format())
if err != nil {
return false, err
}
ddbs = append(ddbs, rdb.ddb, remoteDB)
} else if ddb, ok := db.(Database); ok {
ddbs = append(ddbs, ddb.ddb)
} else {
return false, fmt.Errorf("unrecognized type of database %T", db)
}
for _, br := range branches {
if revSpec == br.GetPath() {
return true
for _, ddb := range ddbs {
branchExists, err := ddb.HasBranch(ctx, branchName)
if err != nil {
return false, err
}
if branchExists {
return true, nil
}
}
return false
return false, nil
}
func dbRevisionForBranch(ctx context.Context, srcDb SqlDatabase, revSpec string) (SqlDatabase, dsess.InitialDbState, error) {
@@ -113,7 +113,7 @@ func (d DoltPullFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {
if remoteTrackRef != nil {
// todo: can we pass nil for either of the channels?
srcDBCommit, err := actions.FetchRemoteBranch(ctx, dbData.Rsw.TempTableFilesDir(), pullSpec.Remote, srcDB, dbData.Ddb, pullSpec.Branch, remoteTrackRef, runProgFuncs, stopProgFuncs)
srcDBCommit, err := actions.FetchRemoteBranch(ctx, dbData.Rsw.TempTableFilesDir(), pullSpec.Remote, srcDB, dbData.Ddb, pullSpec.Branch, runProgFuncs, stopProgFuncs)
if err != nil {
return noConflicts, err
}
@@ -37,7 +37,7 @@ var DoltFunctions = []sql.Function{
sql.FunctionN{Name: DoltBranchFuncName, Fn: NewDoltBranchFunc},
}
// These are the DoltFunctions that get exposed to Dolthub Api.
// DolthubApiFunctions are the DoltFunctions that get exposed to Dolthub Api.
var DolthubApiFunctions = []sql.Function{
sql.Function1{Name: HashOfFuncName, Fn: NewHashOf},
sql.Function0{Name: VersionFuncName, Fn: NewVersion},
@@ -0,0 +1,450 @@
// Copyright 2020-2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqle
import (
"fmt"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/go-mysql-server/sql"
"gopkg.in/src-d/go-errors.v1"
)
// ErrInvalidNonLiteralArgument is returned when a table function argument is not a
// literal, fully-resolved value.
var ErrInvalidNonLiteralArgument = errors.NewKind("Invalid argument to %s: %s only literal values supported")
// Compile-time check that DiffTableFunction satisfies sql.TableFunction.
var _ sql.TableFunction = (*DiffTableFunction)(nil)
// DiffTableFunction backs the dolt_diff(table, fromCommit, toCommit) table function,
// producing the row differences of a single table between two commits.
type DiffTableFunction struct {
	// ctx is captured at construction and used to evaluate the argument expressions.
	// NOTE(review): storing a context in a struct is unusual — confirm it's required by
	// the analyzer needing the schema before RowIter is called.
	ctx *sql.Context
	// tableNameExpr is the first argument: the name of the table to diff.
	tableNameExpr sql.Expression
	// fromCommitExpr is the second argument: the "from" commit (a commit hash string is
	// expected at evaluation time).
	fromCommitExpr sql.Expression
	// toCommitExpr is the third argument: the "to" commit.
	toCommitExpr sql.Expression
	database     sql.Database
	// sqlSch is the result schema, generated from the to/from table schemas.
	sqlSch sql.Schema
	// joiner combines a to-row and a from-row into a single diff result row.
	joiner *rowconv.Joiner
	toSch  schema.Schema
	fromSch schema.Schema
}
// NewInstance implements the TableFunction interface.
func (dtf *DiffTableFunction) NewInstance(ctx *sql.Context, database sql.Database, expressions []sql.Expression) (sql.Node, error) {
	instance := &DiffTableFunction{
		ctx:      ctx,
		database: database,
	}
	// WithExpressions validates the arguments and computes the result schema.
	return instance.WithExpressions(expressions...)
}
// Database implements the sql.Databaser interface. It returns the database this
// table function was instantiated against.
func (dtf *DiffTableFunction) Database() sql.Database {
	return dtf.database
}
// WithDatabase implements the sql.Databaser interface. Note that it mutates the
// receiver in place rather than returning a copy.
func (dtf *DiffTableFunction) WithDatabase(database sql.Database) (sql.Node, error) {
	dtf.database = database
	return dtf, nil
}
// Expressions implements the sql.Expressioner interface. The three expressions are the
// table name, the from-commit, and the to-commit arguments, in that order.
func (dtf *DiffTableFunction) Expressions() []sql.Expression {
	exprs := make([]sql.Expression, 0, 3)
	exprs = append(exprs, dtf.tableNameExpr, dtf.fromCommitExpr, dtf.toCommitExpr)
	return exprs
}
// WithExpressions implements the sql.Expressioner interface. It validates the three
// arguments (table name, from commit, to commit), stores them, and eagerly generates
// the result schema.
func (dtf *DiffTableFunction) WithExpressions(expression ...sql.Expression) (sql.Node, error) {
	if len(expression) != 3 {
		return nil, sql.ErrInvalidArgumentNumber.New(dtf.FunctionName(), 3, len(expression))
	}
	// TODO: For now, we will only support literal / fully-resolved arguments to the
	//       DiffTableFunction to avoid issues where the schema is needed in the analyzer
	//       before the arguments could be resolved.
	for _, expr := range expression {
		if !expr.Resolved() {
			return nil, ErrInvalidNonLiteralArgument.New(dtf.FunctionName(), expr.String())
		}
	}
	dtf.tableNameExpr, dtf.fromCommitExpr, dtf.toCommitExpr = expression[0], expression[1], expression[2]
	tableName, fromCommitVal, toCommitVal, err := dtf.evaluateArguments()
	if err != nil {
		return nil, err
	}
	// Generate the schema up front so it's available during analysis.
	sqlSch, err := dtf.generateSchema(tableName, fromCommitVal, toCommitVal)
	if err != nil {
		return nil, err
	}
	dtf.sqlSch = sqlSch
	return dtf, nil
}
// Children implements the sql.Node interface. A table function is a leaf node, so
// there are never any children.
func (dtf *DiffTableFunction) Children() []sql.Node {
	return nil
}
// RowIter implements the sql.Node interface. It resolves both commit arguments against
// the underlying Dolt database, fetches the named table at each commit, and returns an
// iterator over the diff of the two table versions as a single partition.
func (dtf *DiffTableFunction) RowIter(ctx *sql.Context, _ sql.Row) (sql.RowIter, error) {
	// TODO: When we add support for joining on table functions, we'll need to evaluate this against the
	//       specified row. That row is what has the left_table context in a join query.
	//       This will expand the test cases we need to cover significantly.
	tableName, fromCommitVal, toCommitVal, err := dtf.evaluateArguments()
	if err != nil {
		return nil, err
	}
	// The joiner is populated by generateSchema (via WithExpressions); reaching this
	// point without it indicates a programming error, not a user error.
	if dtf.joiner == nil {
		panic("schema and joiner haven't been initialized")
	}
	sqledb, ok := dtf.database.(Database)
	if !ok {
		panic("unable to get dolt database")
	}
	ddb := sqledb.GetDoltDB()
	// Resolve the "to" side: root value, commit hash, and commit timestamp.
	toRoot, toHash, toDate, err := dtf.loadDetailsForRef(ctx, toCommitVal, ddb)
	if err != nil {
		return nil, err
	}
	toTable, _, _, err := toRoot.GetTableInsensitive(ctx, tableName)
	if err != nil {
		return nil, err
	}
	// Resolve the "from" side the same way.
	fromRoot, fromHash, fromDate, err := dtf.loadDetailsForRef(ctx, fromCommitVal, ddb)
	if err != nil {
		return nil, err
	}
	fromTable, _, _, err := fromRoot.GetTableInsensitive(ctx, tableName)
	if err != nil {
		return nil, err
	}
	// The whole diff is produced as one partition.
	dp := dtables.NewDiffPartition(toTable, fromTable, toHash, fromHash, toDate, fromDate, &dtf.toSch, &dtf.fromSch)
	return NewDiffTableFunctionRowIterForSinglePartition(*dp, ddb, dtf.joiner), nil
}
// loadDetailsForRef loads the root value, commit hash, and commit timestamp for the
// specified ref value, which must be a commit hash string.
func (dtf *DiffTableFunction) loadDetailsForRef(ctx *sql.Context, ref interface{}, ddb *doltdb.DoltDB) (*doltdb.RootValue, string, *types.Timestamp, error) {
	hashStr, ok := ref.(string)
	if !ok {
		return nil, "", nil, fmt.Errorf("received '%v' when expecting commit hash string", ref)
	}
	cs, err := doltdb.NewCommitSpec(hashStr)
	if err != nil {
		return nil, "", nil, err
	}
	cm, err := ddb.Resolve(ctx, cs, nil)
	if err != nil {
		return nil, "", nil, err
	}
	root, err := cm.GetRootValue()
	if err != nil {
		return nil, "", nil, err
	}
	meta, err := cm.GetCommitMeta()
	if err != nil {
		return nil, "", nil, err
	}
	// Convert the commit's wall-clock time into the store's timestamp type.
	ts := meta.Time()
	commitTime := types.Timestamp(ts)
	return root, hashStr, &commitTime, nil
}
// WithChildren implements the sql.Node interface. Table functions are leaf nodes, so
// receiving any children is a programming error.
func (dtf *DiffTableFunction) WithChildren(children ...sql.Node) (sql.Node, error) {
	if len(children) > 0 {
		panic("unexpected children")
	}
	return dtf, nil
}
// CheckPrivileges implements the sql.Node interface. The caller must hold SELECT on the
// table being diffed.
func (dtf *DiffTableFunction) CheckPrivileges(ctx *sql.Context, opChecker sql.PrivilegedOperationChecker) bool {
	tableName, _, _, err := dtf.evaluateArguments()
	if err != nil {
		return false
	}
	// TODO: Add tests for privilege checking
	op := sql.NewPrivilegedOperation(dtf.database.Name(), tableName, "", sql.PrivilegeType_Select)
	return opChecker.UserHasPrivileges(ctx, op)
}
// evaluateArguments evaluates the argument expressions to turn them into values this
// DiffTableFunction can use. Note that this method only evals the expressions, and
// doesn't validate the values. Returns zero values with a nil error when the node is
// not yet resolved.
func (dtf *DiffTableFunction) evaluateArguments() (string, interface{}, interface{}, error) {
	if !dtf.Resolved() {
		return "", nil, nil, nil
	}
	// All three arguments must be textual; reject anything else before evaluating.
	for _, expr := range []sql.Expression{dtf.tableNameExpr, dtf.fromCommitExpr, dtf.toCommitExpr} {
		if !sql.IsText(expr.Type()) {
			return "", nil, nil, sql.ErrInvalidArgumentDetails.New(dtf.FunctionName(), expr.String())
		}
	}
	tableNameVal, err := dtf.tableNameExpr.Eval(dtf.ctx, nil)
	if err != nil {
		return "", nil, nil, err
	}
	tableName, ok := tableNameVal.(string)
	if !ok {
		return "", nil, nil, ErrInvalidTableName.New(dtf.tableNameExpr.String())
	}
	fromCommitVal, err := dtf.fromCommitExpr.Eval(dtf.ctx, nil)
	if err != nil {
		return "", nil, nil, err
	}
	toCommitVal, err := dtf.toCommitExpr.Eval(dtf.ctx, nil)
	if err != nil {
		return "", nil, nil, err
	}
	return tableName, fromCommitVal, toCommitVal, nil
}
// generateSchema builds the SQL result schema for this table function: the
// joined to_*/from_* column sets for tableName as of toCommitVal/fromCommitVal,
// plus a trailing diff_type column. As a side effect it stores the from/to
// dolt schemas and the joiner on dtf for later use by the row iterator.
// Returns (nil, nil) if the function's arguments are not yet resolved.
func (dtf *DiffTableFunction) generateSchema(tableName string, fromCommitVal, toCommitVal interface{}) (sql.Schema, error) {
	if !dtf.Resolved() {
		return nil, nil
	}
	// This table function only works against a dolt-backed database.
	sqledb, ok := dtf.database.(Database)
	if !ok {
		panic(fmt.Sprintf("unexpected database type: %T", dtf.database))
	}
	// Resolve the root value at the "from" end of the diff and require the
	// table to exist there.
	fromRoot, err := sqledb.rootAsOf(dtf.ctx, fromCommitVal)
	if err != nil {
		return nil, err
	}
	fromTable, _, ok, err := fromRoot.GetTableInsensitive(dtf.ctx, tableName)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, sql.ErrTableNotFound.New(tableName)
	}
	// Same for the "to" end of the diff.
	toRoot, err := sqledb.rootAsOf(dtf.ctx, toCommitVal)
	if err != nil {
		return nil, err
	}
	toTable, _, ok, err := toRoot.GetTableInsensitive(dtf.ctx, tableName)
	if err != nil {
		return nil, err
	}
	if !ok {
		return nil, sql.ErrTableNotFound.New(tableName)
	}
	fromSchema, err := fromTable.GetSchema(dtf.ctx)
	if err != nil {
		return nil, err
	}
	toSchema, err := toTable.GetSchema(dtf.ctx)
	if err != nil {
		return nil, err
	}
	// Each side of the diff gets commit and commit_date metadata columns
	// appended; the augmented schemas are cached on dtf for the row iterator.
	fromSchema = schema.MustSchemaFromCols(
		fromSchema.GetAllCols().Append(
			schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
			schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false)))
	dtf.fromSch = fromSchema
	toSchema = schema.MustSchemaFromCols(
		toSchema.GetAllCols().Append(
			schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
			schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false)))
	dtf.toSch = toSchema
	// The joiner combines a from-row and a to-row into one result row, with
	// columns renamed via the to_/from_ naming funcs.
	joiner, err := rowconv.NewJoiner(
		[]rowconv.NamedSchema{{Name: diff.To, Sch: toSchema}, {Name: diff.From, Sch: fromSchema}},
		map[string]rowconv.ColNamingFunc{
			diff.To:   diff.ToColNamer,
			diff.From: diff.FromColNamer,
		})
	if err != nil {
		return nil, err
	}
	// The final schema is the joined schema plus a diff_type column
	// (added/modified/removed).
	sch := joiner.GetSchema()
	sch = schema.MustSchemaFromCols(
		sch.GetAllCols().Append(
			schema.NewColumn("diff_type", schema.DiffTypeTag, types.StringKind, false)))
	// TODO: sql.Columns include a Source that indicates the table it came from, but we don't have a real table
	//       when the column comes from a table function, so we omit the table name when we create these columns.
	//       This allows column projections to work correctly with table functions, but we will need to add a
	//       unique id (e.g. hash generated from method arguments) when we add support for aliasing and joining
	//       table functions in order for the analyzer to determine which table function result a column comes from.
	sqlSchema, err := sqlutil.FromDoltSchema("", sch)
	if err != nil {
		return nil, err
	}
	dtf.joiner = joiner
	return sqlSchema.Schema, nil
}
// Schema implements the sql.Node interface. It returns nil until the function
// is resolved, and panics if resolution completed without a schema having
// been generated.
func (dtf *DiffTableFunction) Schema() sql.Schema {
	if !dtf.Resolved() {
		return nil
	}
	if dtf.sqlSch != nil {
		return dtf.sqlSch
	}
	panic("schema hasn't been generated yet")
}
// Resolved implements the sql.Resolvable interface: the function is resolved
// only once all three of its argument expressions are.
func (dtf *DiffTableFunction) Resolved() bool {
	if !dtf.tableNameExpr.Resolved() {
		return false
	}
	if !dtf.fromCommitExpr.Resolved() {
		return false
	}
	return dtf.toCommitExpr.Resolved()
}
// String implements the Stringer interface, rendering the call in its SQL
// form: DOLT_DIFF(table, from, to).
func (dtf *DiffTableFunction) String() string {
	tbl := dtf.tableNameExpr.String()
	from := dtf.fromCommitExpr.String()
	to := dtf.toCommitExpr.String()
	return "DOLT_DIFF(" + tbl + ", " + from + ", " + to + ")"
}
// FunctionName implements the sql.TableFunction interface; this is the name
// the function is invoked by in SQL.
func (dtf *DiffTableFunction) FunctionName() string {
	const name = "dolt_diff"
	return name
}
//------------------------------------
// diffTableFunctionRowIter
//------------------------------------
var _ sql.RowIter = (*diffTableFunctionRowIter)(nil)
// diffTableFunctionRowIter produces the rows of a dolt_diff() call. It either
// walks every partition yielded by diffPartitions or, when diffPartitions is
// nil, iterates a single pre-selected partition (see
// NewDiffTableFunctionRowIterForSinglePartition).
type diffTableFunctionRowIter struct {
	diffPartitions   *dtables.DiffPartitions // source of partitions; nil in single-partition mode
	ddb              *doltdb.DoltDB          // database used to materialize each partition's rows
	joiner           *rowconv.Joiner         // joins from/to rows into one diff row
	currentPartition *sql.Partition          // partition currently being iterated, if any
	currentRowIter   *sql.RowIter            // open row iterator over currentPartition, if any
}
// NewDiffTableFunctionRowIter returns a row iterator that walks every
// partition produced by the given DiffPartitions.
func NewDiffTableFunctionRowIter(partitions *dtables.DiffPartitions, ddb *doltdb.DoltDB, joiner *rowconv.Joiner) *diffTableFunctionRowIter {
	itr := new(diffTableFunctionRowIter)
	itr.diffPartitions = partitions
	itr.ddb = ddb
	itr.joiner = joiner
	return itr
}
// NewDiffTableFunctionRowIterForSinglePartition returns a row iterator
// restricted to the one given partition; it does not consult a
// DiffPartitions source.
func NewDiffTableFunctionRowIterForSinglePartition(partition sql.Partition, ddb *doltdb.DoltDB, joiner *rowconv.Joiner) *diffTableFunctionRowIter {
	itr := new(diffTableFunctionRowIter)
	itr.currentPartition = &partition
	itr.ddb = ddb
	itr.joiner = joiner
	return itr
}
// Next implements sql.RowIter. It streams rows from the current partition and,
// on io.EOF, advances to the next partition until the partition source is
// itself exhausted (at which point io.EOF is returned to the caller).
func (itr *diffTableFunctionRowIter) Next(ctx *sql.Context) (sql.Row, error) {
	for {
		// Pull the next partition if none is in progress. In
		// single-partition mode this branch is never taken on the first call,
		// since the partition was supplied at construction.
		if itr.currentPartition == nil {
			nextPartition, err := itr.diffPartitions.Next(ctx)
			if err != nil {
				return nil, err
			}
			itr.currentPartition = &nextPartition
		}
		// Lazily open a row iterator over the current partition.
		if itr.currentRowIter == nil {
			dp := (*itr.currentPartition).(dtables.DiffPartition)
			rowIter, err := dp.GetRowIter(ctx, itr.ddb, itr.joiner)
			if err != nil {
				return nil, err
			}
			itr.currentRowIter = &rowIter
		}
		row, err := (*itr.currentRowIter).Next(ctx)
		if err == io.EOF {
			// Current partition is drained: reset and loop to fetch the next
			// one. With no partition source (single-partition mode), EOF here
			// means the whole iterator is exhausted.
			itr.currentPartition = nil
			itr.currentRowIter = nil
			if itr.diffPartitions == nil {
				return nil, err
			}
			continue
		} else if err != nil {
			return nil, err
		} else {
			return row, nil
		}
	}
}
// Close implements sql.RowIter. The iterator holds no resources of its own;
// any open partition row iterator is owned by the engine's lifecycle.
func (itr *diffTableFunctionRowIter) Close(_ *sql.Context) error {
	return nil
}
@@ -15,7 +15,6 @@
package dtables
import (
"context"
"errors"
"fmt"
"io"
@@ -42,37 +41,34 @@ var _ sql.FilteredTable = (*CommitDiffTable)(nil)
type CommitDiffTable struct {
name string
ddb *doltdb.DoltDB
ss *schema.SuperSchema
joiner *rowconv.Joiner
sqlSch sql.PrimaryKeySchema
workingRoot *doltdb.RootValue
fromCommitFilter *expression.Equals
toCommitFilter *expression.Equals
requiredFilterErr error
targetSchema *schema.Schema
}
func NewCommitDiffTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, root *doltdb.RootValue) (sql.Table, error) {
tblName, ok, err := root.ResolveTableName(ctx, tblName)
diffTblName := doltdb.DoltCommitDiffTablePrefix + tblName
table, _, ok, err := root.GetTableInsensitive(ctx, tblName)
if err != nil {
return nil, err
}
if !ok {
return nil, sql.ErrTableNotFound.New(doltdb.DoltCommitDiffTablePrefix + tblName)
return nil, sql.ErrTableNotFound.New(diffTblName)
}
diffTblName := doltdb.DoltCommitDiffTablePrefix + tblName
ss, err := calcSuperDuperSchema(ctx, ddb, root, tblName)
sch, err := table.GetSchema(ctx)
if err != nil {
return nil, err
}
_ = ss.AddColumn(schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false))
_ = ss.AddColumn(schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false))
sch, err := ss.GenerateSchema()
if err != nil {
return nil, err
}
sch = schema.MustSchemaFromCols(sch.GetAllCols().Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false)))
if sch.GetAllCols().Size() <= 1 {
return nil, sql.ErrTableNotFound.New(diffTblName)
@@ -81,8 +77,8 @@ func NewCommitDiffTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, ro
j, err := rowconv.NewJoiner(
[]rowconv.NamedSchema{{Name: diff.To, Sch: sch}, {Name: diff.From, Sch: sch}},
map[string]rowconv.ColNamingFunc{
diff.To: toNamer,
diff.From: fromNamer,
diff.To: diff.ToColNamer,
diff.From: diff.FromColNamer,
})
if err != nil {
return nil, err
@@ -101,70 +97,15 @@ func NewCommitDiffTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, ro
})
return &CommitDiffTable{
name: tblName,
ddb: ddb,
workingRoot: root,
ss: ss,
joiner: j,
sqlSch: sqlSch,
name: tblName,
ddb: ddb,
workingRoot: root,
joiner: j,
sqlSch: sqlSch,
targetSchema: &sch,
}, nil
}
func calcSuperDuperSchema(ctx context.Context, ddb *doltdb.DoltDB, working *doltdb.RootValue, tblName string) (*schema.SuperSchema, error) {
refs, err := ddb.GetBranches(ctx)
if err != nil {
return nil, err
}
var superSchemas []*schema.SuperSchema
ss, found, err := working.GetSuperSchema(ctx, tblName)
if err != nil {
return nil, err
}
if found {
superSchemas = append(superSchemas, ss)
}
for _, ref := range refs {
cm, err := ddb.ResolveCommitRef(ctx, ref)
if err != nil {
return nil, err
}
cmRoot, err := cm.GetRootValue()
if err != nil {
return nil, err
}
ss, found, err = cmRoot.GetSuperSchema(ctx, tblName)
if err != nil {
return nil, err
}
if found {
superSchemas = append(superSchemas, ss)
}
}
if len(superSchemas) == 0 {
return nil, sql.ErrTableNotFound.New(tblName)
}
superDuperSchema, err := schema.SuperSchemaUnion(superSchemas...)
if err != nil {
return nil, err
}
return superDuperSchema, nil
}
func (dt *CommitDiffTable) Name() string {
return doltdb.DoltCommitDiffTablePrefix + dt.name
}
@@ -217,37 +158,38 @@ func (dt *CommitDiffTable) Partitions(ctx *sql.Context) (sql.PartitionIter, erro
return nil, fmt.Errorf("error querying table %s: %w", dt.Name(), ErrExactlyOneFromCommit)
}
toRoot, toName, toDate, err := dt.rootValForFilter(ctx, dt.toCommitFilter)
toRoot, toHash, toDate, err := dt.rootValForFilter(ctx, dt.toCommitFilter)
if err != nil {
return nil, err
}
fromRoot, fromName, fromDate, err := dt.rootValForFilter(ctx, dt.fromCommitFilter)
fromRoot, fromHash, fromDate, err := dt.rootValForFilter(ctx, dt.fromCommitFilter)
if err != nil {
return nil, err
}
toTable, _, err := toRoot.GetTable(ctx, dt.name)
toTable, _, _, err := toRoot.GetTableInsensitive(ctx, dt.name)
if err != nil {
return nil, err
}
fromTable, _, err := fromRoot.GetTable(ctx, dt.name)
fromTable, _, _, err := fromRoot.GetTableInsensitive(ctx, dt.name)
if err != nil {
return nil, err
}
dp := diffPartition{
dp := DiffPartition{
to: toTable,
from: fromTable,
toName: toName,
fromName: fromName,
toName: toHash,
fromName: fromHash,
toDate: toDate,
fromDate: fromDate,
toSch: dt.targetSchema,
fromSch: dt.targetSchema,
}
isDiffable, err := dp.isDiffablePartition(ctx)
if err != nil {
return nil, err
}
@@ -361,14 +303,11 @@ func (dt *CommitDiffTable) Filters() []sql.Expression {
}
// WithFilters returns a new sql.Table instance with the filters applied
func (dt *CommitDiffTable) WithFilters(ctx *sql.Context, filters []sql.Expression) sql.Table {
func (dt *CommitDiffTable) WithFilters(_ *sql.Context, _ []sql.Expression) sql.Table {
return dt
}
func (dt *CommitDiffTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
dp := part.(diffPartition)
// TODO: commit_diff_table reuses diffPartition from diff_table and we've switched diff_table over
// to a new format. After we switch commit_diff_table over to the same new format, we can
// remove this getLegacyRowIter method.
return dp.getLegacyRowIter(ctx, dt.ddb, dt.ss, dt.joiner)
dp := part.(DiffPartition)
return dp.GetRowIter(ctx, dt.ddb, dt.joiner)
}
+85 -116
View File
@@ -46,14 +46,6 @@ const (
diffTypeRemoved = "removed"
)
func toNamer(name string) string {
return diff.To + "_" + name
}
func fromNamer(name string) string {
return diff.From + "_" + name
}
var _ sql.Table = (*DiffTable)(nil)
var _ sql.FilteredTable = (*DiffTable)(nil)
@@ -102,8 +94,8 @@ func NewDiffTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, root *do
j, err := rowconv.NewJoiner(
[]rowconv.NamedSchema{{Name: diff.To, Sch: sch}, {Name: diff.From, Sch: sch}},
map[string]rowconv.ColNamingFunc{
diff.To: toNamer,
diff.From: fromNamer,
diff.To: diff.ToColNamer,
diff.From: diff.FromColNamer,
})
if err != nil {
return nil, err
@@ -149,7 +141,7 @@ func (dt *DiffTable) Schema() sql.Schema {
func (dt *DiffTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error) {
cmItr := doltdb.CommitItrForRoots(dt.ddb, dt.head)
sf, err := selectFuncForFilters(dt.ddb.Format(), dt.partitionFilters)
sf, err := SelectFuncForFilters(dt.ddb.Format(), dt.partitionFilters)
if err != nil {
return nil, err
}
@@ -173,20 +165,21 @@ func (dt *DiffTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error) {
return nil, err
}
cmHashToTblInfo := make(map[hash.Hash]tblInfoAtCommit)
cmHashToTblInfo[cmHash] = tblInfoAtCommit{"WORKING", nil, t, wrTblHash}
cmHashToTblInfo := make(map[hash.Hash]TblInfoAtCommit)
cmHashToTblInfo[cmHash] = TblInfoAtCommit{"WORKING", nil, t, wrTblHash}
err = cmItr.Reset(ctx)
if err != nil {
return nil, err
}
return &diffPartitions{
return &DiffPartitions{
tblName: exactName,
cmItr: cmItr,
cmHashToTblInfo: cmHashToTblInfo,
selectFunc: sf,
targetSch: dt.targetSch,
toSch: dt.targetSch,
fromSch: dt.targetSch,
}, nil
}
@@ -208,7 +201,7 @@ func (dt *DiffTable) Filters() []sql.Expression {
}
// WithFilters returns a new sql.Table instance with the filters applied
func (dt *DiffTable) WithFilters(ctx *sql.Context, filters []sql.Expression) sql.Table {
func (dt *DiffTable) WithFilters(_ *sql.Context, filters []sql.Expression) sql.Table {
if dt.partitionFilters == nil {
dt.partitionFilters, dt.rowFilters = splitPartitionFilters(filters)
}
@@ -217,8 +210,8 @@ func (dt *DiffTable) WithFilters(ctx *sql.Context, filters []sql.Expression) sql
}
func (dt *DiffTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
dp := part.(diffPartition)
return dp.getRowIter(ctx, dt.ddb, dt.joiner)
dp := part.(DiffPartition)
return dp.GetRowIter(ctx, dt.ddb, dt.joiner)
}
// tableData returns the map of primary key to values for the specified table (or an empty map if the tbl is null)
@@ -343,35 +336,51 @@ func (itr *diffRowItr) Close(*sql.Context) (err error) {
return nil
}
type tblInfoAtCommit struct {
type TblInfoAtCommit struct {
name string
date *types.Timestamp
tbl *doltdb.Table
tblHash hash.Hash
}
// data partitioned into pairs of table states which get compared
type diffPartition struct {
to *doltdb.Table
from *doltdb.Table
toName string
fromName string
toDate *types.Timestamp
fromDate *types.Timestamp
targetSch schema.Schema
func NewTblInfoAtCommit(name string, date *types.Timestamp, tbl *doltdb.Table, tblHash hash.Hash) TblInfoAtCommit {
return TblInfoAtCommit{
name, date, tbl, tblHash,
}
}
func (dp diffPartition) Key() []byte {
var _ sql.Partition = (*DiffPartition)(nil)
// DiffPartition data partitioned into pairs of table states which get compared
type DiffPartition struct {
to *doltdb.Table
from *doltdb.Table
toName string
fromName string
toDate *types.Timestamp
fromDate *types.Timestamp
toSch *schema.Schema
fromSch *schema.Schema
}
func NewDiffPartition(to, from *doltdb.Table, toName, fromName string, toDate, fromDate *types.Timestamp, toSch, fromSch *schema.Schema) *DiffPartition {
return &DiffPartition{
to: to,
from: from,
toName: toName,
fromName: fromName,
toDate: toDate,
fromDate: fromDate,
toSch: toSch,
fromSch: fromSch,
}
}
func (dp DiffPartition) Key() []byte {
return []byte(dp.toName + dp.fromName)
}
// getLegacyRowIter returns a row iterator for this diffPartition, using older logic that disambiguates column
// names with unique tag suffixes when collisions occur. No new code should use this function.
//
// TODO: This legacy method is still used by commit_diff_table until we switch it over to the new, simplified
// diff format that diff_table just switched to. Once we switch commit_diff_table over, we should
// remove this legacy function.
func (dp diffPartition) getLegacyRowIter(ctx *sql.Context, ddb *doltdb.DoltDB, ss *schema.SuperSchema, joiner *rowconv.Joiner) (sql.RowIter, error) {
func (dp DiffPartition) GetRowIter(ctx *sql.Context, ddb *doltdb.DoltDB, joiner *rowconv.Joiner) (sql.RowIter, error) {
fromData, fromSch, err := tableData(ctx, dp.from, ddb)
if err != nil {
@@ -384,65 +393,13 @@ func (dp diffPartition) getLegacyRowIter(ctx *sql.Context, ddb *doltdb.DoltDB, s
return nil, err
}
fromConv, err := rowConvForSchema(ctx, ddb.ValueReadWriter(), ss, fromSch)
fromConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), *dp.fromSch, fromSch)
if err != nil {
return nil, err
}
toConv, err := rowConvForSchema(ctx, ddb.ValueReadWriter(), ss, toSch)
if err != nil {
return nil, err
}
sch := joiner.GetSchema()
toCol, _ := sch.GetAllCols().GetByName(toCommit)
fromCol, _ := sch.GetAllCols().GetByName(fromCommit)
toDateCol, _ := sch.GetAllCols().GetByName(toCommitDate)
fromDateCol, _ := sch.GetAllCols().GetByName(fromCommitDate)
fromCmInfo := commitInfo{types.String(dp.fromName), dp.fromDate, fromCol.Tag, fromDateCol.Tag}
toCmInfo := commitInfo{types.String(dp.toName), dp.toDate, toCol.Tag, toDateCol.Tag}
rd := diff.NewRowDiffer(ctx, fromSch, toSch, 1024)
rd.Start(ctx, fromData, toData)
// For the LegacyRowIter codepath, we don't want to change behavior,
// so we don't pass in a WarnFunction to ensure we get the old behavior.
src := diff.NewRowDiffSource(rd, joiner, nil)
src.AddInputRowConversion(fromConv, toConv)
return &diffRowItr{
ad: rd,
diffSrc: src,
joiner: joiner,
sch: joiner.GetSchema(),
fromCommitInfo: fromCmInfo,
toCommitInfo: toCmInfo,
}, nil
}
func (dp diffPartition) getRowIter(ctx *sql.Context, ddb *doltdb.DoltDB, joiner *rowconv.Joiner) (sql.RowIter, error) {
fromData, fromSch, err := tableData(ctx, dp.from, ddb)
if err != nil {
return nil, err
}
toData, toSch, err := tableData(ctx, dp.to, ddb)
if err != nil {
return nil, err
}
fromConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), dp.targetSch, fromSch)
if err != nil {
return nil, err
}
toConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), dp.targetSch, toSch)
toConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), *dp.toSch, toSch)
if err != nil {
return nil, err
@@ -480,7 +437,7 @@ func (dp diffPartition) getRowIter(ctx *sql.Context, ddb *doltdb.DoltDB, joiner
// isDiffablePartition checks if the commit pair for this partition is "diffable".
// If the primary key sets changed between the two commits, it may not be
// possible to diff them.
func (dp *diffPartition) isDiffablePartition(ctx *sql.Context) (bool, error) {
func (dp *DiffPartition) isDiffablePartition(ctx *sql.Context) (bool, error) {
// dp.from is nil when the to commit created a new table
if dp.from == nil {
return true, nil
@@ -506,9 +463,9 @@ func (dp *diffPartition) isDiffablePartition(ctx *sql.Context) (bool, error) {
return schema.ArePrimaryKeySetsDiffable(fromSch, toSch), nil
}
type partitionSelectFunc func(*sql.Context, diffPartition) (bool, error)
type partitionSelectFunc func(*sql.Context, DiffPartition) (bool, error)
func selectFuncForFilters(nbf *types.NomsBinFormat, filters []sql.Expression) (partitionSelectFunc, error) {
func SelectFuncForFilters(nbf *types.NomsBinFormat, filters []sql.Expression) (partitionSelectFunc, error) {
const (
toCommitTag uint64 = iota
fromCommitTag
@@ -529,7 +486,7 @@ func selectFuncForFilters(nbf *types.NomsBinFormat, filters []sql.Expression) (p
return nil, err
}
return func(ctx *sql.Context, partition diffPartition) (bool, error) {
return func(ctx *sql.Context, partition DiffPartition) (bool, error) {
vals := row.TaggedValues{
toCommitTag: types.String(partition.toName),
fromCommitTag: types.String(partition.fromName),
@@ -547,27 +504,39 @@ func selectFuncForFilters(nbf *types.NomsBinFormat, filters []sql.Expression) (p
}, nil
}
var _ sql.PartitionIter = &diffPartitions{}
var _ sql.PartitionIter = &DiffPartitions{}
// collection of partitions. Implements PartitionItr
type diffPartitions struct {
// DiffPartitions a collection of partitions. Implements PartitionItr
type DiffPartitions struct {
tblName string
cmItr doltdb.CommitItr
cmHashToTblInfo map[hash.Hash]tblInfoAtCommit
cmHashToTblInfo map[hash.Hash]TblInfoAtCommit
selectFunc partitionSelectFunc
targetSch schema.Schema
toSch schema.Schema
fromSch schema.Schema
}
// called in a commit iteration loop. Adds partitions when it finds a commit and it's parent that have different values
// for the hash of the table being looked at.
func (dp *diffPartitions) processCommit(ctx *sql.Context, cmHash hash.Hash, cm *doltdb.Commit, root *doltdb.RootValue, tbl *doltdb.Table) (*diffPartition, error) {
tblHash, _, err := root.GetTableHash(ctx, dp.tblName)
func NewDiffPartitions(tblName string, cmItr doltdb.CommitItr, cmHashToTblInfo map[hash.Hash]TblInfoAtCommit, selectFunc partitionSelectFunc, toSch, fromSch schema.Schema) *DiffPartitions {
return &DiffPartitions{
tblName: tblName,
cmItr: cmItr,
cmHashToTblInfo: cmHashToTblInfo,
selectFunc: selectFunc,
toSch: toSch,
fromSch: fromSch,
}
}
// processCommit is called in a commit iteration loop. Adds partitions when it finds a commit and its parent that have
// different values for the hash of the table being looked at.
func (dps *DiffPartitions) processCommit(ctx *sql.Context, cmHash hash.Hash, cm *doltdb.Commit, root *doltdb.RootValue, tbl *doltdb.Table) (*DiffPartition, error) {
tblHash, _, err := root.GetTableHash(ctx, dps.tblName)
if err != nil {
return nil, err
}
toInfoForCommit := dp.cmHashToTblInfo[cmHash]
toInfoForCommit := dps.cmHashToTblInfo[cmHash]
cmHashStr := cmHash.String()
meta, err := cm.GetCommitMeta()
@@ -577,10 +546,10 @@ func (dp *diffPartitions) processCommit(ctx *sql.Context, cmHash hash.Hash, cm *
ts := types.Timestamp(meta.Time())
var nextPartition *diffPartition
var nextPartition *DiffPartition
if tblHash != toInfoForCommit.tblHash {
partition := diffPartition{toInfoForCommit.tbl, tbl, toInfoForCommit.name, cmHashStr, toInfoForCommit.date, &ts, dp.targetSch}
selected, err := dp.selectFunc(ctx, partition)
partition := DiffPartition{toInfoForCommit.tbl, tbl, toInfoForCommit.name, cmHashStr, toInfoForCommit.date, &ts, &dps.toSch, &dps.fromSch}
selected, err := dps.selectFunc(ctx, partition)
if err != nil {
return nil, err
@@ -591,7 +560,7 @@ func (dp *diffPartitions) processCommit(ctx *sql.Context, cmHash hash.Hash, cm *
}
}
newInfo := tblInfoAtCommit{cmHashStr, &ts, tbl, tblHash}
newInfo := TblInfoAtCommit{cmHashStr, &ts, tbl, tblHash}
parentHashes, err := cm.ParentHashes(ctx)
if err != nil {
@@ -599,15 +568,15 @@ func (dp *diffPartitions) processCommit(ctx *sql.Context, cmHash hash.Hash, cm *
}
for _, h := range parentHashes {
dp.cmHashToTblInfo[h] = newInfo
dps.cmHashToTblInfo[h] = newInfo
}
return nextPartition, nil
}
func (dp *diffPartitions) Next(ctx *sql.Context) (sql.Partition, error) {
func (dps *DiffPartitions) Next(ctx *sql.Context) (sql.Partition, error) {
for {
cmHash, cm, err := dp.cmItr.Next(ctx)
cmHash, cm, err := dps.cmItr.Next(ctx)
if err != nil {
return nil, err
}
@@ -618,13 +587,13 @@ func (dp *diffPartitions) Next(ctx *sql.Context) (sql.Partition, error) {
return nil, err
}
tbl, _, _, err := root.GetTableInsensitive(ctx, dp.tblName)
tbl, _, _, err := root.GetTableInsensitive(ctx, dps.tblName)
if err != nil {
return nil, err
}
next, err := dp.processCommit(ctx, cmHash, cm, root, tbl)
next, err := dps.processCommit(ctx, cmHash, cm, root, tbl)
if err != nil {
return nil, err
@@ -647,12 +616,12 @@ func (dp *diffPartitions) Next(ctx *sql.Context) (sql.Partition, error) {
}
}
func (dp *diffPartitions) Close(*sql.Context) error {
func (dps *DiffPartitions) Close(*sql.Context) error {
return nil
}
// rowConvForSchema creates a RowConverter for transforming rows with the given schema to this super schema.
func (dp diffPartition) rowConvForSchema(ctx context.Context, vrw types.ValueReadWriter, targetSch, srcSch schema.Schema) (*rowconv.RowConverter, error) {
// rowConvForSchema creates a RowConverter for transforming rows with the given schema a target schema.
func (dp DiffPartition) rowConvForSchema(ctx context.Context, vrw types.ValueReadWriter, targetSch, srcSch schema.Schema) (*rowconv.RowConverter, error) {
if schema.SchemasAreEqual(srcSch, schema.EmptySchema) {
return rowconv.IdentityConverter, nil
}
@@ -52,17 +52,17 @@ var _ sql.FilteredTable = (*HistoryTable)(nil)
type HistoryTable struct {
name string
ddb *doltdb.DoltDB
ss *schema.SuperSchema
sqlSch sql.PrimaryKeySchema
commitFilters []sql.Expression
rowFilters []sql.Expression
cmItr doltdb.CommitItr
readerCreateFuncCache *ThreadSafeCRFuncCache
sqlSch sql.PrimaryKeySchema
targetSch schema.Schema
}
// NewHistoryTable creates a history table
func NewHistoryTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, root *doltdb.RootValue, head *doltdb.Commit) (sql.Table, error) {
tblName, ok, err := root.ResolveTableName(ctx, tblName)
table, tblName, ok, err := root.GetTableInsensitive(ctx, tblName)
if err != nil {
return nil, err
}
@@ -70,26 +70,22 @@ func NewHistoryTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, root
return nil, sql.ErrTableNotFound.New(doltdb.DoltHistoryTablePrefix + tblName)
}
ss, err := calcSuperSchema(ctx, root, tblName)
currentSch, err := table.GetSchema(ctx)
if err != nil {
return nil, err
}
_ = ss.AddColumn(schema.NewColumn(CommitHashCol, schema.HistoryCommitHashTag, types.StringKind, false))
_ = ss.AddColumn(schema.NewColumn(CommitterCol, schema.HistoryCommitterTag, types.StringKind, false))
_ = ss.AddColumn(schema.NewColumn(CommitDateCol, schema.HistoryCommitDateTag, types.TimestampKind, false))
sch, err := ss.GenerateSchema()
if err != nil {
return nil, err
}
sch := schema.MustSchemaFromCols(currentSch.GetAllCols().Append(
schema.NewColumn(CommitHashCol, schema.HistoryCommitHashTag, types.StringKind, false),
schema.NewColumn(CommitterCol, schema.HistoryCommitterTag, types.StringKind, false),
schema.NewColumn(CommitDateCol, schema.HistoryCommitDateTag, types.TimestampKind, false),
))
if sch.GetAllCols().Size() <= 3 {
return nil, sql.ErrTableNotFound.New(doltdb.DoltHistoryTablePrefix + tblName)
}
tableName := doltdb.DoltHistoryTablePrefix + tblName
sqlSch, err := sqlutil.FromDoltSchema(tableName, sch)
sqlSch, err := sqlutil.FromDoltSchema(doltdb.DoltHistoryTablePrefix+tblName, sch)
if err != nil {
return nil, err
}
@@ -98,10 +94,10 @@ func NewHistoryTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, root
return &HistoryTable{
name: tblName,
ddb: ddb,
ss: ss,
sqlSch: sqlSch,
cmItr: cmItr,
readerCreateFuncCache: NewThreadSafeCRFuncCache(),
targetSch: sch,
}, nil
}
@@ -245,7 +241,7 @@ func (ht *HistoryTable) String() string {
return doltdb.DoltHistoryTablePrefix + ht.name
}
// Schema returns the schema for the history table, which will be the super set of the schemas from the history
// Schema returns the schema for the history table
func (ht *HistoryTable) Schema() sql.Schema {
return ht.sqlSch.Schema
}
@@ -259,7 +255,7 @@ func (ht *HistoryTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error)
func (ht *HistoryTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
cp := part.(*commitPartition)
return newRowItrForTableAtCommit(ctx, cp.h, cp.cm, ht.name, ht.ss, ht.rowFilters, ht.readerCreateFuncCache)
return newRowItrForTableAtCommit(ctx, cp.h, cp.cm, ht.name, ht.targetSch, ht.rowFilters, ht.readerCreateFuncCache)
}
// commitPartition is a single commit
@@ -295,11 +291,11 @@ func (cp commitPartitioner) Close(*sql.Context) error {
}
type rowItrForTableAtCommit struct {
rd table.TableReadCloser
sch schema.Schema
toSuperSchConv *rowconv.RowConverter
extraVals map[uint64]types.Value
empty bool
rd table.TableReadCloser
sch schema.Schema
rowConverter *rowconv.RowConverter
extraVals map[uint64]types.Value
empty bool
}
func newRowItrForTableAtCommit(
@@ -307,27 +303,23 @@ func newRowItrForTableAtCommit(
h hash.Hash,
cm *doltdb.Commit,
tblName string,
ss *schema.SuperSchema,
sch schema.Schema,
filters []sql.Expression,
readerCreateFuncCache *ThreadSafeCRFuncCache) (*rowItrForTableAtCommit, error) {
root, err := cm.GetRootValue()
if err != nil {
return nil, err
}
tbl, _, ok, err := root.GetTableInsensitive(ctx, tblName)
if err != nil {
return nil, err
}
if !ok {
return &rowItrForTableAtCommit{empty: true}, nil
}
m, err := tbl.GetNomsRowData(ctx)
if err != nil {
return nil, err
}
@@ -342,8 +334,7 @@ func newRowItrForTableAtCommit(
return nil, err
}
toSuperSchConv, err := rowConvForSchema(ctx, tbl.ValueReadWriter(), ss, tblSch)
rowConverter, err := rowConvForSchema(ctx, tbl.ValueReadWriter(), sch, tblSch)
if err != nil {
return nil, err
}
@@ -352,41 +343,32 @@ func newRowItrForTableAtCommit(
// with the unified indexing path that all other tables use. This logic existed before there was a
// reasonable way to apply multiple filter conditions to an indexed table scan.
createReaderFunc, err := readerCreateFuncCache.GetOrCreate(schHash, tbl.Format(), tblSch, filters)
if err != nil {
return nil, err
}
rd, err := createReaderFunc(ctx, m)
if err != nil {
return nil, err
}
sch, err := ss.GenerateSchema()
if err != nil {
return nil, err
}
hashCol, hashOK := sch.GetAllCols().GetByName(CommitHashCol)
dateCol, dateOK := sch.GetAllCols().GetByName(CommitDateCol)
committerCol, commiterOK := sch.GetAllCols().GetByName(CommitterCol)
committerCol, committerOK := sch.GetAllCols().GetByName(CommitterCol)
if !hashOK || !dateOK || !commiterOK {
panic("Bug: History table super schema should always have commit_hash")
if !hashOK || !dateOK || !committerOK {
panic("Bug: History table schema should always have commit_hash")
}
meta, err := cm.GetCommitMeta()
if err != nil {
return nil, err
}
return &rowItrForTableAtCommit{
rd: rd,
sch: sch,
toSuperSchConv: toSuperSchConv,
rd: rd,
sch: sch,
rowConverter: rowConverter,
extraVals: map[uint64]types.Value{
hashCol.Tag: types.String(h.String()),
dateCol.Tag: types.Timestamp(meta.Time()),
@@ -409,7 +391,7 @@ func (tblItr *rowItrForTableAtCommit) Next(ctx *sql.Context) (sql.Row, error) {
return nil, err
}
r, err = tblItr.toSuperSchConv.Convert(r)
r, err = tblItr.rowConverter.Convert(r)
if err != nil {
return nil, err
@@ -435,35 +417,13 @@ func (tblItr *rowItrForTableAtCommit) Close(ctx *sql.Context) error {
return nil
}
func calcSuperSchema(ctx context.Context, wr *doltdb.RootValue, tblName string) (*schema.SuperSchema, error) {
ss, found, err := wr.GetSuperSchema(ctx, tblName)
if err != nil {
return nil, err
} else if !found {
return nil, doltdb.ErrTableNotFound
}
return ss, nil
}
// rowConvForSchema creates a RowConverter for transforming rows with the given schema to this super schema.
func rowConvForSchema(ctx context.Context, vrw types.ValueReadWriter, ss *schema.SuperSchema, sch schema.Schema) (*rowconv.RowConverter, error) {
// rowConvForSchema creates a RowConverter for transforming rows with the given schema to the target schema.
func rowConvForSchema(ctx context.Context, vrw types.ValueReadWriter, targetSch schema.Schema, sch schema.Schema) (*rowconv.RowConverter, error) {
if schema.SchemasAreEqual(sch, schema.EmptySchema) {
return rowconv.IdentityConverter, nil
}
inNameToOutName, err := ss.NameMapForSchema(sch)
if err != nil {
return nil, err
}
ssch, err := ss.GenerateSchema()
if err != nil {
return nil, err
}
fm, err := rowconv.NameMapping(sch, ssch, inNameToOutName)
fm, err := rowconv.TagMappingWithNameFallback(sch, targetSch)
if err != nil {
return nil, err
}
@@ -407,17 +407,6 @@ func TestDoltMerge(t *testing.T) {
}
}
func TestScopedDoltHistorySystemTables(t *testing.T) {
harness := newDoltHarness(t)
for _, test := range ScopedDoltHistoryScriptTests {
databases := harness.NewDatabases("mydb")
engine := enginetest.NewEngineWithDbs(t, harness, databases)
t.Run(test.Name, func(t *testing.T) {
enginetest.TestScriptWithEngine(t, engine, harness, test)
})
}
}
// TestSingleTransactionScript is a convenience method for debugging a single transaction test. Unskip and set to the
// desired test.
func TestSingleTransactionScript(t *testing.T) {
@@ -504,13 +493,15 @@ func TestSingleTransactionScript(t *testing.T) {
enginetest.TestTransactionScript(t, newDoltHarness(t), script)
}
func TestSystemTableQueries(t *testing.T) {
func TestBrokenSystemTableQueries(t *testing.T) {
t.Skip()
enginetest.RunQueryTests(t, newDoltHarness(t), BrokenSystemTableQueries)
}
func TestUnscopedDoltDiffSystemTable(t *testing.T) {
func TestHistorySystemTable(t *testing.T) {
harness := newDoltHarness(t)
for _, test := range UnscopedDiffTableTests {
for _, test := range HistorySystemTableScriptTests {
databases := harness.NewDatabases("mydb")
engine := enginetest.NewEngineWithDbs(t, harness, databases)
t.Run(test.Name, func(t *testing.T) {
@@ -519,9 +510,43 @@ func TestUnscopedDoltDiffSystemTable(t *testing.T) {
}
}
func TestDoltDiffSystemTable(t *testing.T) {
func TestUnscopedDiffSystemTable(t *testing.T) {
harness := newDoltHarness(t)
for _, test := range DiffTableTests {
for _, test := range UnscopedDiffSystemTableScriptTests {
databases := harness.NewDatabases("mydb")
engine := enginetest.NewEngineWithDbs(t, harness, databases)
t.Run(test.Name, func(t *testing.T) {
enginetest.TestScriptWithEngine(t, engine, harness, test)
})
}
}
func TestDiffTableFunction(t *testing.T) {
harness := newDoltHarness(t)
for _, test := range DiffTableFunctionScriptTests {
databases := harness.NewDatabases("mydb")
engine := enginetest.NewEngineWithDbs(t, harness, databases)
t.Run(test.Name, func(t *testing.T) {
enginetest.TestScriptWithEngine(t, engine, harness, test)
})
}
}
func TestCommitDiffSystemTable(t *testing.T) {
harness := newDoltHarness(t)
for _, test := range CommitDiffSystemTableScriptTests {
databases := harness.NewDatabases("mydb")
engine := enginetest.NewEngineWithDbs(t, harness, databases)
t.Run(test.Name, func(t *testing.T) {
enginetest.TestScriptWithEngine(t, engine, harness, test)
})
}
}
func TestDiffSystemTable(t *testing.T) {
harness := newDoltHarness(t)
for _, test := range DiffSystemTableScriptTests {
databases := harness.NewDatabases("mydb")
engine := enginetest.NewEngineWithDbs(t, harness, databases)
t.Run(test.Name, func(t *testing.T) {
@@ -92,7 +92,6 @@ var defaultSkippedQueries = []string{
"json_arrayagg", // TODO: aggregation ordering
"json_objectagg", // TODO: aggregation ordering
"typestable", // Bit type isn't working?
"dolt_commit_diff_", // see broken queries in `dolt_system_table_queries.go`
"show global variables like", // we set extra variables
}
+828 -13
View File
@@ -19,8 +19,8 @@ import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dfunctions"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
)
@@ -94,18 +94,173 @@ var DoltScripts = []enginetest.ScriptTest{
},
}
var ScopedDoltHistoryScriptTests = []enginetest.ScriptTest{
var HistorySystemTableScriptTests = []enginetest.ScriptTest{
{
Name: "scoped-dolt-history-system-table: filtering results on non-pk tables",
Name: "empty table",
SetUpScript: []string{
"create table foo1 (n int, abcd text);",
"insert into foo1 values (1, 'Eins'), (2, 'Zwei'), (3, 'Drei');",
"select dolt_commit('-am', 'inserting into foo1');",
"create table t (n int, c text);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "select n, abcd FROM DOLT_HISTORY_foo1 where n=1;",
Expected: []sql.Row{{1, "Eins"}},
Query: "select count(*) from DOLT_HISTORY_t;",
Expected: []sql.Row{{0}},
},
},
},
{
Name: "keyless table",
SetUpScript: []string{
"create table foo1 (n int, de text);",
"insert into foo1 values (1, 'Ein'), (2, 'Zwei'), (3, 'Drei');",
"set @Commit1 = dolt_commit('-am', 'inserting into foo1');",
"update foo1 set de='Eins' where n=1;",
"set @Commit2 = dolt_commit('-am', 'updating data in foo1');",
"insert into foo1 values (4, 'Vier');",
"set @Commit3 = dolt_commit('-am', 'inserting data in foo1');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "select count(*) from DOLT_HISTORY_foO1;",
Expected: []sql.Row{{10}},
},
{
Query: "select n, de from dolt_history_foo1 where commit_hash=@Commit1;",
Expected: []sql.Row{{1, "Ein"}, {2, "Zwei"}, {3, "Drei"}},
},
{
Query: "select n, de from dolt_history_Foo1 where commit_hash=@Commit2;",
Expected: []sql.Row{{1, "Eins"}, {2, "Zwei"}, {3, "Drei"}},
},
{
Query: "select n, de from dolt_history_foo1 where commit_hash=@Commit3;",
Expected: []sql.Row{{1, "Eins"}, {2, "Zwei"}, {3, "Drei"}, {4, "Vier"}},
},
},
},
{
Name: "primary key table: basic cases",
SetUpScript: []string{
"create table foo1 (n int primary key, de text);",
"insert into foo1 values (1, 'Eins'), (2, 'Zwei'), (3, 'Drei');",
"set @Commit1 = dolt_commit('-am', 'inserting into foo1');",
"alter table foo1 add column fr text;",
"insert into foo1 values (4, 'Vier', 'Quatre');",
"set @Commit2 = dolt_commit('-am', 'adding column and inserting data in foo1');",
"update foo1 set fr='Un' where n=1;",
"set @Commit3 = dolt_commit('-am', 'updating data in foo1');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "select count(*) from Dolt_History_Foo1;",
Expected: []sql.Row{{11}},
},
{
Query: "select n, de, fr from dolt_history_FOO1 where commit_hash = @Commit1;",
Expected: []sql.Row{{1, "Eins", nil}, {2, "Zwei", nil}, {3, "Drei", nil}},
},
{
Query: "select n, de, fr from dolt_history_foo1 where commit_hash = @Commit2;",
Expected: []sql.Row{{1, "Eins", nil}, {2, "Zwei", nil}, {3, "Drei", nil}, {4, "Vier", "Quatre"}},
},
{
Query: "select n, de, fr from dolt_history_foo1 where commit_hash = @Commit3;",
Expected: []sql.Row{{1, "Eins", "Un"}, {2, "Zwei", nil}, {3, "Drei", nil}, {4, "Vier", "Quatre"}},
},
},
},
{
Name: "primary key table: non-pk schema changes",
SetUpScript: []string{
"create table t (pk int primary key, c1 int, c2 text);",
"insert into t values (1, 2, '3'), (4, 5, '6');",
"set @Commit1 = DOLT_COMMIT('-am', 'creating table t');",
"alter table t drop column c2;",
"set @Commit2 = DOLT_COMMIT('-am', 'dropping column c2');",
"alter table t rename column c1 to c2;",
"set @Commit3 = DOLT_COMMIT('-am', 'renaming c1 to c2');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "select count(*) from dolt_history_t;",
Expected: []sql.Row{{6}},
},
{
// TODO: Instead of just spot checking the non-existence of c1, it would be useful to be able to
// assert the full schema of the result set. ScriptTestAssertion doesn't support that currently,
// but the code from QueryTest could be ported over to ScriptTestAssertion.
Query: "select c1 from dolt_history_t;",
ExpectedErr: sql.ErrColumnNotFound,
},
{
Query: "select pk, c2 from dolt_history_t where commit_hash=@Commit1;",
Expected: []sql.Row{{1, 2}, {4, 5}},
},
{
Query: "select pk, c2 from dolt_history_t where commit_hash=@Commit2;",
Expected: []sql.Row{{1, 2}, {4, 5}},
},
{
Query: "select pk, c2 from dolt_history_t where commit_hash=@Commit3;",
Expected: []sql.Row{{1, 2}, {4, 5}},
},
},
},
{
Name: "primary key table: rename table",
SetUpScript: []string{
"create table t (pk int primary key, c1 int, c2 text);",
"insert into t values (1, 2, '3'), (4, 5, '6');",
"set @Commit1 = DOLT_COMMIT('-am', 'creating table t');",
"alter table t rename to t2;",
"set @Commit2 = DOLT_COMMIT('-am', 'renaming table to t2');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "select count(*) from dolt_history_t;",
ExpectedErr: sql.ErrTableNotFound,
},
{
Query: "select count(*) from dolt_history_T2;",
Expected: []sql.Row{{2}},
},
{
Query: "select pk, c1, c2 from dolt_history_t2 where commit_hash != @Commit1;",
Expected: []sql.Row{{1, 2, "3"}, {4, 5, "6"}},
},
},
},
{
Name: "primary key table: delete and recreate table",
SetUpScript: []string{
"create table t (pk int primary key, c1 int, c2 text);",
"insert into t values (1, 2, '3'), (4, 5, '6');",
"set @Commit1 = DOLT_COMMIT('-am', 'creating table t');",
"drop table t;",
"set @Commit2 = DOLT_COMMIT('-am', 'dropping table t');",
"create table t (pk int primary key, c1 int);",
"set @Commit3 = DOLT_COMMIT('-am', 'recreating table t');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
// TODO: The history system table processes history in parallel and pulls the rows for the
// user table at all commits. This means we can't currently detect when a table was dropped
// and if a different table with the same name exists at earlier commits, those results will
// be included in the history table. It may make more sense to have history scoped only
// to the current instance of the table, which would require changing the history system table
// to use something like an iterator approach where it goes back sequentially until it detects
// the current table doesn't exist any more and then stop.
Query: "select count(*) from dolt_history_t;",
Expected: []sql.Row{{2}},
},
},
},
@@ -565,7 +720,7 @@ var DoltMerge = []enginetest.ScriptTest{
},
}
var DiffTableTests = []enginetest.ScriptTest{
var DiffSystemTableScriptTests = []enginetest.ScriptTest{
{
Name: "base case: added rows",
SetUpScript: []string{
@@ -956,10 +1111,367 @@ var DiffTableTests = []enginetest.ScriptTest{
},
}
var UnscopedDiffTableTests = []enginetest.ScriptTest{
// There's a bug in queries with where clauses that compare column equality with a
// variable. These UnscopedDiffTableTests use "commit_hash in (@Commit1)" to work around that bug.
// https://github.com/dolthub/go-mysql-server/issues/790
var DiffTableFunctionScriptTests = []enginetest.ScriptTest{
{
Name: "invalid arguments",
SetUpScript: []string{
"create table t (pk int primary key, c1 text, c2 text);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
"insert into t values(1, 'one', 'two'), (2, 'two', 'three');",
"set @Commit2 = dolt_commit('-am', 'inserting into t');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT * from dolt_diff('t');",
ExpectedErr: sql.ErrInvalidArgumentNumber,
},
{
Query: "SELECT * from dolt_diff('t', @Commit1);",
ExpectedErr: sql.ErrInvalidArgumentNumber,
},
{
Query: "SELECT * from dolt_diff('t', @Commit1, @Commit2, 'extra');",
ExpectedErr: sql.ErrInvalidArgumentNumber,
},
{
Query: "SELECT * from dolt_diff(null, null, null);",
ExpectedErr: sql.ErrInvalidArgumentDetails,
},
{
Query: "SELECT * from dolt_diff(123, @Commit1, @Commit2);",
ExpectedErr: sql.ErrInvalidArgumentDetails,
},
{
Query: "SELECT * from dolt_diff('t', 123, @Commit2);",
ExpectedErr: sql.ErrInvalidArgumentDetails,
},
{
Query: "SELECT * from dolt_diff('t', @Commit1, 123);",
ExpectedErr: sql.ErrInvalidArgumentDetails,
},
{
Query: "SELECT * from dolt_diff('doesnotexist', @Commit1, @Commit2);",
ExpectedErr: sql.ErrTableNotFound,
},
{
Query: "SELECT * from dolt_diff('t', 'fakefakefakefakefakefakefakefake', @Commit2);",
ExpectedErrStr: "could not find a value for this hash",
},
{
Query: "SELECT * from dolt_diff('t', @Commit1, 'fake-branch');",
ExpectedErrStr: "branch not found",
},
{
Query: "SELECT * from dolt_diff('t', @Commit1, concat('fake', '-', 'branch'));",
ExpectedErr: sqle.ErrInvalidNonLiteralArgument,
},
{
Query: "SELECT * from dolt_diff('t', hashof('main'), @Commit2);",
ExpectedErr: sqle.ErrInvalidNonLiteralArgument,
},
{
Query: "SELECT * from dolt_diff(LOWER('T'), hashof('main'), @Commit2);",
ExpectedErr: sqle.ErrInvalidNonLiteralArgument,
},
},
},
{
Name: "basic case",
SetUpScript: []string{
"set @Commit0 = HashOf('HEAD');",
"create table t (pk int primary key, c1 text, c2 text);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
"insert into t values(1, 'one', 'two');",
"set @Commit2 = dolt_commit('-am', 'inserting into table t');",
"create table t2 (pk int primary key, c1 text, c2 text);",
"insert into t2 values(100, 'hundred', 'hundert');",
"set @Commit3 = dolt_commit('-am', 'inserting into table t2');",
"insert into t values(2, 'two', 'three'), (3, 'three', 'four');",
"update t set c1='uno', c2='dos' where pk=1;",
"set @Commit4 = dolt_commit('-am', 'inserting into table t');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit2);",
Expected: []sql.Row{{1, "one", "two", nil, nil, nil, "added"}},
},
{
Query: "SELECT COUNT(*) from dolt_diff('t', @Commit2, @Commit3);",
Expected: []sql.Row{{0}},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit3, @Commit4);",
Expected: []sql.Row{
{1, "uno", "dos", 1, "one", "two", "modified"},
{2, "two", "three", nil, nil, nil, "added"},
{3, "three", "four", nil, nil, nil, "added"},
},
},
{
// Table t2 had no changes between Commit3 and Commit4, so results should be empty
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('T2', @Commit3, @Commit4);",
Expected: []sql.Row{},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit4);",
Expected: []sql.Row{
{1, "uno", "dos", nil, nil, nil, "added"},
{2, "two", "three", nil, nil, nil, "added"},
{3, "three", "four", nil, nil, nil, "added"},
},
},
{
// Reverse the to/from commits to see the diff from the other direction
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('T', @Commit4, @Commit1);",
Expected: []sql.Row{
{nil, nil, nil, 1, "uno", "dos", "removed"},
{nil, nil, nil, 2, "two", "three", "removed"},
{nil, nil, nil, 3, "three", "four", "removed"},
},
},
},
},
{
Name: "diff with branch refs",
SetUpScript: []string{
"create table t (pk int primary key, c1 text, c2 text);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
"insert into t values(1, 'one', 'two');",
"set @Commit2 = dolt_commit('-am', 'inserting row 1 into t in main');",
"select dolt_checkout('-b', 'branch1');",
"alter table t drop column c2;",
"set @Commit3 = dolt_commit('-am', 'dropping column c2 in branch1');",
"delete from t where pk=1;",
"set @Commit4 = dolt_commit('-am', 'deleting row 1 in branch1');",
"insert into t values (2, 'two');",
"set @Commit5 = dolt_commit('-am', 'inserting row 2 in branch1');",
"select dolt_checkout('main');",
"insert into t values (2, 'two', 'three');",
"set @Commit6 = dolt_commit('-am', 'inserting row 2 in main');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', 'main', 'branch1');",
Expected: []sql.Row{
{nil, nil, 1, "one", "two", "removed"},
{2, "two", 2, "two", "three", "modified"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, diff_type from dolt_diff('t', 'branch1', 'main');",
Expected: []sql.Row{
{1, "one", "two", nil, nil, "added"},
{2, "two", "three", 2, "two", "modified"},
},
},
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', 'main~', 'branch1');",
Expected: []sql.Row{
{nil, nil, 1, "one", "two", "removed"},
{2, "two", nil, nil, nil, "added"},
},
},
},
},
{
Name: "schema modification: drop and recreate column with same type",
SetUpScript: []string{
"create table t (pk int primary key, c1 text, c2 text);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
"insert into t values(1, 'one', 'two'), (2, 'two', 'three');",
"set @Commit2 = dolt_commit('-am', 'inserting into t');",
"alter table t drop column c2;",
"set @Commit3 = dolt_commit('-am', 'dropping column c2');",
"alter table t add column c2 text;",
"insert into t values (3, 'three', 'four');",
"update t set c2='foo' where pk=1;",
"set @Commit4 = dolt_commit('-am', 'adding column c2, inserting, and updating data');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit2);",
Expected: []sql.Row{
{1, "one", "two", nil, nil, nil, "added"},
{2, "two", "three", nil, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit2, @Commit3);",
Expected: []sql.Row{
{1, "one", 1, "one", "two", "modified"},
{2, "two", 2, "two", "three", "modified"},
},
},
{
Query: "SELECT to_c2 from dolt_diff('t', @Commit2, @Commit3);",
ExpectedErr: sql.ErrColumnNotFound,
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, diff_type from dolt_diff('t', @Commit3, @Commit4);",
Expected: []sql.Row{
{1, "one", "foo", 1, "one", "modified"},
// This row doesn't show up as changed because adding a column doesn't touch the row data.
//{2, "two", nil, 2, "two", "modified"},
{3, "three", "four", nil, nil, "added"},
},
},
{
Query: "SELECT from_c2 from dolt_diff('t', @Commit3, @Commit4);",
ExpectedErr: sql.ErrColumnNotFound,
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit4);",
Expected: []sql.Row{
{1, "one", "foo", nil, nil, nil, "added"},
{2, "two", nil, nil, nil, nil, "added"},
{3, "three", "four", nil, nil, nil, "added"},
},
},
},
},
{
Name: "schema modification: rename columns",
SetUpScript: []string{
"create table t (pk int primary key, c1 text, c2 int);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
"insert into t values(1, 'one', -1), (2, 'two', -2);",
"set @Commit2 = dolt_commit('-am', 'inserting into t');",
"alter table t rename column c2 to c3;",
"set @Commit3 = dolt_commit('-am', 'renaming column c2 to c3');",
"insert into t values (3, 'three', -3);",
"update t set c3=1 where pk=1;",
"set @Commit4 = dolt_commit('-am', 'inserting and updating data');",
"alter table t rename column c3 to c2;",
"insert into t values (4, 'four', -4);",
"set @Commit5 = dolt_commit('-am', 'renaming column c3 to c2, and inserting data');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit2);",
Expected: []sql.Row{
{1, "one", -1, nil, nil, nil, "added"},
{2, "two", -2, nil, nil, nil, "added"},
},
},
{
Query: "SELECT to_c2 from dolt_diff('t', @Commit2, @Commit3);",
ExpectedErr: sql.ErrColumnNotFound,
},
{
Query: "SELECT to_pk, to_c1, to_c3, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit2, @Commit3);",
Expected: []sql.Row{},
},
{
Query: "SELECT to_pk, to_c1, to_c3, from_pk, from_c1, from_c3, diff_type from dolt_diff('t', @Commit3, @Commit4);",
Expected: []sql.Row{
{3, "three", -3, nil, nil, nil, "added"},
{1, "one", 1, 1, "one", -1, "modified"},
},
},
{
Query: "SELECT from_c2 from dolt_diff('t', @Commit4, @Commit5);",
ExpectedErr: sql.ErrColumnNotFound,
},
{
Query: "SELECT to_c3 from dolt_diff('t', @Commit4, @Commit5);",
ExpectedErr: sql.ErrColumnNotFound,
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c3, diff_type from dolt_diff('t', @Commit4, @Commit5);",
Expected: []sql.Row{
{4, "four", -4, nil, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit5);",
Expected: []sql.Row{
{1, "one", 1, nil, nil, nil, "added"},
{2, "two", -2, nil, nil, nil, "added"},
{3, "three", -3, nil, nil, nil, "added"},
{4, "four", -4, nil, nil, nil, "added"},
},
},
},
},
{
Name: "schema modification: drop and rename columns with different types",
SetUpScript: []string{
"create table t (pk int primary key, c1 text, c2 text);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
"insert into t values(1, 'one', 'asdf'), (2, 'two', '2');",
"set @Commit2 = dolt_commit('-am', 'inserting into t');",
"alter table t drop column c2;",
"set @Commit3 = dolt_commit('-am', 'dropping column c2');",
"insert into t values (3, 'three');",
"update t set c1='fdsa' where pk=1;",
"set @Commit4 = dolt_commit('-am', 'inserting and updating data');",
"alter table t add column c2 int;",
"insert into t values (4, 'four', -4);",
"set @Commit5 = dolt_commit('-am', 'adding column c2, and inserting data');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit2);",
Expected: []sql.Row{
{1, "one", "asdf", nil, nil, nil, "added"},
{2, "two", "2", nil, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit2, @Commit3);",
Expected: []sql.Row{
{1, "one", 1, "one", "asdf", "modified"},
{2, "two", 2, "two", "2", "modified"},
},
},
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, diff_type from dolt_diff('t', @Commit3, @Commit4);",
Expected: []sql.Row{
{3, "three", nil, nil, "added"},
{1, "fdsa", 1, "one", "modified"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, diff_type from dolt_diff('t', @Commit4, @Commit5);",
Expected: []sql.Row{
{4, "four", -4, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type from dolt_diff('t', @Commit1, @Commit5);",
Expected: []sql.Row{
{1, "fdsa", nil, nil, nil, nil, "added"},
{2, "two", nil, nil, nil, nil, "added"},
{3, "three", nil, nil, nil, nil, "added"},
{4, "four", -4, nil, nil, nil, "added"},
},
},
},
},
}
var UnscopedDiffSystemTableScriptTests = []enginetest.ScriptTest{
{
Name: "basic case with three tables",
SetUpScript: []string{
@@ -1182,3 +1694,306 @@ var UnscopedDiffTableTests = []enginetest.ScriptTest{
},
},
}
var CommitDiffSystemTableScriptTests = []enginetest.ScriptTest{
{
Name: "error handling",
SetUpScript: []string{
"create table t (pk int primary key, c1 int, c2 int);",
"insert into t values (1, 2, 3), (4, 5, 6);",
"set @Commit1 = (select DOLT_COMMIT('-am', 'creating table t'));",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT * FROM DOLT_COMMIT_DIFF_t;",
ExpectedErrStr: "error querying table dolt_commit_diff_t: dolt_commit_diff_* tables must be filtered to a single 'to_commit'",
},
{
Query: "SELECT * FROM DOLT_COMMIT_DIFF_t where to_commit=@Commit1;",
ExpectedErrStr: "error querying table dolt_commit_diff_t: dolt_commit_diff_* tables must be filtered to a single 'from_commit'",
},
{
Query: "SELECT * FROM DOLT_COMMIT_DIFF_t where from_commit=@Commit1;",
ExpectedErrStr: "error querying table dolt_commit_diff_t: dolt_commit_diff_* tables must be filtered to a single 'to_commit'",
},
},
},
{
Name: "base case: insert, update, delete",
SetUpScript: []string{
"set @Commit0 = HASHOF('HEAD');",
"create table t (pk int primary key, c1 int, c2 int);",
"insert into t values (1, 2, 3), (4, 5, 6);",
"set @Commit1 = (select DOLT_COMMIT('-am', 'creating table t'));",
"update t set c2=0 where pk=1",
"set @Commit2 = (select DOLT_COMMIT('-am', 'modifying row'));",
"update t set c2=-1 where pk=1",
"set @Commit3 = (select DOLT_COMMIT('-am', 'modifying row'));",
"update t set c2=-2 where pk=1",
"set @Commit4 = (select DOLT_COMMIT('-am', 'modifying row'));",
"delete from t where pk=1",
"set @Commit5 = (select DOLT_COMMIT('-am', 'modifying row'));",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit1 and FROM_COMMIT=@Commit0;",
Expected: []sql.Row{
{1, 2, 3, nil, nil, nil, "added"},
{4, 5, 6, nil, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit2 and FROM_COMMIT=@Commit1 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 2, 0, 1, 2, 3, "modified"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type FROM DOLT_COMMIT_DIFF_T WHERE TO_COMMIT=@Commit4 and FROM_COMMIT=@Commit1 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 2, -2, 1, 2, 3, "modified"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type FROM DOLT_commit_DIFF_t WHERE TO_COMMIT=@Commit5 and FROM_COMMIT=@Commit4 ORDER BY to_pk;",
Expected: []sql.Row{
{nil, nil, nil, 1, 2, -2, "removed"},
},
},
{
Query: "SELECT to_pk, to_c1, to_c2, from_pk, from_c1, from_c2, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit5 and FROM_COMMIT=@Commit0 ORDER BY to_pk;",
Expected: []sql.Row{
{4, 5, 6, nil, nil, nil, "added"},
},
},
},
},
{
// When a column is dropped we should see the column's value set to null in that commit
Name: "schema modification: column drop",
SetUpScript: []string{
"set @Commit0 = HASHOF('HEAD');",
"create table t (pk int primary key, c1 int, c2 int);",
"insert into t values (1, 2, 3), (4, 5, 6);",
"set @Commit1 = (select DOLT_COMMIT('-am', 'creating table t'));",
"alter table t drop column c1;",
"set @Commit2 = (select DOLT_COMMIT('-am', 'dropping column c'));",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c2, from_pk, from_c2 FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit1 and FROM_COMMIT=@Commit0 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 3, nil, nil},
{4, 6, nil, nil},
},
},
{
Query: "SELECT to_pk, to_c2, from_pk, from_c2 FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit2 and FROM_COMMIT=@Commit1 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 3, 1, 3},
{4, 6, 4, 6},
},
},
},
},
{
// When a column is dropped and recreated with the same type, we expect it to be included in dolt_diff output
Name: "schema modification: column drop, recreate with same type",
SetUpScript: []string{
"set @Commit0 = HASHOF('HEAD');",
"create table t (pk int primary key, c int);",
"insert into t values (1, 2), (3, 4);",
"set @Commit1 = (select DOLT_COMMIT('-am', 'creating table t'));",
"alter table t drop column c;",
"set @Commit2 = (select DOLT_COMMIT('-am', 'dropping column c'));",
"alter table t add column c int;",
"insert into t values (100, 101);",
"set @Commit3 = (select DOLT_COMMIT('-am', 'inserting into t'));",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit1 and FROM_COMMIT=@Commit0 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 2, nil, nil, "added"},
{3, 4, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit2 and FROM_COMMIT=@Commit1 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 1, 2, "modified"},
{3, 3, 4, "modified"},
},
},
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit3 and FROM_COMMIT=@Commit2 ORDER BY to_pk;",
Expected: []sql.Row{
{100, 101, nil, nil, "added"},
},
},
},
},
{
// When a column is dropped and another column with the same type is renamed to that name, we expect it to be included in dolt_diff output
Name: "schema modification: column drop, rename column with same type to same name",
SetUpScript: []string{
"set @Commit0 = HASHOF('HEAD');",
"create table t (pk int primary key, c1 int, c2 int);",
"insert into t values (1, 2, 3), (4, 5, 6);",
"set @Commit1 = DOLT_COMMIT('-am', 'creating table t');",
"alter table t drop column c1;",
"set @Commit2 = DOLT_COMMIT('-am', 'dropping column c1');",
"alter table t rename column c2 to c1;",
"insert into t values (100, 101);",
"set @Commit3 = DOLT_COMMIT('-am', 'inserting into t');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit1 and FROM_COMMIT=@Commit0 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 3, nil, nil, "added"},
{4, 6, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit2 and FROM_COMMIT=@Commit1 ORDER BY to_pk;",
Expected: []sql.Row{
{1, 3, 1, 3, "modified"},
{4, 6, 4, 6, "modified"},
},
},
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit3 and FROM_COMMIT=@Commit2 ORDER BY to_pk;",
Expected: []sql.Row{
{100, 101, nil, nil, "added"},
},
},
},
},
{
// When a column is dropped and recreated with a different type, we expect only the new column
// to be included in dolt_commit_diff output, with previous values coerced (with any warnings reported) to the new type
Name: "schema modification: column drop, recreate with different type that can be coerced (int -> string)",
SetUpScript: []string{
"set @Commit0 = HASHOF('HEAD');",
"create table t (pk int primary key, c int);",
"insert into t values (1, 2), (3, 4);",
"set @Commit1 = DOLT_COMMIT('-am', 'creating table t');",
"alter table t drop column c;",
"set @Commit2 = DOLT_COMMIT('-am', 'dropping column c');",
"alter table t add column c text;",
"insert into t values (100, '101');",
"set @Commit3 = DOLT_COMMIT('-am', 're-adding column c');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit1 and FROM_COMMIT=@Commit0 ORDER BY to_pk;",
Expected: []sql.Row{
{1, "2", nil, nil, "added"},
{3, "4", nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit2 and FROM_COMMIT=@Commit1 ORDER BY to_pk;",
Expected: []sql.Row{
{1, nil, 1, "2", "modified"},
{3, nil, 3, "4", "modified"},
},
},
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit3 and FROM_COMMIT=@Commit2 ORDER BY to_pk;",
Expected: []sql.Row{
{100, "101", nil, nil, "added"},
},
},
},
},
{
Name: "schema modification: column drop, recreate with different type that can't be coerced (string -> int)",
SetUpScript: []string{
"set @Commit0 = HASHOF('HEAD');",
"create table t (pk int primary key, c text);",
"insert into t values (1, 'two'), (3, 'four');",
"set @Commit1 = (select DOLT_COMMIT('-am', 'creating table t'));",
"alter table t drop column c;",
"set @Commit2 = (select DOLT_COMMIT('-am', 'dropping column c'));",
"alter table t add column c int;",
"insert into t values (100, 101);",
"set @Commit3 = (select DOLT_COMMIT('-am', 're-adding column c'));",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit1 and FROM_COMMIT=@Commit0 ORDER BY to_pk;",
Expected: []sql.Row{
{1, nil, nil, nil, "added"},
{3, nil, nil, nil, "added"},
},
},
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit2 and FROM_COMMIT=@Commit1 ORDER BY to_pk;",
Expected: []sql.Row{
{1, nil, 1, nil, "modified"},
{3, nil, 3, nil, "modified"},
},
},
{
Query: "SELECT to_pk, to_c, from_pk, from_c, diff_type FROM DOLT_COMMIT_DIFF_t WHERE TO_COMMIT=@Commit3 and FROM_COMMIT=@Commit2 ORDER BY to_pk;",
Expected: []sql.Row{
{100, 101, nil, nil, "added"},
},
},
{
Query: "select * from dolt_commit_diff_t where to_commit=@Commit3 and from_commit=@Commit1;",
ExpectedWarning: 1105,
ExpectedWarningsCount: 2,
ExpectedWarningMessageSubstring: "unable to coerce value from field",
SkipResultsCheck: true,
},
},
},
{
Name: "schema modification: primary key change",
SetUpScript: []string{
"create table t (pk int primary key, c1 int);",
"insert into t values (1, 2), (3, 4);",
"set @Commit1 = DOLT_COMMIT('-am', 'creating table t');",
"alter table t drop primary key;",
"insert into t values (5, 6);",
"set @Commit2 = DOLT_COMMIT('-am', 'dropping primary key');",
"alter table t add primary key (c1);",
"set @Commit3 = DOLT_COMMIT('-am', 'adding primary key');",
"insert into t values (7, 8);",
"set @Commit4 = DOLT_COMMIT('-am', 'adding more data');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "select * from dolt_commit_diff_t where from_commit=@Commit1 and to_commit=@Commit4;",
ExpectedWarning: 1105,
ExpectedWarningsCount: 1,
ExpectedWarningMessageSubstring: "cannot render full diff between commits",
SkipResultsCheck: true,
},
{
Query: "SELECT to_pk, to_c1, from_pk, from_c1, diff_type FROM DOLT_commit_DIFF_t where from_commit=@Commit3 and to_commit=@Commit4;",
Expected: []sql.Row{{7, 8, nil, nil, "added"}},
},
},
},
}
@@ -179,7 +179,7 @@ func pullBranches(ctx context.Context, rrd ReadReplicaDatabase, branches []strin
}
func fetchRef(ctx context.Context, rrd ReadReplicaDatabase, headRef, rtRef ref.DoltRef) error {
srcDBCommit, err := actions.FetchRemoteBranch(ctx, rrd.tmpDir, rrd.remote, rrd.srcDB, rrd.ddb, headRef, nil, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
srcDBCommit, err := actions.FetchRemoteBranch(ctx, rrd.tmpDir, rrd.remote, rrd.srcDB, rrd.ddb, headRef, actions.NoopRunProgFuncs, actions.NoopStopProgFuncs)
if err != nil {
return err
}
+15 -10
View File
@@ -83,11 +83,6 @@ func TestCreateTable(t *testing.T) {
query: "create table dolt_table (id int primary key, age int)",
expectedErr: "Invalid table name",
},
{
name: "Test bad table name begins with number",
query: "create table 1testTable (id int primary key, age int)",
expectedErr: "syntax error",
},
{
name: "Test in use table name",
query: "create table people (id int primary key, age int)",
@@ -1125,6 +1120,14 @@ func TestRenameTable(t *testing.T) {
expectedSchema: AppearancesTestSchema,
expectedRows: AllAppsRows,
},
{
name: "alter rename table with alter syntax",
query: "alter table people rename to 123People",
oldTableName: "people",
newTableName: "123People",
expectedSchema: PeopleTestSchema,
expectedRows: AllPeopleRows,
},
{
name: "table not found",
query: "rename table notFound to newNowFound",
@@ -1278,6 +1281,13 @@ func TestParseCreateTableStatement(t *testing.T) {
expectedSchema: dtestutils.CreateSchema(
schemaNewColumn(t, "id", 4817, sql.Int32, true, schema.NotNullConstraint{})),
},
{
name: "Test create table starting with number",
query: "create table 123table (id int primary key)",
expectedTable: "`123table`",
expectedSchema: dtestutils.CreateSchema(
schemaNewColumn(t, "id", 4817, sql.Int32, true, schema.NotNullConstraint{})),
},
{
name: "Test create two column schema",
query: "create table testTable (id int primary key, age int)",
@@ -1292,11 +1302,6 @@ func TestParseCreateTableStatement(t *testing.T) {
expectedTable: "testTable",
expectedErr: "syntax error",
},
{
name: "Test bad table name begins with number",
query: "create table 1testTable (id int primary key, age int)",
expectedErr: "syntax error",
},
{
name: "Test types",
query: `create table testTable (
+1 -4
View File
@@ -66,10 +66,7 @@ func (rws *ReaderWithStats) Start(updateFunc func(ReadStats)) {
}()
}
// Stop "closes" ReaderWithStats. Occasionally, we might pass this ReaderWithStats as the body of
// a http.Request. Since http.Request will close the body if it is an io.Closer, we can't have ReaderWithStats conform
// to io.Closer. We want full control over the Start and Stop of ReaderWithStats.
func (rws *ReaderWithStats) Stop() error {
func (rws *ReaderWithStats) Close() error {
close(rws.closeCh)
if closer, ok := rws.rd.(io.Closer); ok {
+18 -23
View File
@@ -17,6 +17,7 @@ package pull
import (
"context"
"errors"
"io"
"github.com/cenkalti/backoff"
"golang.org/x/sync/errgroup"
@@ -85,13 +86,6 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile
return fileIds, fileIDtoTblFile, fileIDtoNumChunks
}
func stopWithErr(stats *iohelp.ReaderWithStats, err *error) {
e := stats.Stop()
if *err == nil && e != nil {
*err = e
}
}
const concurrentTableFileDownloads = 3
func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error {
@@ -138,23 +132,24 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
return backoff.Permanent(errors.New("table file not found. please try again"))
}
rd, contentLength, err := tblFile.Open(ctx)
if err != nil {
return err
}
rdStats := iohelp.NewReaderWithStats(rd, int64(contentLength))
defer stopWithErr(rdStats, &err)
rdStats.Start(func(s iohelp.ReadStats) {
report(TableFileEvent{
EventType: DownloadStats,
TableFiles: []nbs.TableFile{tblFile},
Stats: []iohelp.ReadStats{s},
})
})
report(TableFileEvent{EventType: DownloadStart, TableFiles: []nbs.TableFile{tblFile}})
err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rdStats, contentLength, nil)
err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), nil, func() (io.ReadCloser, uint64, error) {
rd, contentLength, err := tblFile.Open(ctx)
if err != nil {
return nil, 0, err
}
rdStats := iohelp.NewReaderWithStats(rd, int64(contentLength))
rdStats.Start(func(s iohelp.ReadStats) {
report(TableFileEvent{
EventType: DownloadStats,
TableFiles: []nbs.TableFile{tblFile},
Stats: []iohelp.ReadStats{s},
})
})
return rdStats, contentLength, nil
})
if err != nil {
report(TableFileEvent{EventType: DownloadFailed, TableFiles: []nbs.TableFile{tblFile}})
return err
+7 -2
View File
@@ -457,9 +457,14 @@ func (ttfs *TestTableFileStore) Size(ctx context.Context) (uint64, error) {
return sz, nil
}
func (ttfs *TestTableFileStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
func (ttfs *TestTableFileStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
tblFile := &TestTableFileWriter{fileId, numChunks, bytes.NewBuffer(nil), ttfs}
_, err := io.Copy(tblFile, rd)
rd, _, err := getRd()
if err != nil {
return err
}
defer rd.Close()
_, err = io.Copy(tblFile, rd)
if err != nil {
return err
+17 -14
View File
@@ -18,6 +18,7 @@ import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
@@ -206,25 +207,27 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile
return err
}
f, err := os.Open(tmpTblFile.path)
if err != nil {
return err
}
fileSize := fi.Size()
fWithStats := iohelp.NewReaderWithStats(f, fileSize)
fWithStats.Start(func(stats iohelp.ReadStats) {
p.addEvent(ctx, NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
CurrentFileSize: fileSize,
Stats: stats,
}))
})
defer func() {
_ = fWithStats.Stop()
_ = file.Remove(tmpTblFile.path)
}()
return p.sinkDBCS.(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, fWithStats, tmpTblFile.contentLen, tmpTblFile.contentHash)
return p.sinkDBCS.(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) {
f, err := os.Open(tmpTblFile.path)
if err != nil {
return nil, 0, err
}
fWithStats := iohelp.NewReaderWithStats(f, fileSize)
fWithStats.Start(func(stats iohelp.ReadStats) {
p.addEvent(ctx, NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
CurrentFileSize: fileSize,
Stats: stats,
}))
})
return fWithStats, uint64(fileSize), nil
})
}
func (p *Puller) processCompletedTables(ctx context.Context, completedTables <-chan FilledWriters) error {
+2 -2
View File
@@ -279,8 +279,8 @@ func (gcs *GenerationalNBS) Size(ctx context.Context) (uint64, error) {
}
// WriteTableFile will read a table file from the provided reader and write it to the new gen TableFileStore
func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, rd, contentLength, contentHash)
func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd)
}
// AddTableFilesToManifest adds table files to the manifest of the newgen cs
+2 -2
View File
@@ -52,8 +52,8 @@ func (nbsMW *NBSMetricWrapper) Size(ctx context.Context) (uint64, error) {
}
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
return nbsMW.nbs.WriteTableFile(ctx, fileId, numChunks, rd, contentLength, contentHash)
func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
return nbsMW.nbs.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd)
}
// AddTableFilesToManifest adds table files to the manifest
+16 -5
View File
@@ -1265,12 +1265,11 @@ func newTableFile(cs chunkSource, info tableSpec) tableFile {
return tableFile{
info: info,
open: func(ctx context.Context) (io.ReadCloser, uint64, error) {
r, err := cs.reader(ctx)
s, err := cs.size()
if err != nil {
return nil, 0, err
}
s, err := cs.size()
r, err := cs.reader(ctx)
if err != nil {
return nil, 0, err
}
@@ -1348,7 +1347,7 @@ func (nbs *NomsBlockStore) SupportedOperations() TableFileStoreOps {
}
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error {
func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
fsPersister, ok := nbs.p.(*fsTablePersister)
if !ok {
@@ -1370,7 +1369,19 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, nu
}
}()
return writeTo(f, rd, copyTableFileBufferSize)
r, _, err := getRd()
if err != nil {
return err
}
defer func() {
closeErr := r.Close()
if err == nil {
err = closeErr
}
}()
return writeTo(f, r, copyTableFileBufferSize)
}
// AddTableFilesToManifest adds table files to the manifest
+3 -1
View File
@@ -69,7 +69,9 @@ func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fil
fileID := addr.String()
fileToData[fileID] = data
fileIDToNumChunks[fileID] = i + 1
err = st.WriteTableFile(ctx, fileID, i+1, bytes.NewReader(data), 0, nil)
err = st.WriteTableFile(ctx, fileID, i+1, nil, func() (io.ReadCloser, uint64, error) {
return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
})
require.NoError(t, err)
}
err := st.AddTableFilesToManifest(ctx, fileIDToNumChunks)
+3 -3
View File
@@ -285,8 +285,8 @@ type TableFile interface {
// NumChunks returns the number of chunks in a table file
NumChunks() int
// Open returns an io.ReadCloser which can be used to read the bytes of a table file. The total length of the
// table file in bytes can be optionally returned.
// Open returns an io.ReadCloser which can be used to read the bytes of a
// table file. It also returns the content length of the table file.
Open(ctx context.Context) (io.ReadCloser, uint64, error)
}
@@ -312,7 +312,7 @@ type TableFileStore interface {
Size(ctx context.Context) (uint64, error)
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore.
WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader, contentLength uint64, contentHash []byte) error
WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
// AddTableFilesToManifest adds table files to the manifest
AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int) error
+40
View File
@@ -235,3 +235,43 @@ SQL
[ "$status" -eq 1 ]
[[ "${lines[0]}" =~ "table not found: five" ]] || false
}
@test "create-views: AS OF" {
dolt sql <<SQL
create table t1 (a int primary key, b int);
insert into t1 values (1,1);
select dolt_commit('-am', 'table with one row');
select dolt_branch('onerow');
insert into t1 values (2,2);
select dolt_commit('-am', 'table with two rows');
select dolt_branch('tworows');
create view v1 as select * from t1;
select dolt_commit('-am', 'view with select *');
select dolt_branch('view');
insert into t1 values (3,3);
select dolt_commit('-am', 'table with three rows');
select dolt_branch('threerows');
drop view v1;
create view v1 as select a+10, b+10 from t1;
SQL
# should show the original view definition
run dolt sql -r csv -q "select * from dolt_schemas as of 'view' order by 1"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 2 ]
[[ "$output" =~ "select * from t1" ]] || false
# should use the view definition from branch named, data from branch named
run dolt sql -r csv -q "select * from \`dolt_repo_$$/view\`.v1 order by 1"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 3 ]
[[ "${lines[1]}" =~ "1,1" ]] || false
[[ "${lines[2]}" =~ "2,2" ]] || false
# should use the view definition from HEAD, data from branch named
run dolt sql -r csv -q "select * from v1 as of 'view' order by 1"
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 3 ]
[[ "${lines[1]}" =~ "11,11" ]] || false
[[ "${lines[2]}" =~ "12,12" ]] || false
}
@@ -170,10 +170,20 @@ start_multi_db_server() {
wait_for_connection $PORT 5000
}
# stop_sql_server stops the SQL server. For cases where it's important
# to wait for the process to exit after the kill signal (e.g. waiting
# for an async replication push), pass 1.
stop_sql_server() {
wait=$1
if [ ! -z "$SERVER_PID" ]; then
kill $SERVER_PID
fi
if [ $wait ]; then
while ps -p $SERVER_PID > /dev/null; do
sleep .1;
done
fi;
SERVER_PID=
}
+65 -2
View File
@@ -84,8 +84,8 @@ teardown() {
multi_query repo1 1 "
SELECT DOLT_COMMIT('-am', 'Step 1');"
# threads guarenteed to flush after we stop server
stop_sql_server
# wait for the process to exit after we stop it
stop_sql_server 1
cd ../repo2
dolt pull remote1
@@ -177,6 +177,49 @@ teardown() {
server_query repo2 1 "select name from dolt_branches order by name" "name\nmain\nnew_feature"
}
@test "remotes-sql-server: connect to remote head" {
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
cd repo1
dolt checkout -b new_feature
dolt commit -am "first commit"
dolt branch new_feature2
dolt push remote1 new_feature
dolt push remote1 new_feature2
dolt checkout main
dolt push remote1 main
cd ../repo2
dolt config --local --add sqlserver.global.dolt_read_replica_remote remote1
dolt config --local --add sqlserver.global.dolt_replicate_heads main
start_sql_server repo2
# No data on main
server_query repo2 1 "show tables" ""
# Connecting to heads that exist only on the remote should work fine (they get fetched)
server_query "repo2/new_feature" 1 "show tables" "Table\ntest"
server_query repo2 1 'use `repo2/new_feature2`' ""
server_query repo2 1 'select * from `repo2/new_feature2`.test' "pk\n0\n1\n2"
# Connecting to heads that don't exist should error out
run server_query "repo2/notexist" 1 'use `repo2/new_feature2`' ""
[ $status -eq 1 ]
[[ $output =~ "database not found" ]] || false
run server_query repo2 1 'use `repo2/notexist`' ""
[ $status -eq 1 ]
[[ $output =~ "database not found" ]] || false
# Creating a branch locally that doesn't exist on the remote
# works, but connecting to it is an error (nothing to pull)
server_query "repo2/new_feature" 1 "select dolt_checkout('-b', 'new_branch') as b" "b\n0"
run server_query "repo2/new_branch" 1 "show tables" "Table\ntest"
[ $status -eq 1 ]
[[ $output =~ "database not found" ]] || false
}
@test "remotes-sql-server: pull all heads" {
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
@@ -255,3 +298,23 @@ teardown() {
server_query "repo2/feature-branch" 1 "SHOW Tables" "Table\ntest"
}
@test "remotes-sql-server: connect to hash works" {
skiponwindows "Has dependencies that are missing on the Jenkins Windows installation."
cd repo1
dolt commit -am "cm"
dolt push remote1 main
head_hash=$(get_head_commit)
cd ../repo2
dolt config --local --add sqlserver.global.dolt_read_replica_remote remote1
dolt config --local --add sqlserver.global.dolt_replicate_heads main
start_sql_server repo2
server_query repo2 1 "show tables" "Table\ntest"
server_query repo2 1 "use \`repo2/$head_hash\`" ""
}
get_head_commit() {
dolt log -n 1 | grep -m 1 commit | cut -c 13-44
}
@@ -300,8 +300,12 @@ SQL
}
@test "sql-create-database: create database with character set should parse correctly and not error" {
skip "currently fails with Error parsing SQL"
run dolt sql -q 'CREATE DATABASE metabase CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;'
[ "$status" -eq 0 ]
[[ "$output" =~ 'Query OK, 0 rows affected' ]] || false
run dolt sql -q "SHOW DATABASES;"
[ "$status" -eq 0 ]
[[ "$output" =~ "dolt_repo_$$" ]] || false
[[ "$output" =~ "information_schema" ]] || false
[[ "$output" =~ "metabase" ]] || false
}