Merge pull request #3924 from dolthub/andy/migration

[no-release-notes] `__DOLT_DEV__` Migration
This commit is contained in:
AndyA
2022-07-26 12:43:05 -07:00
committed by GitHub
25 changed files with 1181 additions and 44 deletions

View File

@@ -224,13 +224,7 @@ func logCommits(ctx context.Context, dEnv *env.DoltEnv, cs *doltdb.CommitSpec, o
}
matchFunc := func(commit *doltdb.Commit) (bool, error) {
numParents, err := commit.NumParents()
if err != nil {
return false, err
}
return numParents >= opts.minParents, nil
return commit.NumParents() >= opts.minParents, nil
}
commits, err := commitwalk.GetTopNTopoOrderedCommitsMatching(ctx, dEnv.DoltDB, h, opts.numLines, matchFunc)

View File

@@ -20,8 +20,10 @@ import (
"github.com/fatih/color"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/migrate"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
)
@@ -36,7 +38,7 @@ const (
var migrateDocs = cli.CommandDocumentationContent{
ShortDesc: "Executes a database migration to use the latest Dolt data format.",
LongDesc: `Migrate is a multi-purpose command to update the data format of a Dolt database. Over time, development
on Dolt requires changes to the on-disk data format. These changes are necessary to improve Database performance amd
on Dolt requires changes to the on-disk data format. These changes are necessary to improve Database performance and
correctness. Migrating to the latest format is therefore necessary for compatibility with the latest Dolt clients, and
to take advantage of the newly released Dolt features.`,
@@ -76,7 +78,7 @@ func (cmd MigrateCmd) EventType() eventsapi.ClientEventType {
// Exec executes the command
func (cmd MigrateCmd) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
ap := cmd.ArgParser()
help, _ := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, migrateDocs, ap))
help, usage := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, migrateDocs, ap))
apr := cli.ParseArgsOrDie(ap, args, help)
if apr.Contains(migratePushFlag) && apr.Contains(migratePullFlag) {
@@ -84,5 +86,29 @@ func (cmd MigrateCmd) Exec(ctx context.Context, commandStr string, args []string
return 1
}
if err := MigrateDatabase(ctx, dEnv); err != nil {
verr := errhand.BuildDError("migration failed").AddCause(err).Build()
return HandleVErrAndExitCode(verr, usage)
}
return 0
}
// MigrateDatabase migrates the NomsBinFormat of |dEnv.DoltDB|.
func MigrateDatabase(ctx context.Context, dEnv *env.DoltEnv) error {
menv, err := migrate.NewEnvironment(ctx, dEnv)
if err != nil {
return err
}
p, err := menv.Migration.FS.Abs(".")
if err != nil {
return err
}
cli.Println("migrating database at tmp dir: ", p)
err = migrate.TraverseDAG(ctx, menv.Existing.DoltDB, menv.Migration.DoltDB)
if err != nil {
return err
}
return migrate.SwapChunkStores(ctx, menv)
}

View File

