Pulling out drop/undrop code from provider into a new type to encapsulate it

This commit is contained in:
Jason Fulghum
2023-10-03 10:12:46 -07:00
parent 7ac5799066
commit 256b1ecb85
3 changed files with 254 additions and 200 deletions

View File

@@ -18,11 +18,9 @@ import (
"context"
"errors"
"fmt"
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/dolthub/go-mysql-server/sql"
@@ -44,10 +42,6 @@ import (
"github.com/dolthub/dolt/go/store/types"
)
// deletedDatabaseDirectoryName is the subdirectory within the data folder where Dolt moves databases after they are
// dropped. The dolt_undrop() stored procedure is then able to restore them from this location.
const deletedDatabaseDirectoryName = "dolt_deleted_databases"
type DoltDatabaseProvider struct {
// dbLocations maps a database name to its file system root
dbLocations map[string]filesys.Filesys
@@ -58,6 +52,8 @@ type DoltDatabaseProvider struct {
DropDatabaseHook DropDatabaseHook
mu *sync.RWMutex
droppedDatabaseManager *droppedDatabases
defaultBranch string
fs filesys.Filesys
remoteDialer dbfactory.GRPCDialProvider // TODO: why isn't this a method defined on the remote object
@@ -124,16 +120,17 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files
}
return DoltDatabaseProvider{
dbLocations: dbLocations,
databases: dbs,
functions: funcs,
externalProcedures: externalProcedures,
mu: &sync.RWMutex{},
fs: fs,
defaultBranch: defaultBranch,
dbFactoryUrl: dbFactoryUrl,
InitDatabaseHook: ConfigureReplicationDatabaseHook,
isStandby: new(bool),
dbLocations: dbLocations,
databases: dbs,
functions: funcs,
externalProcedures: externalProcedures,
mu: &sync.RWMutex{},
fs: fs,
defaultBranch: defaultBranch,
dbFactoryUrl: dbFactoryUrl,
InitDatabaseHook: ConfigureReplicationDatabaseHook,
isStandby: new(bool),
droppedDatabaseManager: newDroppedDatabaseManager(fs),
}, nil
}
@@ -565,23 +562,6 @@ func (p DoltDatabaseProvider) cloneDatabaseFromRemote(
return dEnv, nil
}
// initializeDeletedDatabaseDirectory initializes the special directory Dolt uses to store dropped databases until
// they are fully removed. If the directory is already created and set up correctly, then this method is a no-op.
// If the directory doesn't exist yet, it will be created. If there are any problems initializing the directory, an
// error is returned.
func (p DoltDatabaseProvider) initializeDeletedDatabaseDirectory() error {
exists, isDir := p.fs.Exists(deletedDatabaseDirectoryName)
if exists && !isDir {
return fmt.Errorf("%s exists, but is not a directory", deletedDatabaseDirectoryName)
}
if exists {
return nil
}
return p.fs.MkDirs(deletedDatabaseDirectoryName)
}
// DropDatabase implements the sql.MutableDatabaseProvider interface
func (p DoltDatabaseProvider) DropDatabase(ctx *sql.Context, name string) error {
_, revision := dsess.SplitRevisionDbName(name)
@@ -589,13 +569,10 @@ func (p DoltDatabaseProvider) DropDatabase(ctx *sql.Context, name string) error
return fmt.Errorf("unable to drop revision database: %s", name)
}
// get the case-sensitive name for case-sensitive file systems
// TODO: there are still cases (not server-first) where we rename databases because the directory name would need
// quoting if used as a database name, and that breaks here. We either need the database name to match the directory
// name in all cases, or else keep a mapping from database name to directory on disk.
p.mu.Lock()
defer p.mu.Unlock()
// get the case-sensitive name for case-sensitive file systems
dbKey := formatDbMapKeyName(name)
db := p.databases[dbKey]
@@ -628,195 +605,38 @@ func (p DoltDatabaseProvider) DropDatabase(ctx *sql.Context, name string) error
p.DropDatabaseHook(name)
}
rootDbLoc, err := p.fs.Abs("")
err = p.droppedDatabaseManager.DropDatabase(ctx, name, dropDbLoc)
if err != nil {
return err
}
isRootDatabase := false
//dirToDelete := ""
// if the database is in the directory itself, we remove '.dolt' directory rather than
// the whole directory itself because it can have other databases that are nested.
if rootDbLoc == dropDbLoc {
doltDirExists, _ := p.fs.Exists(dbfactory.DoltDir)
if !doltDirExists {
return sql.ErrDatabaseNotFound.New(db.Name())
}
dropDbLoc = filepath.Join(dropDbLoc, dbfactory.DoltDir)
isRootDatabase = true
} else {
// TODO: Do we really need the code in this block?
// Seems like a few places are checking this.
exists, isDir := p.fs.Exists(dropDbLoc)
// Get the DB's directory
if !exists {
// engine should already protect against this
return sql.ErrDatabaseNotFound.New(db.Name())
} else if !isDir {
return fmt.Errorf("unexpected error: %s exists but is not a directory", dbKey)
}
}
if err = p.initializeDeletedDatabaseDirectory(); err != nil {
return fmt.Errorf("unable to drop database %s: %w", name, err.Error())
}
// Move the dropped database to the Dolt deleted database directory so it can be restored if needed
_, file := filepath.Split(dropDbLoc)
var destinationDirectory string
if isRootDatabase {
// NOTE: This won't work without first creating the new subdirectory
newSubdirectory := filepath.Join(deletedDatabaseDirectoryName, name)
// TODO: If newSubdirectory exists already... then we're in trouble! (need to handle that)
// TODO: Maybe we should have this talk to the DroppedDatabaseVault API instead?
if err := p.fs.MkDirs(newSubdirectory); err != nil {
return err
}
destinationDirectory = filepath.Join(newSubdirectory, file)
} else {
destinationDirectory = filepath.Join(deletedDatabaseDirectoryName, file)
}
// Add the final directory segment and convert all hyphens to underscores in the database directory name
dir, file := filepath.Split(destinationDirectory)
if strings.Contains(file, "-") {
destinationDirectory = filepath.Join(dir, strings.ReplaceAll(file, "-", "_"))
}
if err := p.prepareToMoveDroppedDatabase(ctx, destinationDirectory); err != nil {
return err
}
if err = p.fs.MoveDir(dropDbLoc, destinationDirectory); err != nil {
return err
}
// We not only have to delete tracking metadata for this database, but also for any derivative ones we've stored
// as a result of USE or connection strings
// We not only have to delete tracking metadata for this database, but also for any derivative
// ones we've stored as a result of USE or connection strings
derivativeNamePrefix := strings.ToLower(dbKey + dsess.DbRevisionDelimiter)
for dbName := range p.databases {
if strings.HasPrefix(strings.ToLower(dbName), derivativeNamePrefix) {
delete(p.databases, dbName)
}
}
delete(p.databases, dbKey)
return p.invalidateDbStateInAllSessions(ctx, name)
}
// TODO: Might be helpful to group some of these DoltDeletedDatabaseDirectory helper functions into their own
//
// type. For example... maybe there's a new type for DroppedDatabaseVault that we can interact with?
func (p DoltDatabaseProvider) prepareToMoveDroppedDatabase(_ *sql.Context, targetPath string) error {
if exists, _ := p.fs.Exists(targetPath); !exists {
// If there's nothing at the desired targetPath, we're all set
return nil
}
// If there is something already there, pick a new path to move it to
newPath := fmt.Sprintf("%s.backup.%d", targetPath, time.Now().Unix())
if exists, _ := p.fs.Exists(newPath); exists {
return fmt.Errorf("unable to move existing dropped database out of the way: "+
"tried to move it to %s", newPath)
}
if err := p.fs.MoveFile(targetPath, newPath); err != nil {
return fmt.Errorf("unable to move existing dropped database out of the way: %w", err)
}
return nil
}
func (p DoltDatabaseProvider) ListUndroppableDatabases(_ *sql.Context) ([]string, error) {
if err := p.initializeDeletedDatabaseDirectory(); err != nil {
return nil, fmt.Errorf("unable to list undroppable database: %w", err.Error())
}
databaseNames := make([]string, 0, 5)
callback := func(path string, size int64, isDir bool) (stop bool) {
_, lastPathSegment := filepath.Split(path)
// TODO: Is there a common util we use for this somewhere?
lastPathSegment = strings.ReplaceAll(lastPathSegment, "-", "_")
databaseNames = append(databaseNames, lastPathSegment)
return false
}
if err := p.fs.Iter(deletedDatabaseDirectoryName, false, callback); err != nil {
return nil, err
}
return databaseNames, nil
}
// validateUndropDatabase validates that the database |name| is available to be "undropped" and that no existing
// database is already being managed that has the same (case-insensitive) name. If any problems are encountered,
// an error is returned.
func (p DoltDatabaseProvider) validateUndropDatabase(ctx *sql.Context, name string) (sourcePath, destinationPath, exactCaseName string, err error) {
// TODO: rename to ListDatabasesThatCanBeUndropped(ctx)?
availableDatabases, err := p.ListUndroppableDatabases(ctx)
if err != nil {
return "", "", "", err
}
found := false
exactCaseName = name
lowercaseName := strings.ToLower(name)
for _, s := range availableDatabases {
if lowercaseName == strings.ToLower(s) {
exactCaseName = s
found = true
break
}
}
// TODO: this error creation information could be extracted to a common function that dolt_undrop function could use
if !found {
extraInformation := "there are no databases currently available to be undropped"
if len(availableDatabases) > 0 {
extraInformation = fmt.Sprintf("available databases that can be undropped: %s", strings.Join(availableDatabases, ", "))
}
return "", "", "", fmt.Errorf("no database named '%s' found to undrop. %s", name, extraInformation)
}
// Check to see if the destination directory for restoring the database already exists (case insensitive match)
destinationPath, err = p.fs.Abs(exactCaseName)
if err != nil {
return "", "", "", err
}
sourcePath = filepath.Join(deletedDatabaseDirectoryName, exactCaseName)
// TODO: is this always a case insensitive check??? It seems like it must be since our test is working?
if exists, _ := p.fs.Exists(destinationPath); exists {
return "", "", "", fmt.Errorf("unable to undrop database '%s'; "+
"another database already exists with the same case-insensitive name", exactCaseName)
}
return sourcePath, destinationPath, exactCaseName, nil
func (p DoltDatabaseProvider) ListUndroppableDatabases(ctx *sql.Context) ([]string, error) {
return p.droppedDatabaseManager.ListUndroppableDatabases(ctx)
}
func (p DoltDatabaseProvider) UndropDatabase(ctx *sql.Context, name string) (err error) {
p.mu.Lock()
defer p.mu.Unlock()
// TODO: not sure I like sourcePath and destinationPath being returned here, but seems like they're needed in this function
sourcePath, destinationPath, exactCaseName, err := p.validateUndropDatabase(ctx, name)
newFs, exactCaseName, err := p.droppedDatabaseManager.UndropDatabase(ctx, name)
if err != nil {
return err
}
// TODO: Need to account for database directory renaming (i.e. converting '-' to '_')
err = p.fs.MoveDir(sourcePath, destinationPath)
if err != nil {
return err
}
newFs, err := p.fs.WithWorkingDir(exactCaseName)
if err != nil {
return err
}
newEnv := env.Load(ctx, env.GetCurrentUserHomeDir, newFs, p.dbFactoryUrl, "TODO")
return p.registerNewDatabase(ctx, exactCaseName, newEnv)
}

View File

@@ -0,0 +1,226 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqle
import (
"fmt"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/go-mysql-server/sql"
"path/filepath"
"strings"
"time"
)
// deletedDatabaseDirectoryName is the subdirectory within the data folder where Dolt moves databases after they are
// dropped. The dolt_undrop() stored procedure is then able to restore them from this location.
const deletedDatabaseDirectoryName = "dolt_deleted_databases"
// TODO: Godoc
type droppedDatabases struct {
fs filesys.Filesys
}
func newDroppedDatabaseManager(fs filesys.Filesys) *droppedDatabases {
return &droppedDatabases{
fs: fs,
}
}
func (dd *droppedDatabases) DropDatabase(ctx *sql.Context, name string, dropDbLoc string) error {
rootDbLoc, err := dd.fs.Abs("")
if err != nil {
return err
}
isRootDatabase := false
//dirToDelete := ""
// if the database is in the directory itself, we remove '.dolt' directory rather than
// the whole directory itself because it can have other databases that are nested.
if rootDbLoc == dropDbLoc {
doltDirExists, _ := dd.fs.Exists(dbfactory.DoltDir)
if !doltDirExists {
return sql.ErrDatabaseNotFound.New(name)
}
dropDbLoc = filepath.Join(dropDbLoc, dbfactory.DoltDir)
isRootDatabase = true
} else {
// TODO: Do we really need the code in this block?
// Seems like a few places are checking this.
exists, isDir := dd.fs.Exists(dropDbLoc)
// Get the DB's directory
if !exists {
// engine should already protect against this
return sql.ErrDatabaseNotFound.New(name)
} else if !isDir {
return fmt.Errorf("unexpected error: %s exists but is not a directory", name)
}
}
if err = dd.initializeDeletedDatabaseDirectory(); err != nil {
return fmt.Errorf("unable to drop database %s: %w", name, err)
}
// Move the dropped database to the Dolt deleted database directory so it can be restored if needed
_, file := filepath.Split(dropDbLoc)
var destinationDirectory string
if isRootDatabase {
// NOTE: This won't work without first creating the new subdirectory
newSubdirectory := filepath.Join(deletedDatabaseDirectoryName, name)
// TODO: If newSubdirectory exists already... then we're in trouble! (need to handle that)
// TODO: Maybe we should have this talk to the DroppedDatabaseVault API instead?
if err := dd.fs.MkDirs(newSubdirectory); err != nil {
return err
}
destinationDirectory = filepath.Join(newSubdirectory, file)
} else {
destinationDirectory = filepath.Join(deletedDatabaseDirectoryName, file)
}
// Add the final directory segment and convert all hyphens to underscores in the database directory name
dir, file := filepath.Split(destinationDirectory)
if strings.Contains(file, "-") {
destinationDirectory = filepath.Join(dir, strings.ReplaceAll(file, "-", "_"))
}
if err := dd.prepareToMoveDroppedDatabase(ctx, destinationDirectory); err != nil {
return err
}
return dd.fs.MoveDir(dropDbLoc, destinationDirectory)
}
func (dd *droppedDatabases) UndropDatabase(ctx *sql.Context, name string) (filesys.Filesys, string, error) {
// TODO: not sure I like sourcePath and destinationPath being returned here, but seems like they're needed in this function
sourcePath, destinationPath, exactCaseName, err := dd.validateUndropDatabase(ctx, name)
if err != nil {
return nil, "", err
}
err = dd.fs.MoveDir(sourcePath, destinationPath)
if err != nil {
return nil, "", err
}
newFs, err := dd.fs.WithWorkingDir(exactCaseName)
if err != nil {
return nil, "", err
}
return newFs, exactCaseName, nil
}
// initializeDeletedDatabaseDirectory initializes the special directory Dolt uses to store dropped databases until
// they are fully removed. If the directory is already created and set up correctly, then this method is a no-op.
// If the directory doesn't exist yet, it will be created. If there are any problems initializing the directory, an
// error is returned.
func (dd *droppedDatabases) initializeDeletedDatabaseDirectory() error {
exists, isDir := dd.fs.Exists(deletedDatabaseDirectoryName)
if exists && !isDir {
return fmt.Errorf("%s exists, but is not a directory", deletedDatabaseDirectoryName)
}
if exists {
return nil
}
return dd.fs.MkDirs(deletedDatabaseDirectoryName)
}
func (dd *droppedDatabases) ListUndroppableDatabases(_ *sql.Context) ([]string, error) {
if err := dd.initializeDeletedDatabaseDirectory(); err != nil {
return nil, fmt.Errorf("unable to list undroppable database: %w", err)
}
databaseNames := make([]string, 0, 5)
callback := func(path string, size int64, isDir bool) (stop bool) {
_, lastPathSegment := filepath.Split(path)
// TODO: Is there a common util we use for this somewhere?
lastPathSegment = strings.ReplaceAll(lastPathSegment, "-", "_")
databaseNames = append(databaseNames, lastPathSegment)
return false
}
if err := dd.fs.Iter(deletedDatabaseDirectoryName, false, callback); err != nil {
return nil, err
}
return databaseNames, nil
}
// validateUndropDatabase validates that the database |name| is available to be "undropped" and that no existing
// database is already being managed that has the same (case-insensitive) name. If any problems are encountered,
// an error is returned.
func (dd *droppedDatabases) validateUndropDatabase(ctx *sql.Context, name string) (sourcePath, destinationPath, exactCaseName string, err error) {
// TODO: rename to ListDatabasesThatCanBeUndropped(ctx)?
availableDatabases, err := dd.ListUndroppableDatabases(ctx)
if err != nil {
return "", "", "", err
}
found := false
exactCaseName = name
lowercaseName := strings.ToLower(name)
for _, s := range availableDatabases {
if lowercaseName == strings.ToLower(s) {
exactCaseName = s
found = true
break
}
}
// TODO: this error creation information could be extracted to a common function that dolt_undrop function could use
if !found {
extraInformation := "there are no databases currently available to be undropped"
if len(availableDatabases) > 0 {
extraInformation = fmt.Sprintf("available databases that can be undropped: %s", strings.Join(availableDatabases, ", "))
}
return "", "", "", fmt.Errorf("no database named '%s' found to undrop. %s", name, extraInformation)
}
// Check to see if the destination directory for restoring the database already exists (case insensitive match)
destinationPath, err = dd.fs.Abs(exactCaseName)
if err != nil {
return "", "", "", err
}
sourcePath = filepath.Join(deletedDatabaseDirectoryName, exactCaseName)
// TODO: is this always a case insensitive check??? It seems like it must be since our test is working?
if exists, _ := dd.fs.Exists(destinationPath); exists {
return "", "", "", fmt.Errorf("unable to undrop database '%s'; "+
"another database already exists with the same case-insensitive name", exactCaseName)
}
return sourcePath, destinationPath, exactCaseName, nil
}
func (dd *droppedDatabases) prepareToMoveDroppedDatabase(_ *sql.Context, targetPath string) error {
if exists, _ := dd.fs.Exists(targetPath); !exists {
// If there's nothing at the desired targetPath, we're all set
return nil
}
// If there is something already there, pick a new path to move it to
newPath := fmt.Sprintf("%s.backup.%d", targetPath, time.Now().Unix())
if exists, _ := dd.fs.Exists(newPath); exists {
return fmt.Errorf("unable to move existing dropped database out of the way: "+
"tried to move it to %s", newPath)
}
if err := dd.fs.MoveFile(targetPath, newPath); err != nil {
return fmt.Errorf("unable to move existing dropped database out of the way: %w", err)
}
return nil
}

View File

@@ -251,6 +251,14 @@ type emptyRevisionDatabaseProvider struct {
sql.DatabaseProvider
}
func (e emptyRevisionDatabaseProvider) UndropDatabase(ctx *sql.Context, dbName string) error {
return nil
}
func (e emptyRevisionDatabaseProvider) ListUndroppableDatabases(ctx *sql.Context) ([]string, error) {
return nil, nil
}
func (e emptyRevisionDatabaseProvider) BaseDatabase(ctx *sql.Context, dbName string) (SqlDatabase, bool) {
return nil, false
}