@@ -80,8 +80,8 @@ func (c *Commit) ParentHashes(ctx context.Context) ([]hash.Hash, error) {
}
// NumParents gets the number of parents a commit has.
func (c *Commit) NumParents() (int, error) {
return len(c.parents), nil
func (c *Commit) NumParents() int {
return len(c.parents)
}
func (c *Commit) Height() (uint64, error) {
@@ -180,14 +180,11 @@ func (c *Commit) GetAncestor(ctx context.Context, as *AncestorSpec) (*Commit, er
instructions := as.Instructions
for _, inst := range instructions {
n, err := cur.NumParents()
if err != nil {
return nil, err
}
if inst >= n {
if inst >= cur.NumParents() {
return nil, ErrInvalidAncestorSpec
}
var err error
cur, err = cur.GetParent(ctx, inst)
if err != nil {
return nil, err

View File

@@ -47,7 +47,7 @@ const (
)
const (
creationBranch = "create"
CreationBranch = "create"
defaultChunksPerTF = 256 * 1024
)
@@ -79,6 +79,12 @@ func DoltDBFromCS(cs chunks.ChunkStore) *DoltDB {
return &DoltDB{hooksDatabase{Database: db}, vrw, ns}
}
// HackDatasDatabaseFromDoltDB unwraps a DoltDB to a datas.Database.
// Deprecated: only for use in dolt migrate.
func HackDatasDatabaseFromDoltDB(ddb *DoltDB) datas.Database {
return ddb.db
}
// LoadDoltDB will acquire a reference to the underlying noms db. If the Location is InMemDoltDB then a reference
// to a newly created in memory database will be used. If the location is LocalDirDoltDB, the directory must exist or
// this returns nil.
@@ -155,7 +161,7 @@ func (ddb *DoltDB) WriteEmptyRepoWithCommitTimeAndDefaultBranch(
panic("Passed bad name or email. Both should be valid")
}
ds, err := ddb.db.GetDataset(ctx, creationBranch)
ds, err := ddb.db.GetDataset(ctx, CreationBranch)
if err != nil {
return err
@@ -181,7 +187,7 @@ func (ddb *DoltDB) WriteEmptyRepoWithCommitTimeAndDefaultBranch(
commitOpts := datas.CommitOptions{Meta: cm}
cb := ref.NewInternalRef(creationBranch)
cb := ref.NewInternalRef(CreationBranch)
ds, err = ddb.db.GetDataset(ctx, cb.String())
if err != nil {
@@ -413,6 +419,15 @@ func (ddb *DoltDB) ReadRootValue(ctx context.Context, h hash.Hash) (*RootValue,
return decodeRootNomsValue(ddb.vrw, ddb.ns, val)
}
// ReadCommit reads the Commit whose hash is |h|, if one exists.
func (ddb *DoltDB) ReadCommit(ctx context.Context, h hash.Hash) (*Commit, error) {
c, err := datas.LoadCommitAddr(ctx, ddb.vrw, h)
if err != nil {
return nil, err
}
return NewCommit(ctx, ddb.vrw, ddb.ns, c)
}
// Commit will update a branch's head value to be that of a previously committed root value hash
func (ddb *DoltDB) Commit(ctx context.Context, valHash hash.Hash, dref ref.DoltRef, cm *datas.CommitMeta) (*Commit, error) {
if dref.GetType() != ref.BranchRefType {
@@ -525,10 +540,13 @@ func (ddb *DoltDB) CommitWithParentCommits(ctx context.Context, valHash hash.Has
parents = append(parents, addr)
}
}
commitOpts := datas.CommitOptions{Parents: parents, Meta: cm}
ds, err = ddb.db.GetDataset(ctx, dref.String())
return ddb.CommitValue(ctx, dref, val, commitOpts)
}
func (ddb *DoltDB) CommitValue(ctx context.Context, dref ref.DoltRef, val types.Value, commitOpts datas.CommitOptions) (*Commit, error) {
ds, err := ddb.db.GetDataset(ctx, dref.String())
if err != nil {
return nil, err
}
@@ -573,9 +591,16 @@ func (ddb *DoltDB) CommitDanglingWithParentCommits(ctx context.Context, valHash
}
parents = append(parents, addr)
}
commitOpts := datas.CommitOptions{Parents: parents, Meta: cm}
dcommit, err := datas.NewCommitForValue(ctx, datas.ChunkStoreFromDatabase(ddb.db), ddb.vrw, ddb.ns, val, commitOpts)
return ddb.CommitDangling(ctx, val, commitOpts)
}
// CommitDangling creates a new Commit for |val| that is not referenced by any DoltRef.
func (ddb *DoltDB) CommitDangling(ctx context.Context, val types.Value, opts datas.CommitOptions) (*Commit, error) {
cs := datas.ChunkStoreFromDatabase(ddb.db)
dcommit, err := datas.NewCommitForValue(ctx, cs, ddb.vrw, ddb.ns, val, opts)
if err != nil {
return nil, err
}
@@ -633,10 +658,7 @@ func (ddb *DoltDB) ResolveParent(ctx context.Context, commit *Commit, parentIdx
}
func (ddb *DoltDB) ResolveAllParents(ctx context.Context, commit *Commit) ([]*Commit, error) {
num, err := commit.NumParents()
if err != nil {
return nil, err
}
num := commit.NumParents()
resolved := make([]*Commit, num)
for i := 0; i < num; i++ {
parent, err := ddb.ResolveParent(ctx, commit, i)

View File

@@ -325,7 +325,7 @@ func TestLDNoms(t *testing.T) {
assert.Equal(t, len(branches), 1)
assert.Equal(t, branches[0].Ref.GetPath(), "master")
numParents, err := commit.NumParents()
numParents := commit.NumParents()
assert.NoError(t, err)
if numParents != 1 {

View File

@@ -1249,3 +1249,9 @@ func NewDataCacheKey(rv *RootValue) (DataCacheKey, error) {
return DataCacheKey{hash}, nil
}
// HackNomsValuesFromRootValues unwraps a RootVal to a noms Value.
// Deprecated: only for use in dolt migrate.
func HackNomsValuesFromRootValues(root *RootValue) types.Value {
return root.nomsValue()
}

View File

@@ -0,0 +1,247 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/utils/earl"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
)
const (
doltDir = dbfactory.DoltDir
nomsDir = dbfactory.DataDir
manifestFile = "manifest"
migrationRef = "migration"
)
var (
targetFormat = types.Format_DOLT_DEV
migrationMsg = fmt.Sprintf("migrating database to Noms Binary Format %s", targetFormat.VersionString())
)
// Environment is a migration environment.
type Environment struct {
Migration *env.DoltEnv
Existing *env.DoltEnv
}
// NewEnvironment creates a migration Environment for |existing|.
func NewEnvironment(ctx context.Context, existing *env.DoltEnv) (Environment, error) {
mfs, err := getMigrateFS(existing.FS)
if err != nil {
return Environment{}, err
}
if err = initMigrationDB(ctx, existing, existing.FS, mfs); err != nil {
return Environment{}, err
}
mdb, err := doltdb.LoadDoltDB(ctx, targetFormat, doltdb.LocalDirDoltDB, mfs)
if err != nil {
return Environment{}, err
}
config, err := env.LoadDoltCliConfig(env.GetCurrentUserHomeDir, mfs)
if err != nil {
return Environment{}, err
}
migration := &env.DoltEnv{
Version: existing.Version,
Config: config,
RepoState: existing.RepoState,
DoltDB: mdb,
FS: mfs,
//urlStr: urlStr,
//hdp: hdp,
}
return Environment{
Migration: migration,
Existing: existing,
}, nil
}
func initMigrationDB(ctx context.Context, existing *env.DoltEnv, src, dest filesys.Filesys) (err error) {
base, err := src.Abs(".")
if err != nil {
return err
}
ierr := src.Iter(doltDir, true, func(path string, size int64, isDir bool) (stop bool) {
if isDir {
err = dest.MkDirs(path)
stop = err != nil
return
}
if strings.Contains(path, nomsDir) {
return
}
path, err = filepath.Rel(base, path)
if err != nil {
stop = true
return
}
if err = filesys.CopyFile(path, path, src, dest); err != nil {
stop = true
return
}
return
})
if ierr != nil {
return ierr
}
if err != nil {
return err
}
dd, err := dest.Abs(filepath.Join(doltDir, nomsDir))
if err != nil {
return err
}
if err = dest.MkDirs(dd); err != nil {
return err
}
u, err := earl.Parse(dd)
if err != nil {
return err
}
db, vrw, ns, err := dbfactory.FileFactory{}.CreateDB(ctx, targetFormat, u, nil)
if err != nil {
return err
}
// write init commit for migration
name, email, err := env.GetNameAndEmail(existing.Config)
if err != nil {
return err
}
meta, err := datas.NewCommitMeta(name, email, migrationMsg)
if err != nil {
return err
}
rv, err := doltdb.EmptyRootValue(ctx, vrw, ns)
if err != nil {
return err
}
nv := doltdb.HackNomsValuesFromRootValues(rv)
ds, err := db.GetDataset(ctx, ref.NewInternalRef(migrationRef).String())
if err != nil {
return err
}
_, err = db.Commit(ctx, ds, nv, datas.CommitOptions{Meta: meta})
return nil
}
// SwapChunkStores atomically swaps the ChunkStores of |menv.Migration| and |menv.Existing|.
func SwapChunkStores(ctx context.Context, menv Environment) error {
src, dest := menv.Migration.FS, menv.Existing.FS
absSrc, err := src.Abs(filepath.Join(doltDir, nomsDir))
if err != nil {
return err
}
absDest, err := dest.Abs(filepath.Join(doltDir, nomsDir))
if err != nil {
return err
}
var cpErr error
err = src.Iter(absSrc, true, func(p string, size int64, isDir bool) (stop bool) {
if strings.Contains(p, manifestFile) || isDir {
return
}
var relPath string
if relPath, cpErr = filepath.Rel(absSrc, p); cpErr != nil {
stop = true
return
}
srcPath := filepath.Join(absSrc, relPath)
destPath := filepath.Join(absDest, relPath)
if cpErr = filesys.CopyFile(srcPath, destPath, src, dest); cpErr != nil {
stop = true
}
return
})
if err != nil {
return err
}
if cpErr != nil {
return cpErr
}
return swapManifests(ctx, src, dest)
}
func swapManifests(ctx context.Context, src, dest filesys.Filesys) (err error) {
// backup the current manifest
manifest := filepath.Join(doltDir, nomsDir, manifestFile)
bak := filepath.Join(doltDir, nomsDir, manifestFile+".bak")
if err = filesys.CopyFile(manifest, bak, dest, dest); err != nil {
return err
}
// copy manifest to |dest| under temporary name
tmp := filepath.Join(doltDir, nomsDir, "temp-manifest")
if err = filesys.CopyFile(manifest, tmp, src, dest); err != nil {
return err
}
// atomically swap the manifests
return dest.MoveFile(tmp, manifest)
// exit immediately!
}
func getMigrateFS(existing filesys.Filesys) (filesys.Filesys, error) {
uniq := fmt.Sprintf("dolt_migration_%d", time.Now().UnixNano())
tmpPath := filepath.Join(existing.TempDir(), uniq)
if err := existing.MkDirs(tmpPath); err != nil {
return nil, err
}
mfs, err := filesys.LocalFilesysWithWorkingDir(tmpPath)
if err != nil {
return nil, err
}
if err = mfs.MkDirs(doltDir); err != nil {
return nil, err
}
return mfs, nil
}

View File

@@ -0,0 +1,87 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/store/hash"
)
type ChunkMapping interface {
Has(ctx context.Context, addr hash.Hash) (bool, error)
Get(ctx context.Context, addr hash.Hash) (hash.Hash, error)
Put(ctx context.Context, old, new hash.Hash) error
}
type CommitStack interface {
Push(ctx context.Context, cm *doltdb.Commit) error
Pop(ctx context.Context) (*doltdb.Commit, error)
}
type Progress interface {
ChunkMapping
CommitStack
Log(ctx context.Context, format string, args ...any)
}
type memoryProgress struct {
stack []*doltdb.Commit
mapping map[hash.Hash]hash.Hash
}
func newProgress() Progress {
return &memoryProgress{
stack: make([]*doltdb.Commit, 0, 128),
mapping: make(map[hash.Hash]hash.Hash, 128),
}
}
func (mem *memoryProgress) Has(ctx context.Context, addr hash.Hash) (ok bool, err error) {
_, ok = mem.mapping[addr]
return
}
func (mem *memoryProgress) Get(ctx context.Context, old hash.Hash) (new hash.Hash, err error) {
new = mem.mapping[old]
return
}
func (mem *memoryProgress) Put(ctx context.Context, old, new hash.Hash) (err error) {
mem.mapping[old] = new
return
}
func (mem *memoryProgress) Push(ctx context.Context, cm *doltdb.Commit) (err error) {
mem.stack = append(mem.stack, cm)
return
}
func (mem *memoryProgress) Pop(ctx context.Context) (cm *doltdb.Commit, err error) {
if len(mem.stack) == 0 {
return nil, nil
}
top := len(mem.stack) - 1
cm = mem.stack[top]
mem.stack = mem.stack[:top]
return
}
func (mem *memoryProgress) Log(ctx context.Context, format string, args ...any) {
fmt.Println(fmt.Sprintf(format, args...))
}

View File

@@ -0,0 +1,296 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
func migrateWorkingSet(ctx context.Context, wsRef ref.WorkingSetRef, old, new *doltdb.DoltDB, prog Progress) error {
oldWs, err := old.ResolveWorkingSet(ctx, wsRef)
if err != nil {
return err
}
wr, err := migrateRoot(ctx, oldWs.WorkingRoot(), new)
if err != nil {
return err
}
sr, err := migrateRoot(ctx, oldWs.StagedRoot(), new)
if err != nil {
return err
}
newWs := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(wr).WithStagedRoot(sr)
return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta())
}
func migrateCommit(ctx context.Context, cm *doltdb.Commit, new *doltdb.DoltDB, prog Progress) error {
oldHash, err := cm.HashOf()
if err != nil {
return err
}
ok, err := prog.Has(ctx, oldHash)
if err != nil {
return err
} else if ok {
return nil
}
if cm.NumParents() == 0 {
return migrateInitCommit(ctx, cm, new, prog)
}
prog.Log(ctx, "migrating commit %s", oldHash.String())
root, err := cm.GetRootValue(ctx)
if err != nil {
return err
}
mRoot, err := migrateRoot(ctx, root, new)
if err != nil {
return err
}
_, addr, err := new.WriteRootValue(ctx, mRoot)
if err != nil {
return err
}
value, err := new.ValueReadWriter().ReadValue(ctx, addr)
if err != nil {
return err
}
opts, err := migrateCommitOptions(ctx, cm, prog)
if err != nil {
return err
}
migratedCm, err := new.CommitDangling(ctx, value, opts)
if err != nil {
return err
}
// update progress
newHash, err := migratedCm.HashOf()
if err != nil {
return err
}
return prog.Put(ctx, oldHash, newHash)
}
func migrateInitCommit(ctx context.Context, cm *doltdb.Commit, new *doltdb.DoltDB, prog Progress) error {
oldHash, err := cm.HashOf()
if err != nil {
return err
}
rv, err := doltdb.EmptyRootValue(ctx, new.ValueReadWriter(), new.NodeStore())
if err != nil {
return err
}
nv := doltdb.HackNomsValuesFromRootValues(rv)
meta, err := cm.GetCommitMeta(ctx)
if err != nil {
return err
}
datasDB := doltdb.HackDatasDatabaseFromDoltDB(new)
creation := ref.NewInternalRef(doltdb.CreationBranch)
ds, err := datasDB.GetDataset(ctx, creation.String())
if err != nil {
return err
}
ds, err = datasDB.Commit(ctx, ds, nv, datas.CommitOptions{Meta: meta})
if err != nil {
return err
}
newCm, err := new.ResolveCommitRef(ctx, creation)
if err != nil {
return err
}
newHash, err := newCm.HashOf()
if err != nil {
return err
}
return prog.Put(ctx, oldHash, newHash)
}
func migrateCommitOptions(ctx context.Context, oldCm *doltdb.Commit, prog Progress) (datas.CommitOptions, error) {
parents, err := oldCm.ParentHashes(ctx)
if err != nil {
return datas.CommitOptions{}, err
}
if len(parents) == 0 {
panic("expected non-zero parents list")
}
for i := range parents {
migrated, err := prog.Get(ctx, parents[i])
if err != nil {
return datas.CommitOptions{}, err
}
parents[i] = migrated
}
meta, err := oldCm.GetCommitMeta(ctx)
if err != nil {
return datas.CommitOptions{}, err
}
return datas.CommitOptions{
Parents: parents,
Meta: meta,
}, nil
}
func migrateRoot(ctx context.Context, root *doltdb.RootValue, new *doltdb.DoltDB) (*doltdb.RootValue, error) {
migrated, err := doltdb.EmptyRootValue(ctx, new.ValueReadWriter(), new.NodeStore())
if err != nil {
return nil, err
}
fkc, err := root.GetForeignKeyCollection(ctx)
if err != nil {
return nil, err
}
migrated, err = migrated.PutForeignKeyCollection(ctx, fkc)
if err != nil {
return nil, err
}
err = root.IterTables(ctx, func(name string, tbl *doltdb.Table, _ schema.Schema) (bool, error) {
mtbl, err := migrateTable(ctx, tbl, new)
if err != nil {
return true, err
}
migrated, err = migrated.PutTable(ctx, name, mtbl)
if err != nil {
return true, err
}
return false, nil
})
if err != nil {
return nil, err
}
return migrated, nil
}
func migrateTable(ctx context.Context, table *doltdb.Table, new *doltdb.DoltDB) (*doltdb.Table, error) {
rows, err := table.GetRowData(ctx)
if err != nil {
return nil, err
}
err = migrateNomsMap(ctx, rows, table.ValueReadWriter(), new.ValueReadWriter())
if err != nil {
return nil, err
}
ai, err := table.GetAutoIncrementValue(ctx)
if err != nil {
return nil, err
}
autoInc := types.Uint(ai)
sch, err := table.GetSchema(ctx)
if err != nil {
return nil, err
}
oldSet, err := table.GetIndexSet(ctx)
if err != nil {
return nil, err
}
newSet, err := migrateIndexSet(ctx, sch, oldSet, table.ValueReadWriter(), new)
if err != nil {
return nil, err
}
return doltdb.NewTable(ctx, new.ValueReadWriter(), new.NodeStore(), sch, rows, newSet, autoInc)
}
func migrateIndexSet(ctx context.Context, sch schema.Schema, oldSet durable.IndexSet, old types.ValueReadWriter, new *doltdb.DoltDB) (durable.IndexSet, error) {
newSet := durable.NewIndexSet(ctx, new.ValueReadWriter(), new.NodeStore())
for _, def := range sch.Indexes().AllIndexes() {
idx, err := oldSet.GetIndex(ctx, sch, def.Name())
if err != nil {
return nil, err
}
if err = migrateNomsMap(ctx, idx, old, new.ValueReadWriter()); err != nil {
return nil, err
}
newSet, err = newSet.PutIndex(ctx, def.Name(), idx)
if err != nil {
return nil, err
}
}
return newSet, nil
}
func migrateNomsMap(ctx context.Context, idx durable.Index, old, new types.ValueReadWriter) error {
m := durable.NomsMapFromIndex(idx)
return copyTreeFromValue(ctx, m, old, new)
}
// copyTreeFromValue recursively copies |v| and all its children from |old| to |new|.
func copyTreeFromValue(ctx context.Context, v types.Value, old, new types.ValueReadWriter) error {
if _, err := new.WriteValue(ctx, v); err != nil {
return err
}
return types.WalkAddrs(v, old.Format(), func(h hash.Hash, isleaf bool) error {
if err := copyValue(ctx, h, old, new); err != nil {
return err
}
if isleaf {
return nil
}
val, err := old.ReadValue(ctx, h)
if err != nil {
return err
}
return copyTreeFromValue(ctx, val, old, new)
})
}
func copyValue(ctx context.Context, addr hash.Hash, old, new types.ValueReadWriter) (err error) {
var v types.Value
if v, err = old.ReadValue(ctx, addr); err != nil {
return err
}
_, err = new.WriteValue(ctx, v)
return
}

View File

@@ -0,0 +1,177 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
)
// TraverseDAG traverses |old|, migrating values to |new|.
func TraverseDAG(ctx context.Context, old, new *doltdb.DoltDB) error {
heads, err := old.GetHeadRefs(ctx)
if err != nil {
return err
}
prog := newProgress()
for i := range heads {
if err = traverseRefHistory(ctx, heads[i], old, new, prog); err != nil {
return err
}
}
if err = validateMigration(ctx, old, new); err != nil {
return err
}
return nil
}
func traverseRefHistory(ctx context.Context, r ref.DoltRef, old, new *doltdb.DoltDB, prog Progress) error {
switch r.GetType() {
case ref.BranchRefType:
if err := traverseBranchHistory(ctx, r, old, new, prog); err != nil {
return err
}
wsRef, err := ref.WorkingSetRefForHead(r)
if err != nil {
return err
}
return migrateWorkingSet(ctx, wsRef, old, new, prog)
case ref.TagRefType:
return traverseTagHistory(ctx, r.(ref.TagRef), old, new, prog)
case ref.RemoteRefType:
return traverseBranchHistory(ctx, r, old, new, prog)
case ref.WorkspaceRefType, ref.InternalRefType:
return nil
default:
panic(fmt.Sprintf("unknown ref type %s", r.String()))
}
}
func traverseBranchHistory(ctx context.Context, r ref.DoltRef, old, new *doltdb.DoltDB, prog Progress) error {
cm, err := old.ResolveCommitRef(ctx, r)
if err != nil {
return err
}
if err = traverseCommitHistory(ctx, cm, new, prog); err != nil {
return err
}
oldHash, err := cm.HashOf()
if err != nil {
return err
}
newHash, err := prog.Get(ctx, oldHash)
if err != nil {
return err
}
return new.SetHead(ctx, r, newHash)
}
func traverseTagHistory(ctx context.Context, r ref.TagRef, old, new *doltdb.DoltDB, prog Progress) error {
t, err := old.ResolveTag(ctx, r)
if err != nil {
return err
}
if err = traverseCommitHistory(ctx, t.Commit, new, prog); err != nil {
return err
}
oldHash, err := t.Commit.HashOf()
if err != nil {
return err
}
newHash, err := prog.Get(ctx, oldHash)
if err != nil {
return err
}
cm, err := new.ReadCommit(ctx, newHash)
if err != nil {
return err
}
return new.NewTagAtCommit(ctx, r, cm, t.Meta)
}
func traverseCommitHistory(ctx context.Context, cm *doltdb.Commit, new *doltdb.DoltDB, prog Progress) error {
ch, err := cm.HashOf()
if err != nil {
return err
}
ok, err := prog.Has(ctx, ch)
if err != nil || ok {
return err
}
for {
ph, err := cm.ParentHashes(ctx)
if err != nil {
return err
}
idx, err := firstAbsent(ctx, prog, ph)
if err != nil {
return err
}
if idx < 0 {
// parents for |cm| are done, migrate |cm|
if err = migrateCommit(ctx, cm, new, prog); err != nil {
return err
}
// pop the stack, traverse upwards
cm, err = prog.Pop(ctx)
if err != nil {
return err
}
if cm == nil {
return nil // done
}
continue
}
// push the stack, traverse downwards
if err = prog.Push(ctx, cm); err != nil {
return err
}
cm, err = cm.GetParent(ctx, idx)
if err != nil {
return err
}
}
}
func firstAbsent(ctx context.Context, p Progress, addrs []hash.Hash) (int, error) {
for i := range addrs {
ok, err := p.Has(ctx, addrs[i])
if err != nil {
return -1, err
}
if !ok {
return i, nil
}
}
return -1, nil
}

View File

@@ -0,0 +1,48 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
)
func validateMigration(ctx context.Context, old, new *doltdb.DoltDB) error {
if err := validateBranchMapping(ctx, old, new); err != nil {
return err
}
return nil
}
func validateBranchMapping(ctx context.Context, old, new *doltdb.DoltDB) error {
branches, err := old.GetBranches(ctx)
if err != nil {
return err
}
var ok bool
for _, bref := range branches {
ok, err = new.HasBranch(ctx, bref.GetPath())
if err != nil {
return err
}
if !ok {
return fmt.Errorf("failed to map branch %s", bref.GetPath())
}
}
return nil
}

View File

@@ -31,8 +31,7 @@ type NeedsRebaseFn func(ctx context.Context, cm *doltdb.Commit) (bool, error)
// EntireHistory returns a |NeedsRebaseFn| that rebases the entire commit history.
func EntireHistory() NeedsRebaseFn {
return func(_ context.Context, cm *doltdb.Commit) (bool, error) {
n, err := cm.NumParents()
return n != 0, err
return cm.NumParents() != 0, nil
}
}
@@ -54,11 +53,7 @@ func StopAtCommit(stopCommit *doltdb.Commit) NeedsRebaseFn {
return false, nil
}
n, err := cm.NumParents()
if err != nil {
return false, err
}
if n == 0 {
if cm.NumParents() == 0 {
return false, fmt.Errorf("commit %s is missing from the commit history of at least one rebase head", sh)
}

View File

@@ -184,17 +184,20 @@ func TestJSONStructuralSharing(t *testing.T) {
val := MustNomsJSONWithVRW(vrw, sb.String())
json_refs := make(hash.HashSet)
err := types.WalkAddrs(types.JSON(val), vrw.Format(), func(h hash.Hash, _ bool) {
err := types.WalkAddrs(types.JSON(val), vrw.Format(), func(h hash.Hash, _ bool) error {
json_refs.Insert(h)
return nil
})
require.NoError(t, err)
tup, err := types.NewTuple(types.Format_Default, types.Int(12), types.JSON(val))
require.NoError(t, err)
tuple_refs := make(hash.HashSet)
types.WalkAddrs(tup, vrw.Format(), func(h hash.Hash, _ bool) {
err = types.WalkAddrs(tup, vrw.Format(), func(h hash.Hash, _ bool) error {
tuple_refs.Insert(h)
return nil
})
assert.NoError(t, err)
assert.Greater(t, len(json_refs), 0)
assert.Equal(t, len(json_refs), len(tuple_refs))

View File

@@ -71,6 +71,9 @@ type WritableFS interface {
// MoveFile will move a file from the srcPath in the filesystem to the destPath
MoveFile(srcPath, destPath string) error
// TempDir returns the path of a new temporary directory.
TempDir() string
}
// FSIterCB specifies the signature of the function that will be called for every item found while iterating.
@@ -111,3 +114,18 @@ func UnmarshalJSONFile(fs ReadableFS, path string, dest interface{}) error {
return json.Unmarshal(data, dest)
}
func CopyFile(srcPath, destPath string, srcFS, destFS Filesys) (err error) {
rd, err := srcFS.OpenForRead(srcPath)
if err != nil {
return err
}
wr, err := destFS.OpenForWrite(destPath, os.ModePerm)
if err != nil {
return err
}
_, err = io.Copy(wr, rd)
return
}

View File

@@ -103,6 +103,20 @@ func TestFilesystems(t *testing.T) {
dataRead, err = fs.ReadFile(movedFilePath)
require.NoError(t, err)
require.Equal(t, dataRead, data)
tmp := fs.TempDir()
require.NotEmpty(t, tmp)
fp2 := filepath.Join(tmp, "data.txt")
wrc, err := fs.OpenForWrite(fp2, os.ModePerm)
require.NoError(t, err)
require.NoError(t, wrc.Close())
// Test writing/reading random data to tmp file
err = fs.WriteFile(fp2, data)
require.NoError(t, err)
dataRead, err = fs.ReadFile(fp2)
require.NoError(t, err)
require.Equal(t, dataRead, data)
})
}
}

View File

@@ -16,8 +16,10 @@ package filesys
import (
"bytes"
"encoding/base32"
"errors"
"io"
"math/rand"
"os"
"path/filepath"
"strings"
@@ -535,6 +537,44 @@ func (fs *InMemFS) MoveFile(srcPath, destPath string) error {
return os.ErrNotExist
}
func (fs *InMemFS) CopyFile(srcPath, destPath string) error {
fs.rwLock.Lock()
defer fs.rwLock.Unlock()
srcPath = fs.getAbsPath(srcPath)
destPath = fs.getAbsPath(destPath)
if exists, destIsDir := fs.exists(destPath); exists && destIsDir {
return ErrIsDir
}
if obj, ok := fs.objs[srcPath]; ok {
if obj.isDir() {
return ErrIsDir
}
destDir := filepath.Dir(destPath)
destParentDir, err := fs.mkDirs(destDir)
if err != nil {
return err
}
destData := make([]byte, len(obj.(*memFile).data))
copy(destData, obj.(*memFile).data)
now := InMemNowFunc()
destObj := &memFile{destPath, destData, destParentDir, now}
fs.objs[destPath] = destObj
destParentDir.objs[destPath] = destObj
destParentDir.time = now
return nil
}
return os.ErrNotExist
}
// converts a path to an absolute path. If it's already an absolute path the input path will be returned unaltered
func (fs *InMemFS) Abs(path string) (string, error) {
path = fs.pathToNative(path)
@@ -559,6 +599,13 @@ func (fs *InMemFS) LastModified(path string) (t time.Time, exists bool) {
return time.Time{}, false
}
func (fs *InMemFS) TempDir() string {
buf := make([]byte, 16)
rand.Read(buf)
s := base32.HexEncoding.EncodeToString(buf)
return "/var/folders/gc/" + s + "/T/"
}
func (fs *InMemFS) pathToNative(path string) string {
if len(path) >= 1 {
if path[0] == '.' {

View File

@@ -318,3 +318,7 @@ func (fs *localFS) LastModified(path string) (t time.Time, exists bool) {
return stat.ModTime(), true
}
// TempDir returns the operating system's default directory for temporary
// files, as reported by os.TempDir.
func (fs *localFS) TempDir() string {
	return os.TempDir()
}

View File

@@ -152,8 +152,9 @@ func (s *nomsShowTestSuite) TestNomsShowRaw() {
s.NoError(err)
numChildChunks := 0
err = types.WalkAddrs(l, vrw.Format(), func(_ hash.Hash, _ bool) {
err = types.WalkAddrs(l, vrw.Format(), func(_ hash.Hash, _ bool) error {
numChildChunks++
return nil
})
s.NoError(err)
s.True(numChildChunks > 0)

View File

@@ -51,6 +51,7 @@ import (
"fmt"
"regexp"
"strconv"
"strings"
"github.com/dolthub/dolt/go/store/d"
)
@@ -191,3 +192,17 @@ func (hs HashSet) Empty() {
delete(hs, h)
}
}
// String implements fmt.Stringer for HashSet, rendering the set as a
// multi-line listing of the form:
//
//	HashSet {
//		<hash>
//		...
//	}
//
// Iteration follows Go map order and is therefore unstable between calls.
func (hs HashSet) String() string {
	var out strings.Builder
	// Pre-size the buffer: roughly 34 bytes per entry (tab + hash string +
	// newline) plus slack for the surrounding braces.
	out.Grow(len(hs)*34 + 100)
	out.WriteString("HashSet {\n")
	for h := range hs {
		out.WriteByte('\t')
		out.WriteString(h.String())
		out.WriteByte('\n')
	}
	out.WriteString("}\n")
	return out.String()
}

View File

@@ -130,11 +130,12 @@ func main() {
orderedChildren := hash.HashSlice{}
nextLevel := hash.HashSlice{}
for _, h := range current {
_ = types.WalkAddrs(currentValues[h], types.Format_Default, func(h hash.Hash, isleaf bool) {
_ = types.WalkAddrs(currentValues[h], types.Format_Default, func(h hash.Hash, isleaf bool) error {
orderedChildren = append(orderedChildren, h)
if !visited[h] && !isleaf {
nextLevel = append(nextLevel, h)
}
return nil
})
}

View File

@@ -627,7 +627,7 @@ func (m Map) HumanReadableString() string {
}
// VisitMapLevelOrder writes hashes of internal node chunks to a writer
// delimited with a newline character and returns the number or chunks written and the total number of
// delimited with a newline character and returns the number of chunks written and the total number of
// bytes written or an error if encountered
func VisitMapLevelOrder(m Map, cb func(h hash.Hash) (int64, error)) (int64, int64, error) {
chunkCount := int64(0)

View File

@@ -242,9 +242,8 @@ func WalkAddrsForNBF(nbf *NomsBinFormat) func(chunks.Chunk, func(h hash.Hash, is
}
}
func WalkAddrs(v Value, nbf *NomsBinFormat, cb func(h hash.Hash, isleaf bool)) error {
func WalkAddrs(v Value, nbf *NomsBinFormat, cb func(h hash.Hash, isleaf bool) error) error {
return v.walkRefs(nbf, func(r Ref) error {
cb(r.TargetHash(), r.Height() == 1)
return nil
return cb(r.TargetHash(), r.Height() == 1)
})
}

View File

@@ -95,7 +95,8 @@ func PanicIfDangling(ctx context.Context, unresolved hash.HashSet, cs chunks.Chu
d.PanicIfError(err)
if len(absent) != 0 {
d.Panic("Found dangling references to %v", absent)
s := absent.String()
d.Panic("Found dangling references to %s", s)
}
}

View File

@@ -69,6 +69,12 @@ skip_nbf_dolt_1() {
fi
}
# Skips the current bats test when the default storage format is
# __DOLT_DEV__, for tests that do not apply to that format.
skip_nbf_dolt_dev() {
if [ "$DOLT_DEFAULT_BIN_FORMAT" = "__DOLT_DEV__" ]; then
skip "skipping test for nomsBinFormat __DOLT_DEV__"
fi
}
setup_common() {
setup_no_dolt_init
dolt init

View File

@@ -0,0 +1,133 @@
#!/usr/bin/env bats
load $BATS_TEST_DIRNAME/helper/common.bash
# Per-test setup. These migration tests start from a legacy-format repo and
# migrate to __DOLT_DEV__, so runs whose default format is already the newer
# one are skipped (skip_nbf_dolt_1 presumably skips __DOLT_1__ runs, mirroring
# skip_nbf_dolt_dev — see helper/common.bash). A fresh repo is then created.
setup() {
skip_nbf_dolt_1
skip_nbf_dolt_dev
TARGET_NBF="__DOLT_DEV__"
setup_common
}
# Per-test cleanup, delegated to the shared helper.
teardown() {
teardown_common
}
# checksum_table <table> <ref>
# Computes a content checksum of <table> at commit/ref <ref>: looks up the
# table's column list from information_schema, then sums CRC32 over the
# concatenated columns of every row. Used to verify that table data is
# byte-equivalent before and after migration.
function checksum_table {
QUERY="SELECT GROUP_CONCAT(column_name) FROM information_schema.columns WHERE table_name = '$1'"
COLUMNS=$( dolt sql -q "$QUERY" -r csv | tail -n1 | sed 's/"//g' )
dolt sql -q "SELECT CAST(SUM(CRC32(CONCAT($COLUMNS))) AS UNSIGNED) FROM $1 AS OF '$2';" -r csv | tail -n1
}
@test "migrate: smoke test" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
SQL
CHECKSUM=$(checksum_table test head)
run cat ./.dolt/noms/manifest
[[ "$output" =~ "__LD_1__" ]] || false
[[ ! "$output" =~ "$TARGET_NBF" ]] || false
dolt migrate
run cat ./.dolt/noms/manifest
[[ "$output" =~ "$TARGET_NBF" ]] || false
[[ ! "$output" =~ "__LD_1__" ]] || false
run checksum_table test head
[[ "$output" =~ "$CHECKSUM" ]] || false
run dolt sql -q "SELECT count(*) FROM dolt_commits" -r csv
[ $status -eq 0 ]
[[ "$output" =~ "2" ]] || false
}
@test "migrate: manifest backup" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
SQL
dolt migrate
run cat ./.dolt/noms/manifest.bak
[[ "$output" =~ "__LD_1__" ]] || false
[[ ! "$output" =~ "$TARGET_NBF" ]] || false
}
@test "migrate: multiple branches" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
SQL
dolt branch one
dolt branch two
dolt sql <<SQL
CALL dcheckout('one');
INSERT INTO test VALUES (1,1,1);
CALL dcommit('-am', 'row (1,1,1)');
CALL dcheckout('two');
INSERT INTO test VALUES (2,2,2);
CALL dcommit('-am', 'row (2,2,2)');
CALL dmerge('one');
SQL
MAIN=$(checksum_table test main)
ONE=$(checksum_table test one)
TWO=$(checksum_table test two)
dolt migrate
run cat ./.dolt/noms/manifest
[[ "$output" =~ "$TARGET_NBF" ]] || false
run checksum_table test main
[[ "$output" =~ "$MAIN" ]] || false
run checksum_table test one
[[ "$output" =~ "$ONE" ]] || false
run checksum_table test two
[[ "$output" =~ "$TWO" ]] || false
run dolt sql -q "SELECT count(*) FROM dolt_commits" -r csv
[ $status -eq 0 ]
[[ "$output" =~ "4" ]] || false
}
@test "migrate: tag and working set" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
CALL dtag('tag1', 'head');
INSERT INTO test VALUES (1,1,1);
CALL dcommit('-am', 'added rows');
INSERT INTO test VALUES (2,2,2);
SQL
HEAD=$(checksum_table test head)
PREV=$(checksum_table test head~1)
TAG=$(checksum_table test tag1)
[ $TAG -eq $PREV ]
dolt migrate
run cat ./.dolt/noms/manifest
[[ "$output" =~ "$TARGET_NBF" ]] || false
run checksum_table test head
[[ "$output" =~ "$HEAD" ]] || false
run checksum_table test head~1
[[ "$output" =~ "$PREV" ]] || false
run checksum_table test tag1
[[ "$output" =~ "$TAG" ]] || false
}