more conflicts

This commit is contained in:
James Cor
2022-07-26 14:35:40 -07:00
54 changed files with 2297 additions and 174 deletions

View File

@@ -30,7 +30,6 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/types"
)
const (
@@ -59,10 +58,6 @@ func (cmd GarbageCollectionCmd) Description() string {
return gcDocs.ShortDesc
}
func (cmd GarbageCollectionCmd) GatedForNBF(nbf *types.NomsBinFormat) bool {
return types.IsFormat_DOLT_1(nbf)
}
// Hidden should return true if this command should be hidden from the help text
func (cmd GarbageCollectionCmd) Hidden() bool {
return false

View File

@@ -224,13 +224,7 @@ func logCommits(ctx context.Context, dEnv *env.DoltEnv, cs *doltdb.CommitSpec, o
}
matchFunc := func(commit *doltdb.Commit) (bool, error) {
numParents, err := commit.NumParents()
if err != nil {
return false, err
}
return numParents >= opts.minParents, nil
return commit.NumParents() >= opts.minParents, nil
}
commits, err := commitwalk.GetTopNTopoOrderedCommitsMatching(ctx, dEnv.DoltDB, h, opts.numLines, matchFunc)

View File

@@ -20,8 +20,10 @@ import (
"github.com/fatih/color"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/migrate"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
)
@@ -36,7 +38,7 @@ const (
var migrateDocs = cli.CommandDocumentationContent{
ShortDesc: "Executes a database migration to use the latest Dolt data format.",
LongDesc: `Migrate is a multi-purpose command to update the data format of a Dolt database. Over time, development
on Dolt requires changes to the on-disk data format. These changes are necessary to improve Database performance amd
on Dolt requires changes to the on-disk data format. These changes are necessary to improve Database performance and
correctness. Migrating to the latest format is therefore necessary for compatibility with the latest Dolt clients, and
to take advantage of the newly released Dolt features.`,
@@ -76,7 +78,7 @@ func (cmd MigrateCmd) EventType() eventsapi.ClientEventType {
// Exec executes the command
func (cmd MigrateCmd) Exec(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
ap := cmd.ArgParser()
help, _ := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, migrateDocs, ap))
help, usage := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, migrateDocs, ap))
apr := cli.ParseArgsOrDie(ap, args, help)
if apr.Contains(migratePushFlag) && apr.Contains(migratePullFlag) {
@@ -84,5 +86,29 @@ func (cmd MigrateCmd) Exec(ctx context.Context, commandStr string, args []string
return 1
}
if err := MigrateDatabase(ctx, dEnv); err != nil {
verr := errhand.BuildDError("migration failed").AddCause(err).Build()
return HandleVErrAndExitCode(verr, usage)
}
return 0
}
// MigrateDatabase migrates the NomsBinFormat of |dEnv.DoltDB|.
func MigrateDatabase(ctx context.Context, dEnv *env.DoltEnv) error {
menv, err := migrate.NewEnvironment(ctx, dEnv)
if err != nil {
return err
}
p, err := menv.Migration.FS.Abs(".")
if err != nil {
return err
}
cli.Println("migrating database at tmp dir: ", p)
err = migrate.TraverseDAG(ctx, menv.Existing.DoltDB, menv.Migration.DoltDB)
if err != nil {
return err
}
return migrate.SwapChunkStores(ctx, menv)
}

View File

@@ -19,6 +19,7 @@ import (
"fmt"
"net"
"net/http"
"runtime"
"strconv"
"time"
@@ -256,8 +257,8 @@ func newSessionBuilder(se *engine.SqlEngine, config ServerConfig) server.Session
userToSessionVars[curr.Name] = curr.Vars
}
return func(ctx context.Context, conn *mysql.Conn, host string) (sql.Session, error) {
mysqlSess, err := server.DefaultSessionBuilder(ctx, conn, host)
return func(ctx context.Context, conn *mysql.Conn, addr string) (sql.Session, error) {
mysqlSess, err := server.DefaultSessionBuilder(ctx, conn, addr)
if err != nil {
return nil, err
}
@@ -292,7 +293,11 @@ func newSessionBuilder(se *engine.SqlEngine, config ServerConfig) server.Session
// getConfigFromServerConfig processes ServerConfig and returns server.Config for sql-server.
func getConfigFromServerConfig(serverConfig ServerConfig) (server.Config, error, error) {
serverConf := server.Config{Protocol: "tcp"}
serverConf, err := handleProtocolAndAddress(serverConfig)
if err != nil {
return server.Config{}, err, nil
}
serverConf.DisableClientMultiStatements = serverConfig.DisableClientMultiStatements()
readTimeout := time.Duration(serverConfig.ReadTimeout()) * time.Millisecond
@@ -303,14 +308,6 @@ func getConfigFromServerConfig(serverConfig ServerConfig) (server.Config, error,
return server.Config{}, nil, err
}
portAsString := strconv.Itoa(serverConfig.Port())
hostPort := net.JoinHostPort(serverConfig.Host(), portAsString)
if portInUse(hostPort) {
portInUseError := fmt.Errorf("Port %s already in use.", portAsString)
return server.Config{}, portInUseError, nil
}
// if persist is 'load' we use currently set persisted global variable,
// else if 'ignore' we set persisted global variable to current value from serverConfig
if serverConfig.PersistenceBehavior() == loadPerisistentGlobals {
@@ -327,7 +324,6 @@ func getConfigFromServerConfig(serverConfig ServerConfig) (server.Config, error,
// Do not set the value of Version. Let it default to what go-mysql-server uses. This should be equivalent
// to the value of mysql that we support.
serverConf.Address = hostPort
serverConf.ConnReadTimeout = readTimeout
serverConf.ConnWriteTimeout = writeTimeout
serverConf.MaxConnections = serverConfig.MaxConnections()
@@ -336,3 +332,34 @@ func getConfigFromServerConfig(serverConfig ServerConfig) (server.Config, error,
return serverConf, nil, nil
}
// handleProtocolAndAddress returns new server.Config object with only Protocol and Address defined.
func handleProtocolAndAddress(serverConfig ServerConfig) (server.Config, error) {
serverConf := server.Config{Protocol: "tcp"}
portAsString := strconv.Itoa(serverConfig.Port())
hostPort := net.JoinHostPort(serverConfig.Host(), portAsString)
if portInUse(hostPort) {
portInUseError := fmt.Errorf("Port %s already in use.", portAsString)
return server.Config{}, portInUseError
}
serverConf.Address = hostPort
// if socket is defined with or without value -> unix
if serverConfig.Socket() != "" {
if runtime.GOOS == "windows" {
return server.Config{}, fmt.Errorf("cannot define unix socket file on Windows")
}
serverConf.Socket = serverConfig.Socket()
}
// TODO : making it an "opt in" feature (just to start) and requiring users to pass in the `--socket` flag
// to turn them on instead of defaulting them on when host and port aren't set or host is set to `localhost`.
//} else {
// // if host is undefined or defined as "localhost" -> unix
// if shouldUseUnixSocket(serverConfig) {
// serverConf.Socket = defaultUnixSocketFilePath
// }
//}
return serverConf, nil
}

View File

@@ -152,9 +152,9 @@ func TestServerGoodParams(t *testing.T) {
tests := []ServerConfig{
DefaultServerConfig(),
DefaultServerConfig().withHost("127.0.0.1").WithPort(15400),
DefaultServerConfig().withHost("localhost").WithPort(15401),
//DefaultServerConfig().withHost("::1").WithPort(15402), // Fails on Jenkins, assuming no IPv6 support
DefaultServerConfig().WithHost("127.0.0.1").WithPort(15400),
DefaultServerConfig().WithHost("localhost").WithPort(15401),
//DefaultServerConfig().WithHost("::1").WithPort(15402), // Fails on Jenkins, assuming no IPv6 support
DefaultServerConfig().withUser("testusername").WithPort(15403),
DefaultServerConfig().withPassword("hunter2").WithPort(15404),
DefaultServerConfig().withTimeout(0).WithPort(15405),

View File

@@ -54,6 +54,7 @@ const (
defaultMetricsHost = ""
defaultMetricsPort = -1
defaultAllowCleartextPasswords = false
defaultUnixSocketFilePath = "/tmp/mysql.sock"
)
const (
@@ -138,6 +139,8 @@ type ServerConfig interface {
JwksConfig() []engine.JwksConfig
// AllowCleartextPasswords is true if the server should accept cleartext passwords.
AllowCleartextPasswords() bool
// Socket is a path to the unix socket file
Socket() string
}
type commandLineServerConfig struct {
@@ -160,6 +163,7 @@ type commandLineServerConfig struct {
persistenceBehavior string
privilegeFilePath string
allowCleartextPasswords bool
socket string
}
var _ ServerConfig = (*commandLineServerConfig)(nil)
@@ -204,7 +208,7 @@ func (cfg *commandLineServerConfig) LogLevel() LogLevel {
return cfg.logLevel
}
// Autocommit defines the value of the @@autocommit session variable used on every connection
// AutoCommit defines the value of the @@autocommit session variable used on every connection
func (cfg *commandLineServerConfig) AutoCommit() bool {
return cfg.autoCommit
}
@@ -224,22 +228,29 @@ func (cfg *commandLineServerConfig) PersistenceBehavior() string {
return cfg.persistenceBehavior
}
// TLSKey returns a path to the servers PEM-encoded private TLS key. "" if there is none.
func (cfg *commandLineServerConfig) TLSKey() string {
return cfg.tlsKey
}
// TLSCert returns a path to the servers PEM-encoded TLS certificate chain. "" if there is none.
func (cfg *commandLineServerConfig) TLSCert() string {
return cfg.tlsCert
}
// RequireSecureTransport is true if the server should reject non-TLS connections.
func (cfg *commandLineServerConfig) RequireSecureTransport() bool {
return cfg.requireSecureTransport
}
// DisableClientMultiStatements is true if we want the server to not
// process incoming ComQuery packets as if they had multiple queries in
// them, even if the client advertises support for MULTI_STATEMENTS.
func (cfg *commandLineServerConfig) DisableClientMultiStatements() bool {
return false
}
// MetricsLabels returns labels that are applied to all prometheus metrics
func (cfg *commandLineServerConfig) MetricsLabels() map[string]string {
return nil
}
@@ -252,10 +263,13 @@ func (cfg *commandLineServerConfig) MetricsPort() int {
return defaultMetricsPort
}
// PrivilegeFilePath returns the path to the file which contains all needed privilege information in the form of a
// JSON string.
func (cfg *commandLineServerConfig) PrivilegeFilePath() string {
return cfg.privilegeFilePath
}
// UserVars is an array containing user specific session variables.
func (cfg *commandLineServerConfig) UserVars() []UserSessionVars {
return nil
}
@@ -275,16 +289,23 @@ func (cfg *commandLineServerConfig) DatabaseNamesAndPaths() []env.EnvNameAndPath
return cfg.dbNamesAndPaths
}
// DataDir is the path to a directory to use as the data dir, both to create new databases and locate existing ones.
func (cfg *commandLineServerConfig) DataDir() string {
return cfg.dataDir
}
// CfgDir is the path to a directory to use to store the dolt configuration files.
func (cfg *commandLineServerConfig) CfgDir() string {
return cfg.cfgDir
}
// withHost updates the host and returns the called `*commandLineServerConfig`, which is useful for chaining calls.
func (cfg *commandLineServerConfig) withHost(host string) *commandLineServerConfig {
// Socket is a path to the unix socket file
func (cfg *commandLineServerConfig) Socket() string {
return cfg.socket
}
// WithHost updates the host and returns the called `*commandLineServerConfig`, which is useful for chaining calls.
func (cfg *commandLineServerConfig) WithHost(host string) *commandLineServerConfig {
cfg.host = host
return cfg
}
@@ -338,26 +359,31 @@ func (cfg *commandLineServerConfig) withQueryParallelism(queryParallelism int) *
return cfg
}
// withDBNamesAndPaths updates the dbNamesAndPaths, which is an array of env.EnvNameAndPathObjects corresponding to the databases
func (cfg *commandLineServerConfig) withDBNamesAndPaths(dbNamesAndPaths []env.EnvNameAndPath) *commandLineServerConfig {
cfg.dbNamesAndPaths = dbNamesAndPaths
return cfg
}
// withDataDir updates the path to a directory to use as the data dir.
func (cfg *commandLineServerConfig) withDataDir(dataDir string) *commandLineServerConfig {
cfg.dataDir = dataDir
return cfg
}
// withCfgDir updates the path to a directory to use to store the dolt configuration files.
func (cfg *commandLineServerConfig) withCfgDir(cfgDir string) *commandLineServerConfig {
cfg.cfgDir = cfgDir
return cfg
}
// withPersistenceBehavior updates persistence behavior of system globals on server init
func (cfg *commandLineServerConfig) withPersistenceBehavior(persistenceBehavior string) *commandLineServerConfig {
cfg.persistenceBehavior = persistenceBehavior
return cfg
}
// withPrivilegeFilePath updates the path to the file which contains all needed privilege information in the form of a JSON string
func (cfg *commandLineServerConfig) withPrivilegeFilePath(privFilePath string) *commandLineServerConfig {
cfg.privilegeFilePath = privFilePath
return cfg
@@ -368,6 +394,12 @@ func (cfg *commandLineServerConfig) withAllowCleartextPasswords(allow bool) *com
return cfg
}
// WithSocket updates the path to the unix socket file
func (cfg *commandLineServerConfig) WithSocket(sockFilePath string) *commandLineServerConfig {
cfg.socket = sockFilePath
return cfg
}
// DefaultServerConfig creates a `*ServerConfig` that has all of the options set to their default values.
func DefaultServerConfig() *commandLineServerConfig {
return &commandLineServerConfig{
@@ -388,7 +420,7 @@ func DefaultServerConfig() *commandLineServerConfig {
}
}
// Validate returns an `error` if any field is not valid.
// ValidateConfig returns an `error` if any field is not valid.
func ValidateConfig(config ServerConfig) error {
if config.Host() != "localhost" {
ip := net.ParseIP(config.Host())
@@ -412,22 +444,36 @@ func ValidateConfig(config ServerConfig) error {
}
// ConnectionString returns a Data Source Name (DSN) to be used by go clients for connecting to a running server.
// If unix socket file path is defined in ServerConfig, then `unix` DSN will be returned.
func ConnectionString(config ServerConfig, database string) string {
user := config.User()
if user == "" {
user = "root"
}
str := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v", user, config.Password(), config.Host(), config.Port(), database)
if config.AllowCleartextPasswords() {
str += "?allowCleartextPasswords=1"
var dsn string
if config.Socket() != "" {
dsn = fmt.Sprintf("%v:%v@unix(%v)/%v", user, config.Password(), config.Socket(), database)
} else {
dsn = fmt.Sprintf("%v:%v@tcp(%v:%v)/%v", user, config.Password(), config.Host(), config.Port(), database)
}
return str
if config.AllowCleartextPasswords() {
dsn += "?allowCleartextPasswords=1"
}
return dsn
}
// ConfigInfo returns a summary of some of the config which contains some of the more important information
func ConfigInfo(config ServerConfig) string {
return fmt.Sprintf(`HP="%v:%v"|T="%v"|R="%v"|L="%v"`, config.Host(), config.Port(),
config.ReadTimeout(), config.ReadOnly(), config.LogLevel())
socket := ""
if config.Socket() != "" {
s := config.Socket()
if s == "" {
s = defaultUnixSocketFilePath
}
socket = fmt.Sprintf(`|S="%v"`, s)
}
return fmt.Sprintf(`HP="%v:%v"|T="%v"|R="%v"|L="%v"%s`, config.Host(), config.Port(),
config.ReadTimeout(), config.ReadOnly(), config.LogLevel(), socket)
}
// LoadTLSConfig loads the certificate chain from config.TLSKey() and config.TLSCert() and returns

View File

@@ -46,6 +46,7 @@ const (
maxConnectionsFlag = "max-connections"
persistenceBehaviorFlag = "persistence-behavior"
allowCleartextPasswordsFlag = "allow-cleartext-passwords"
socketFlag = "socket"
)
func indentLines(s string) string {
@@ -146,6 +147,7 @@ func (cmd SqlServerCmd) ArgParser() *argparser.ArgParser {
ap.SupportsString(persistenceBehaviorFlag, "", "persistence-behavior", fmt.Sprintf("Indicate whether to `load` or `ignore` persisted global variables (default `%s`)", serverConfig.PersistenceBehavior()))
ap.SupportsString(commands.PrivsFilePathFlag, "", "privilege file", "Path to a file to load and store users and grants. Defaults to $doltcfg-dir/privileges.db")
ap.SupportsString(allowCleartextPasswordsFlag, "", "allow-cleartext-passwords", "Allows use of cleartext passwords. Defaults to false.")
ap.SupportsString(socketFlag, "", "socket file", "Path for the unix socket file. Defaults to '/tmp/mysql.sock'")
return ap
}
@@ -300,8 +302,16 @@ func SetupDoltConfig(dEnv *env.DoltEnv, apr *argparser.ArgParseResults, config S
func getCommandLineServerConfig(dEnv *env.DoltEnv, apr *argparser.ArgParseResults) (ServerConfig, error) {
serverConfig := DefaultServerConfig()
if sock, ok := apr.GetValue(socketFlag); ok {
// defined without value gets default
if sock == "" {
sock = defaultUnixSocketFilePath
}
serverConfig.WithSocket(sock)
}
if host, ok := apr.GetValue(hostFlag); ok {
serverConfig.withHost(host)
serverConfig.WithHost(host)
}
if port, ok := apr.GetInt(portFlag); ok {

View File

@@ -98,6 +98,8 @@ type ListenerYAMLConfig struct {
RequireSecureTransport *bool `yaml:"require_secure_transport"`
// AllowCleartextPasswords enables use of cleartext passwords.
AllowCleartextPasswords *bool `yaml:"allow_cleartext_passwords"`
// Socket is unix socket file path
Socket *string `yaml:"socket"`
}
// PerformanceYAMLConfig contains configuration parameters for performance tweaking
@@ -160,6 +162,7 @@ func serverConfigAsYAMLConfig(cfg ServerConfig) YAMLConfig {
nillableStrPtr(cfg.TLSCert()),
nillableBoolPtr(cfg.RequireSecureTransport()),
nillableBoolPtr(cfg.AllowCleartextPasswords()),
nillableStrPtr(cfg.Socket()),
},
DatabaseConfig: nil,
}
@@ -260,7 +263,7 @@ func (cfg YAMLConfig) ReadOnly() bool {
return *cfg.BehaviorConfig.ReadOnly
}
// Autocommit defines the value of the @@autocommit session variable used on every connection
// AutoCommit defines the value of the @@autocommit session variable used on every connection
func (cfg YAMLConfig) AutoCommit() bool {
if cfg.BehaviorConfig.AutoCommit == nil {
return defaultAutoCommit
@@ -310,6 +313,7 @@ func (cfg YAMLConfig) DisableClientMultiStatements() bool {
return *cfg.BehaviorConfig.DisableClientMultiStatements
}
// MetricsLabels returns labels that are applied to all prometheus metrics
func (cfg YAMLConfig) MetricsLabels() map[string]string {
return cfg.MetricsConfig.Labels
}
@@ -330,6 +334,8 @@ func (cfg YAMLConfig) MetricsPort() int {
return *cfg.MetricsConfig.Port
}
// PrivilegeFilePath returns the path to the file which contains all needed privilege information in the form of a
// JSON string.
func (cfg YAMLConfig) PrivilegeFilePath() string {
if cfg.PrivilegeFile != nil {
return *cfg.PrivilegeFile
@@ -337,6 +343,7 @@ func (cfg YAMLConfig) PrivilegeFilePath() string {
return filepath.Join(cfg.CfgDir(), defaultPrivilegeFilePath)
}
// UserVars is an array containing user specific session variables
func (cfg YAMLConfig) UserVars() []UserSessionVars {
if cfg.Vars != nil {
return cfg.Vars
@@ -345,6 +352,7 @@ func (cfg YAMLConfig) UserVars() []UserSessionVars {
return nil
}
// JwksConfig is JSON Web Key Set config, and used to validate a user authed with a jwt (JSON Web Token).
func (cfg YAMLConfig) JwksConfig() []engine.JwksConfig {
if cfg.Jwks != nil {
return cfg.Jwks
@@ -368,6 +376,7 @@ func (cfg YAMLConfig) QueryParallelism() int {
return *cfg.PerformanceConfig.QueryParallelism
}
// TLSKey returns a path to the servers PEM-encoded private TLS key. "" if there is none.
func (cfg YAMLConfig) TLSKey() string {
if cfg.ListenerConfig.TLSKey == nil {
return ""
@@ -375,6 +384,7 @@ func (cfg YAMLConfig) TLSKey() string {
return *cfg.ListenerConfig.TLSKey
}
// TLSCert returns a path to the servers PEM-encoded TLS certificate chain. "" if there is none.
func (cfg YAMLConfig) TLSCert() string {
if cfg.ListenerConfig.TLSCert == nil {
return ""
@@ -382,6 +392,7 @@ func (cfg YAMLConfig) TLSCert() string {
return *cfg.ListenerConfig.TLSCert
}
// RequireSecureTransport is true if the server should reject non-TLS connections.
func (cfg YAMLConfig) RequireSecureTransport() bool {
if cfg.ListenerConfig.RequireSecureTransport == nil {
return false
@@ -389,6 +400,7 @@ func (cfg YAMLConfig) RequireSecureTransport() bool {
return *cfg.ListenerConfig.RequireSecureTransport
}
// PersistenceBehavior is "load" if we include persisted system globals on server init
func (cfg YAMLConfig) PersistenceBehavior() string {
if cfg.BehaviorConfig.PersistenceBehavior == nil {
return loadPerisistentGlobals
@@ -396,6 +408,7 @@ func (cfg YAMLConfig) PersistenceBehavior() string {
return *cfg.BehaviorConfig.PersistenceBehavior
}
// DataDir is the path to a directory to use as the data dir, both to create new databases and locate existing ones.
func (cfg YAMLConfig) DataDir() string {
if cfg.DataDirStr != nil {
return *cfg.DataDirStr
@@ -403,9 +416,22 @@ func (cfg YAMLConfig) DataDir() string {
return defaultDataDir
}
// CfgDir is the path to a directory to use to store the dolt configuration files.
func (cfg YAMLConfig) CfgDir() string {
if cfg.CfgDirStr != nil {
return *cfg.CfgDirStr
}
return filepath.Join(cfg.DataDir(), defaultCfgDir)
}
// Socket is a path to the unix socket file
func (cfg YAMLConfig) Socket() string {
if cfg.ListenerConfig.Socket == nil {
return ""
}
// if defined but empty -> default
if *cfg.ListenerConfig.Socket == "" {
return defaultUnixSocketFilePath
}
return *cfg.ListenerConfig.Socket
}

View File

@@ -34,6 +34,7 @@ const AddressMapFileID = "ADRM"
const CommitClosureFileID = "CMCL"
const TableSchemaFileID = "DSCH"
const ForeignKeyCollectionFileID = "DFKC"
const MergeArtifactsFileID = "ARTM"
const MessageTypesKind int = 27

View File

@@ -0,0 +1,341 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package serial
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type MergeArtifacts struct {
_tab flatbuffers.Table
}
func GetRootAsMergeArtifacts(buf []byte, offset flatbuffers.UOffsetT) *MergeArtifacts {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &MergeArtifacts{}
x.Init(buf, n+offset)
return x
}
func GetSizePrefixedRootAsMergeArtifacts(buf []byte, offset flatbuffers.UOffsetT) *MergeArtifacts {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &MergeArtifacts{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func (rcv *MergeArtifacts) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *MergeArtifacts) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *MergeArtifacts) KeyItems(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *MergeArtifacts) KeyItemsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MergeArtifacts) KeyItemsBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *MergeArtifacts) MutateKeyItems(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *MergeArtifacts) KeyOffsets(j int) uint16 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetUint16(a + flatbuffers.UOffsetT(j*2))
}
return 0
}
func (rcv *MergeArtifacts) KeyOffsetsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MergeArtifacts) MutateKeyOffsets(j int, n uint16) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateUint16(a+flatbuffers.UOffsetT(j*2), n)
}
return false
}
func (rcv *MergeArtifacts) KeyAddressOffsets(j int) uint16 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetUint16(a + flatbuffers.UOffsetT(j*2))
}
return 0
}
func (rcv *MergeArtifacts) KeyAddressOffsetsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MergeArtifacts) MutateKeyAddressOffsets(j int, n uint16) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateUint16(a+flatbuffers.UOffsetT(j*2), n)
}
return false
}
func (rcv *MergeArtifacts) ValueItems(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *MergeArtifacts) ValueItemsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MergeArtifacts) ValueItemsBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *MergeArtifacts) MutateValueItems(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *MergeArtifacts) ValueOffsets(j int) uint16 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetUint16(a + flatbuffers.UOffsetT(j*2))
}
return 0
}
func (rcv *MergeArtifacts) ValueOffsetsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MergeArtifacts) MutateValueOffsets(j int, n uint16) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateUint16(a+flatbuffers.UOffsetT(j*2), n)
}
return false
}
func (rcv *MergeArtifacts) AddressArray(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *MergeArtifacts) AddressArrayLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MergeArtifacts) AddressArrayBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *MergeArtifacts) MutateAddressArray(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(14))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *MergeArtifacts) SubtreeCounts(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(16))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *MergeArtifacts) SubtreeCountsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(16))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *MergeArtifacts) SubtreeCountsBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(16))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *MergeArtifacts) MutateSubtreeCounts(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(16))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *MergeArtifacts) TreeCount() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(18))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *MergeArtifacts) MutateTreeCount(n uint64) bool {
return rcv._tab.MutateUint64Slot(18, n)
}
func (rcv *MergeArtifacts) TreeLevel() byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(20))
if o != 0 {
return rcv._tab.GetByte(o + rcv._tab.Pos)
}
return 0
}
func (rcv *MergeArtifacts) MutateTreeLevel(n byte) bool {
return rcv._tab.MutateByteSlot(20, n)
}
func MergeArtifactsStart(builder *flatbuffers.Builder) {
builder.StartObject(9)
}
func MergeArtifactsAddKeyItems(builder *flatbuffers.Builder, keyItems flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(keyItems), 0)
}
func MergeArtifactsStartKeyItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func MergeArtifactsAddKeyOffsets(builder *flatbuffers.Builder, keyOffsets flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(keyOffsets), 0)
}
func MergeArtifactsStartKeyOffsetsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(2, numElems, 2)
}
func MergeArtifactsAddKeyAddressOffsets(builder *flatbuffers.Builder, keyAddressOffsets flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(keyAddressOffsets), 0)
}
func MergeArtifactsStartKeyAddressOffsetsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(2, numElems, 2)
}
func MergeArtifactsAddValueItems(builder *flatbuffers.Builder, valueItems flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(valueItems), 0)
}
func MergeArtifactsStartValueItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func MergeArtifactsAddValueOffsets(builder *flatbuffers.Builder, valueOffsets flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(4, flatbuffers.UOffsetT(valueOffsets), 0)
}
func MergeArtifactsStartValueOffsetsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(2, numElems, 2)
}
func MergeArtifactsAddAddressArray(builder *flatbuffers.Builder, addressArray flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(5, flatbuffers.UOffsetT(addressArray), 0)
}
func MergeArtifactsStartAddressArrayVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func MergeArtifactsAddSubtreeCounts(builder *flatbuffers.Builder, subtreeCounts flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(6, flatbuffers.UOffsetT(subtreeCounts), 0)
}
func MergeArtifactsStartSubtreeCountsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func MergeArtifactsAddTreeCount(builder *flatbuffers.Builder, treeCount uint64) {
builder.PrependUint64Slot(7, treeCount, 0)
}
func MergeArtifactsAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) {
builder.PrependByteSlot(8, treeLevel, 0)
}
func MergeArtifactsEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

View File

@@ -80,8 +80,8 @@ func (c *Commit) ParentHashes(ctx context.Context) ([]hash.Hash, error) {
}
// NumParents gets the number of parents a commit has.
func (c *Commit) NumParents() (int, error) {
return len(c.parents), nil
func (c *Commit) NumParents() int {
return len(c.parents)
}
func (c *Commit) Height() (uint64, error) {
@@ -180,14 +180,11 @@ func (c *Commit) GetAncestor(ctx context.Context, as *AncestorSpec) (*Commit, er
instructions := as.Instructions
for _, inst := range instructions {
n, err := cur.NumParents()
if err != nil {
return nil, err
}
if inst >= n {
if inst >= cur.NumParents() {
return nil, ErrInvalidAncestorSpec
}
var err error
cur, err = cur.GetParent(ctx, inst)
if err != nil {
return nil, err

View File

@@ -47,7 +47,7 @@ const (
)
const (
creationBranch = "create"
CreationBranch = "create"
defaultChunksPerTF = 256 * 1024
)
@@ -79,6 +79,12 @@ func DoltDBFromCS(cs chunks.ChunkStore) *DoltDB {
return &DoltDB{hooksDatabase{Database: db}, vrw, ns}
}
// HackDatasDatabaseFromDoltDB unwraps a DoltDB to a datas.Database.
// Deprecated: only for use in dolt migrate.
func HackDatasDatabaseFromDoltDB(ddb *DoltDB) datas.Database {
return ddb.db
}
// LoadDoltDB will acquire a reference to the underlying noms db. If the Location is InMemDoltDB then a reference
// to a newly created in memory database will be used. If the location is LocalDirDoltDB, the directory must exist or
// this returns nil.
@@ -155,7 +161,7 @@ func (ddb *DoltDB) WriteEmptyRepoWithCommitTimeAndDefaultBranch(
panic("Passed bad name or email. Both should be valid")
}
ds, err := ddb.db.GetDataset(ctx, creationBranch)
ds, err := ddb.db.GetDataset(ctx, CreationBranch)
if err != nil {
return err
@@ -181,7 +187,7 @@ func (ddb *DoltDB) WriteEmptyRepoWithCommitTimeAndDefaultBranch(
commitOpts := datas.CommitOptions{Meta: cm}
cb := ref.NewInternalRef(creationBranch)
cb := ref.NewInternalRef(CreationBranch)
ds, err = ddb.db.GetDataset(ctx, cb.String())
if err != nil {
@@ -413,6 +419,15 @@ func (ddb *DoltDB) ReadRootValue(ctx context.Context, h hash.Hash) (*RootValue,
return decodeRootNomsValue(ddb.vrw, ddb.ns, val)
}
// ReadCommit reads the Commit whose hash is |h|, if one exists.
func (ddb *DoltDB) ReadCommit(ctx context.Context, h hash.Hash) (*Commit, error) {
c, err := datas.LoadCommitAddr(ctx, ddb.vrw, h)
if err != nil {
return nil, err
}
return NewCommit(ctx, ddb.vrw, ddb.ns, c)
}
// Commit will update a branch's head value to be that of a previously committed root value hash
func (ddb *DoltDB) Commit(ctx context.Context, valHash hash.Hash, dref ref.DoltRef, cm *datas.CommitMeta) (*Commit, error) {
if dref.GetType() != ref.BranchRefType {
@@ -525,10 +540,13 @@ func (ddb *DoltDB) CommitWithParentCommits(ctx context.Context, valHash hash.Has
parents = append(parents, addr)
}
}
commitOpts := datas.CommitOptions{Parents: parents, Meta: cm}
ds, err = ddb.db.GetDataset(ctx, dref.String())
return ddb.CommitValue(ctx, dref, val, commitOpts)
}
func (ddb *DoltDB) CommitValue(ctx context.Context, dref ref.DoltRef, val types.Value, commitOpts datas.CommitOptions) (*Commit, error) {
ds, err := ddb.db.GetDataset(ctx, dref.String())
if err != nil {
return nil, err
}
@@ -573,9 +591,16 @@ func (ddb *DoltDB) CommitDanglingWithParentCommits(ctx context.Context, valHash
}
parents = append(parents, addr)
}
commitOpts := datas.CommitOptions{Parents: parents, Meta: cm}
dcommit, err := datas.NewCommitForValue(ctx, datas.ChunkStoreFromDatabase(ddb.db), ddb.vrw, ddb.ns, val, commitOpts)
return ddb.CommitDangling(ctx, val, commitOpts)
}
// CommitDangling creates a new Commit for |val| that is not referenced by any DoltRef.
func (ddb *DoltDB) CommitDangling(ctx context.Context, val types.Value, opts datas.CommitOptions) (*Commit, error) {
cs := datas.ChunkStoreFromDatabase(ddb.db)
dcommit, err := datas.NewCommitForValue(ctx, cs, ddb.vrw, ddb.ns, val, opts)
if err != nil {
return nil, err
}
@@ -633,10 +658,7 @@ func (ddb *DoltDB) ResolveParent(ctx context.Context, commit *Commit, parentIdx
}
func (ddb *DoltDB) ResolveAllParents(ctx context.Context, commit *Commit) ([]*Commit, error) {
num, err := commit.NumParents()
if err != nil {
return nil, err
}
num := commit.NumParents()
resolved := make([]*Commit, num)
for i := 0; i < num; i++ {
parent, err := ddb.ResolveParent(ctx, commit, i)

View File

@@ -325,7 +325,7 @@ func TestLDNoms(t *testing.T) {
assert.Equal(t, len(branches), 1)
assert.Equal(t, branches[0].Ref.GetPath(), "master")
numParents, err := commit.NumParents()
numParents := commit.NumParents()
assert.NoError(t, err)
if numParents != 1 {

View File

@@ -1249,3 +1249,9 @@ func NewDataCacheKey(rv *RootValue) (DataCacheKey, error) {
return DataCacheKey{hash}, nil
}
// HackNomsValuesFromRootValues unwraps a RootVal to a noms Value.
// Deprecated: only for use in dolt migrate.
func HackNomsValuesFromRootValues(root *RootValue) types.Value {
return root.nomsValue()
}

View File

@@ -0,0 +1,247 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/utils/earl"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
)
const (
doltDir = dbfactory.DoltDir
nomsDir = dbfactory.DataDir
manifestFile = "manifest"
migrationRef = "migration"
)
var (
targetFormat = types.Format_DOLT_DEV
migrationMsg = fmt.Sprintf("migrating database to Noms Binary Format %s", targetFormat.VersionString())
)
// Environment is a migration environment.
type Environment struct {
Migration *env.DoltEnv
Existing *env.DoltEnv
}
// NewEnvironment creates a migration Environment for |existing|.
func NewEnvironment(ctx context.Context, existing *env.DoltEnv) (Environment, error) {
mfs, err := getMigrateFS(existing.FS)
if err != nil {
return Environment{}, err
}
if err = initMigrationDB(ctx, existing, existing.FS, mfs); err != nil {
return Environment{}, err
}
mdb, err := doltdb.LoadDoltDB(ctx, targetFormat, doltdb.LocalDirDoltDB, mfs)
if err != nil {
return Environment{}, err
}
config, err := env.LoadDoltCliConfig(env.GetCurrentUserHomeDir, mfs)
if err != nil {
return Environment{}, err
}
migration := &env.DoltEnv{
Version: existing.Version,
Config: config,
RepoState: existing.RepoState,
DoltDB: mdb,
FS: mfs,
//urlStr: urlStr,
//hdp: hdp,
}
return Environment{
Migration: migration,
Existing: existing,
}, nil
}
func initMigrationDB(ctx context.Context, existing *env.DoltEnv, src, dest filesys.Filesys) (err error) {
base, err := src.Abs(".")
if err != nil {
return err
}
ierr := src.Iter(doltDir, true, func(path string, size int64, isDir bool) (stop bool) {
if isDir {
err = dest.MkDirs(path)
stop = err != nil
return
}
if strings.Contains(path, nomsDir) {
return
}
path, err = filepath.Rel(base, path)
if err != nil {
stop = true
return
}
if err = filesys.CopyFile(path, path, src, dest); err != nil {
stop = true
return
}
return
})
if ierr != nil {
return ierr
}
if err != nil {
return err
}
dd, err := dest.Abs(filepath.Join(doltDir, nomsDir))
if err != nil {
return err
}
if err = dest.MkDirs(dd); err != nil {
return err
}
u, err := earl.Parse(dd)
if err != nil {
return err
}
db, vrw, ns, err := dbfactory.FileFactory{}.CreateDB(ctx, targetFormat, u, nil)
if err != nil {
return err
}
// write init commit for migration
name, email, err := env.GetNameAndEmail(existing.Config)
if err != nil {
return err
}
meta, err := datas.NewCommitMeta(name, email, migrationMsg)
if err != nil {
return err
}
rv, err := doltdb.EmptyRootValue(ctx, vrw, ns)
if err != nil {
return err
}
nv := doltdb.HackNomsValuesFromRootValues(rv)
ds, err := db.GetDataset(ctx, ref.NewInternalRef(migrationRef).String())
if err != nil {
return err
}
_, err = db.Commit(ctx, ds, nv, datas.CommitOptions{Meta: meta})
return nil
}
// SwapChunkStores atomically swaps the ChunkStores of |menv.Migration| and |menv.Existing|.
func SwapChunkStores(ctx context.Context, menv Environment) error {
src, dest := menv.Migration.FS, menv.Existing.FS
absSrc, err := src.Abs(filepath.Join(doltDir, nomsDir))
if err != nil {
return err
}
absDest, err := dest.Abs(filepath.Join(doltDir, nomsDir))
if err != nil {
return err
}
var cpErr error
err = src.Iter(absSrc, true, func(p string, size int64, isDir bool) (stop bool) {
if strings.Contains(p, manifestFile) || isDir {
return
}
var relPath string
if relPath, cpErr = filepath.Rel(absSrc, p); cpErr != nil {
stop = true
return
}
srcPath := filepath.Join(absSrc, relPath)
destPath := filepath.Join(absDest, relPath)
if cpErr = filesys.CopyFile(srcPath, destPath, src, dest); cpErr != nil {
stop = true
}
return
})
if err != nil {
return err
}
if cpErr != nil {
return cpErr
}
return swapManifests(ctx, src, dest)
}
func swapManifests(ctx context.Context, src, dest filesys.Filesys) (err error) {
// backup the current manifest
manifest := filepath.Join(doltDir, nomsDir, manifestFile)
bak := filepath.Join(doltDir, nomsDir, manifestFile+".bak")
if err = filesys.CopyFile(manifest, bak, dest, dest); err != nil {
return err
}
// copy manifest to |dest| under temporary name
tmp := filepath.Join(doltDir, nomsDir, "temp-manifest")
if err = filesys.CopyFile(manifest, tmp, src, dest); err != nil {
return err
}
// atomically swap the manifests
return dest.MoveFile(tmp, manifest)
// exit immediately!
}
func getMigrateFS(existing filesys.Filesys) (filesys.Filesys, error) {
uniq := fmt.Sprintf("dolt_migration_%d", time.Now().UnixNano())
tmpPath := filepath.Join(existing.TempDir(), uniq)
if err := existing.MkDirs(tmpPath); err != nil {
return nil, err
}
mfs, err := filesys.LocalFilesysWithWorkingDir(tmpPath)
if err != nil {
return nil, err
}
if err = mfs.MkDirs(doltDir); err != nil {
return nil, err
}
return mfs, nil
}

View File

@@ -0,0 +1,87 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/store/hash"
)
type ChunkMapping interface {
Has(ctx context.Context, addr hash.Hash) (bool, error)
Get(ctx context.Context, addr hash.Hash) (hash.Hash, error)
Put(ctx context.Context, old, new hash.Hash) error
}
type CommitStack interface {
Push(ctx context.Context, cm *doltdb.Commit) error
Pop(ctx context.Context) (*doltdb.Commit, error)
}
type Progress interface {
ChunkMapping
CommitStack
Log(ctx context.Context, format string, args ...any)
}
type memoryProgress struct {
stack []*doltdb.Commit
mapping map[hash.Hash]hash.Hash
}
func newProgress() Progress {
return &memoryProgress{
stack: make([]*doltdb.Commit, 0, 128),
mapping: make(map[hash.Hash]hash.Hash, 128),
}
}
func (mem *memoryProgress) Has(ctx context.Context, addr hash.Hash) (ok bool, err error) {
_, ok = mem.mapping[addr]
return
}
func (mem *memoryProgress) Get(ctx context.Context, old hash.Hash) (new hash.Hash, err error) {
new = mem.mapping[old]
return
}
func (mem *memoryProgress) Put(ctx context.Context, old, new hash.Hash) (err error) {
mem.mapping[old] = new
return
}
func (mem *memoryProgress) Push(ctx context.Context, cm *doltdb.Commit) (err error) {
mem.stack = append(mem.stack, cm)
return
}
func (mem *memoryProgress) Pop(ctx context.Context) (cm *doltdb.Commit, err error) {
if len(mem.stack) == 0 {
return nil, nil
}
top := len(mem.stack) - 1
cm = mem.stack[top]
mem.stack = mem.stack[:top]
return
}
func (mem *memoryProgress) Log(ctx context.Context, format string, args ...any) {
fmt.Println(fmt.Sprintf(format, args...))
}

View File

@@ -0,0 +1,296 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
func migrateWorkingSet(ctx context.Context, wsRef ref.WorkingSetRef, old, new *doltdb.DoltDB, prog Progress) error {
oldWs, err := old.ResolveWorkingSet(ctx, wsRef)
if err != nil {
return err
}
wr, err := migrateRoot(ctx, oldWs.WorkingRoot(), new)
if err != nil {
return err
}
sr, err := migrateRoot(ctx, oldWs.StagedRoot(), new)
if err != nil {
return err
}
newWs := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(wr).WithStagedRoot(sr)
return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta())
}
func migrateCommit(ctx context.Context, cm *doltdb.Commit, new *doltdb.DoltDB, prog Progress) error {
oldHash, err := cm.HashOf()
if err != nil {
return err
}
ok, err := prog.Has(ctx, oldHash)
if err != nil {
return err
} else if ok {
return nil
}
if cm.NumParents() == 0 {
return migrateInitCommit(ctx, cm, new, prog)
}
prog.Log(ctx, "migrating commit %s", oldHash.String())
root, err := cm.GetRootValue(ctx)
if err != nil {
return err
}
mRoot, err := migrateRoot(ctx, root, new)
if err != nil {
return err
}
_, addr, err := new.WriteRootValue(ctx, mRoot)
if err != nil {
return err
}
value, err := new.ValueReadWriter().ReadValue(ctx, addr)
if err != nil {
return err
}
opts, err := migrateCommitOptions(ctx, cm, prog)
if err != nil {
return err
}
migratedCm, err := new.CommitDangling(ctx, value, opts)
if err != nil {
return err
}
// update progress
newHash, err := migratedCm.HashOf()
if err != nil {
return err
}
return prog.Put(ctx, oldHash, newHash)
}
func migrateInitCommit(ctx context.Context, cm *doltdb.Commit, new *doltdb.DoltDB, prog Progress) error {
oldHash, err := cm.HashOf()
if err != nil {
return err
}
rv, err := doltdb.EmptyRootValue(ctx, new.ValueReadWriter(), new.NodeStore())
if err != nil {
return err
}
nv := doltdb.HackNomsValuesFromRootValues(rv)
meta, err := cm.GetCommitMeta(ctx)
if err != nil {
return err
}
datasDB := doltdb.HackDatasDatabaseFromDoltDB(new)
creation := ref.NewInternalRef(doltdb.CreationBranch)
ds, err := datasDB.GetDataset(ctx, creation.String())
if err != nil {
return err
}
ds, err = datasDB.Commit(ctx, ds, nv, datas.CommitOptions{Meta: meta})
if err != nil {
return err
}
newCm, err := new.ResolveCommitRef(ctx, creation)
if err != nil {
return err
}
newHash, err := newCm.HashOf()
if err != nil {
return err
}
return prog.Put(ctx, oldHash, newHash)
}
func migrateCommitOptions(ctx context.Context, oldCm *doltdb.Commit, prog Progress) (datas.CommitOptions, error) {
parents, err := oldCm.ParentHashes(ctx)
if err != nil {
return datas.CommitOptions{}, err
}
if len(parents) == 0 {
panic("expected non-zero parents list")
}
for i := range parents {
migrated, err := prog.Get(ctx, parents[i])
if err != nil {
return datas.CommitOptions{}, err
}
parents[i] = migrated
}
meta, err := oldCm.GetCommitMeta(ctx)
if err != nil {
return datas.CommitOptions{}, err
}
return datas.CommitOptions{
Parents: parents,
Meta: meta,
}, nil
}
func migrateRoot(ctx context.Context, root *doltdb.RootValue, new *doltdb.DoltDB) (*doltdb.RootValue, error) {
migrated, err := doltdb.EmptyRootValue(ctx, new.ValueReadWriter(), new.NodeStore())
if err != nil {
return nil, err
}
fkc, err := root.GetForeignKeyCollection(ctx)
if err != nil {
return nil, err
}
migrated, err = migrated.PutForeignKeyCollection(ctx, fkc)
if err != nil {
return nil, err
}
err = root.IterTables(ctx, func(name string, tbl *doltdb.Table, _ schema.Schema) (bool, error) {
mtbl, err := migrateTable(ctx, tbl, new)
if err != nil {
return true, err
}
migrated, err = migrated.PutTable(ctx, name, mtbl)
if err != nil {
return true, err
}
return false, nil
})
if err != nil {
return nil, err
}
return migrated, nil
}
func migrateTable(ctx context.Context, table *doltdb.Table, new *doltdb.DoltDB) (*doltdb.Table, error) {
rows, err := table.GetRowData(ctx)
if err != nil {
return nil, err
}
err = migrateNomsMap(ctx, rows, table.ValueReadWriter(), new.ValueReadWriter())
if err != nil {
return nil, err
}
ai, err := table.GetAutoIncrementValue(ctx)
if err != nil {
return nil, err
}
autoInc := types.Uint(ai)
sch, err := table.GetSchema(ctx)
if err != nil {
return nil, err
}
oldSet, err := table.GetIndexSet(ctx)
if err != nil {
return nil, err
}
newSet, err := migrateIndexSet(ctx, sch, oldSet, table.ValueReadWriter(), new)
if err != nil {
return nil, err
}
return doltdb.NewTable(ctx, new.ValueReadWriter(), new.NodeStore(), sch, rows, newSet, autoInc)
}
func migrateIndexSet(ctx context.Context, sch schema.Schema, oldSet durable.IndexSet, old types.ValueReadWriter, new *doltdb.DoltDB) (durable.IndexSet, error) {
newSet := durable.NewIndexSet(ctx, new.ValueReadWriter(), new.NodeStore())
for _, def := range sch.Indexes().AllIndexes() {
idx, err := oldSet.GetIndex(ctx, sch, def.Name())
if err != nil {
return nil, err
}
if err = migrateNomsMap(ctx, idx, old, new.ValueReadWriter()); err != nil {
return nil, err
}
newSet, err = newSet.PutIndex(ctx, def.Name(), idx)
if err != nil {
return nil, err
}
}
return newSet, nil
}
func migrateNomsMap(ctx context.Context, idx durable.Index, old, new types.ValueReadWriter) error {
m := durable.NomsMapFromIndex(idx)
return copyTreeFromValue(ctx, m, old, new)
}
// copyTreeFromValue recursively copies |v| and all its children from |old| to |new|.
func copyTreeFromValue(ctx context.Context, v types.Value, old, new types.ValueReadWriter) error {
if _, err := new.WriteValue(ctx, v); err != nil {
return err
}
return types.WalkAddrs(v, old.Format(), func(h hash.Hash, isleaf bool) error {
if err := copyValue(ctx, h, old, new); err != nil {
return err
}
if isleaf {
return nil
}
val, err := old.ReadValue(ctx, h)
if err != nil {
return err
}
return copyTreeFromValue(ctx, val, old, new)
})
}
func copyValue(ctx context.Context, addr hash.Hash, old, new types.ValueReadWriter) (err error) {
var v types.Value
if v, err = old.ReadValue(ctx, addr); err != nil {
return err
}
_, err = new.WriteValue(ctx, v)
return
}

View File

@@ -0,0 +1,177 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
)
// TraverseDAG traverses |old|, migrating values to |new|.
func TraverseDAG(ctx context.Context, old, new *doltdb.DoltDB) error {
heads, err := old.GetHeadRefs(ctx)
if err != nil {
return err
}
prog := newProgress()
for i := range heads {
if err = traverseRefHistory(ctx, heads[i], old, new, prog); err != nil {
return err
}
}
if err = validateMigration(ctx, old, new); err != nil {
return err
}
return nil
}
func traverseRefHistory(ctx context.Context, r ref.DoltRef, old, new *doltdb.DoltDB, prog Progress) error {
switch r.GetType() {
case ref.BranchRefType:
if err := traverseBranchHistory(ctx, r, old, new, prog); err != nil {
return err
}
wsRef, err := ref.WorkingSetRefForHead(r)
if err != nil {
return err
}
return migrateWorkingSet(ctx, wsRef, old, new, prog)
case ref.TagRefType:
return traverseTagHistory(ctx, r.(ref.TagRef), old, new, prog)
case ref.RemoteRefType:
return traverseBranchHistory(ctx, r, old, new, prog)
case ref.WorkspaceRefType, ref.InternalRefType:
return nil
default:
panic(fmt.Sprintf("unknown ref type %s", r.String()))
}
}
func traverseBranchHistory(ctx context.Context, r ref.DoltRef, old, new *doltdb.DoltDB, prog Progress) error {
cm, err := old.ResolveCommitRef(ctx, r)
if err != nil {
return err
}
if err = traverseCommitHistory(ctx, cm, new, prog); err != nil {
return err
}
oldHash, err := cm.HashOf()
if err != nil {
return err
}
newHash, err := prog.Get(ctx, oldHash)
if err != nil {
return err
}
return new.SetHead(ctx, r, newHash)
}
func traverseTagHistory(ctx context.Context, r ref.TagRef, old, new *doltdb.DoltDB, prog Progress) error {
t, err := old.ResolveTag(ctx, r)
if err != nil {
return err
}
if err = traverseCommitHistory(ctx, t.Commit, new, prog); err != nil {
return err
}
oldHash, err := t.Commit.HashOf()
if err != nil {
return err
}
newHash, err := prog.Get(ctx, oldHash)
if err != nil {
return err
}
cm, err := new.ReadCommit(ctx, newHash)
if err != nil {
return err
}
return new.NewTagAtCommit(ctx, r, cm, t.Meta)
}
func traverseCommitHistory(ctx context.Context, cm *doltdb.Commit, new *doltdb.DoltDB, prog Progress) error {
ch, err := cm.HashOf()
if err != nil {
return err
}
ok, err := prog.Has(ctx, ch)
if err != nil || ok {
return err
}
for {
ph, err := cm.ParentHashes(ctx)
if err != nil {
return err
}
idx, err := firstAbsent(ctx, prog, ph)
if err != nil {
return err
}
if idx < 0 {
// parents for |cm| are done, migrate |cm|
if err = migrateCommit(ctx, cm, new, prog); err != nil {
return err
}
// pop the stack, traverse upwards
cm, err = prog.Pop(ctx)
if err != nil {
return err
}
if cm == nil {
return nil // done
}
continue
}
// push the stack, traverse downwards
if err = prog.Push(ctx, cm); err != nil {
return err
}
cm, err = cm.GetParent(ctx, idx)
if err != nil {
return err
}
}
}
func firstAbsent(ctx context.Context, p Progress, addrs []hash.Hash) (int, error) {
for i := range addrs {
ok, err := p.Has(ctx, addrs[i])
if err != nil {
return -1, err
}
if !ok {
return i, nil
}
}
return -1, nil
}

View File

@@ -0,0 +1,48 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
)
func validateMigration(ctx context.Context, old, new *doltdb.DoltDB) error {
if err := validateBranchMapping(ctx, old, new); err != nil {
return err
}
return nil
}
func validateBranchMapping(ctx context.Context, old, new *doltdb.DoltDB) error {
branches, err := old.GetBranches(ctx)
if err != nil {
return err
}
var ok bool
for _, bref := range branches {
ok, err = new.HasBranch(ctx, bref.GetPath())
if err != nil {
return err
}
if !ok {
return fmt.Errorf("failed to map branch %s", bref.GetPath())
}
}
return nil
}

View File

@@ -31,8 +31,7 @@ type NeedsRebaseFn func(ctx context.Context, cm *doltdb.Commit) (bool, error)
// EntireHistory returns a |NeedsRebaseFn| that rebases the entire commit history.
func EntireHistory() NeedsRebaseFn {
return func(_ context.Context, cm *doltdb.Commit) (bool, error) {
n, err := cm.NumParents()
return n != 0, err
return cm.NumParents() != 0, nil
}
}
@@ -54,11 +53,7 @@ func StopAtCommit(stopCommit *doltdb.Commit) NeedsRebaseFn {
return false, nil
}
n, err := cm.NumParents()
if err != nil {
return false, err
}
if n == 0 {
if cm.NumParents() == 0 {
return false, fmt.Errorf("commit %s is missing from the commit history of at least one rebase head", sh)
}

View File

@@ -16,6 +16,7 @@ package dsess
import (
"strings"
"sync"
"github.com/dolthub/go-mysql-server/sql"
@@ -28,6 +29,8 @@ type SessionCache struct {
indexes map[doltdb.DataCacheKey]map[string][]sql.Index
tables map[doltdb.DataCacheKey]map[string]sql.Table
views map[doltdb.DataCacheKey]map[string]string
mu sync.RWMutex
}
func newSessionCache() *SessionCache {
@@ -36,6 +39,9 @@ func newSessionCache() *SessionCache {
// CacheTableIndexes caches all indexes for the table with the name given
func (c *SessionCache) CacheTableIndexes(key doltdb.DataCacheKey, table string, indexes []sql.Index) {
c.mu.Lock()
defer c.mu.Unlock()
table = strings.ToLower(table)
if c.indexes == nil {
@@ -53,8 +59,10 @@ func (c *SessionCache) CacheTableIndexes(key doltdb.DataCacheKey, table string,
// GetTableIndexesCache returns the cached index information for the table named, and whether the cache was present
func (c *SessionCache) GetTableIndexesCache(key doltdb.DataCacheKey, table string) ([]sql.Index, bool) {
table = strings.ToLower(table)
c.mu.RLock()
defer c.mu.RUnlock()
table = strings.ToLower(table)
if c.indexes == nil {
return nil, false
}
@@ -70,8 +78,10 @@ func (c *SessionCache) GetTableIndexesCache(key doltdb.DataCacheKey, table strin
// CacheTable caches a sql.Table implementation for the table named
func (c *SessionCache) CacheTable(key doltdb.DataCacheKey, tableName string, table sql.Table) {
tableName = strings.ToLower(tableName)
c.mu.Lock()
defer c.mu.Unlock()
tableName = strings.ToLower(tableName)
if c.tables == nil {
c.tables = make(map[doltdb.DataCacheKey]map[string]sql.Table)
}
@@ -87,13 +97,20 @@ func (c *SessionCache) CacheTable(key doltdb.DataCacheKey, tableName string, tab
// ClearTableCache removes all cache info for all tables at all cache keys
func (c *SessionCache) ClearTableCache() {
c.tables = make(map[doltdb.DataCacheKey]map[string]sql.Table)
c.mu.Lock()
defer c.mu.Unlock()
for k := range c.tables {
delete(c.tables, k)
}
}
// GetCachedTable returns the cached sql.Table for the table named, and whether the cache was present
func (c *SessionCache) GetCachedTable(key doltdb.DataCacheKey, tableName string) (sql.Table, bool) {
tableName = strings.ToLower(tableName)
c.mu.RLock()
defer c.mu.RUnlock()
tableName = strings.ToLower(tableName)
if c.tables == nil {
return nil, false
}
@@ -109,6 +126,9 @@ func (c *SessionCache) GetCachedTable(key doltdb.DataCacheKey, tableName string)
// CacheViews caches all views in a database for the cache key given
func (c *SessionCache) CacheViews(key doltdb.DataCacheKey, viewNames []string, viewDefs []string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.views == nil {
c.views = make(map[doltdb.DataCacheKey]map[string]string)
}
@@ -127,6 +147,9 @@ func (c *SessionCache) CacheViews(key doltdb.DataCacheKey, viewNames []string, v
// ViewsCached returns whether this cache has been initialized with the set of views yet
func (c *SessionCache) ViewsCached(key doltdb.DataCacheKey) bool {
c.mu.RLock()
defer c.mu.RUnlock()
if c.views == nil {
return false
}
@@ -137,8 +160,10 @@ func (c *SessionCache) ViewsCached(key doltdb.DataCacheKey) bool {
// GetCachedView returns the cached view named, and whether the cache was present
func (c *SessionCache) GetCachedView(key doltdb.DataCacheKey, viewName string) (string, bool) {
viewName = strings.ToLower(viewName)
c.mu.RLock()
defer c.mu.RUnlock()
viewName = strings.ToLower(viewName)
if c.views == nil {
return "", false
}

View File

@@ -508,7 +508,7 @@ func (cd *prollyConflictDeleter) putPrimaryKeys(ctx *sql.Context, r sql.Row) err
}()
for i := 0; i < cd.kd.Count()-2; i++ {
err := index.PutField(ctx, cd.ed.Mut.NodeStore(), cd.kB, i, r[o+i])
err := index.PutField(ctx, cd.ed.NodeStore(), cd.kB, i, r[o+i])
if err != nil {
return err
@@ -531,7 +531,7 @@ func (cd *prollyConflictDeleter) putKeylessHash(ctx *sql.Context, r sql.Row) err
// init cardinality to 0
cd.vB.PutUint64(0, 0)
for i, v := range rowVals {
err := index.PutField(ctx, cd.ed.Mut.NodeStore(), cd.vB, i+1, v)
err := index.PutField(ctx, cd.ed.NodeStore(), cd.vB, i+1, v)
if err != nil {
return err
}

View File

@@ -18,6 +18,7 @@ import (
"context"
gosql "database/sql"
"math/rand"
"runtime"
"strings"
"testing"
"time"
@@ -209,7 +210,7 @@ func TestDoltMultiSessionBehavior(t *testing.T) {
func testMultiSessionScriptTests(t *testing.T, tests []queries.ScriptTest) {
for _, test := range tests {
sc, serverConfig := startServer(t)
sc, serverConfig := startServer(t, true, "", "")
sc.WaitForStart()
conn1, sess1 := newConnection(t, serverConfig)
@@ -313,11 +314,22 @@ func assertResultsEqual(t *testing.T, expected []sql.Row, rows *gosql.Rows) {
}
}
func startServer(t *testing.T) (*sqlserver.ServerController, sqlserver.ServerConfig) {
// startServer will start sql-server with given host, unix socket file path and whether to use specific port, which is defined randomly.
func startServer(t *testing.T, withPort bool, host string, unixSocketPath string) (*sqlserver.ServerController, sqlserver.ServerConfig) {
dEnv := dtestutils.CreateTestEnv()
rand.Seed(time.Now().UnixNano())
port := 15403 + rand.Intn(25)
serverConfig := sqlserver.DefaultServerConfig().WithPort(port)
serverConfig := sqlserver.DefaultServerConfig()
if withPort {
rand.Seed(time.Now().UnixNano())
port := 15403 + rand.Intn(25)
serverConfig = serverConfig.WithPort(port)
}
if host != "" {
serverConfig = serverConfig.WithHost(host)
}
if unixSocketPath != "" {
serverConfig = serverConfig.WithSocket(unixSocketPath)
}
sc := sqlserver.NewServerController()
go func() {
@@ -329,6 +341,7 @@ func startServer(t *testing.T) (*sqlserver.ServerController, sqlserver.ServerCon
return sc, serverConfig
}
// newConnection takes sqlserver.serverConfig and opens a connection, and will return that connection with a new session
func newConnection(t *testing.T, serverConfig sqlserver.ServerConfig) (*dbr.Connection, *dbr.Session) {
const dbName = "dolt"
conn, err := dbr.Open("mysql", sqlserver.ConnectionString(serverConfig, dbName), nil)
@@ -336,3 +349,83 @@ func newConnection(t *testing.T, serverConfig sqlserver.ServerConfig) (*dbr.Conn
sess := conn.NewSession(nil)
return conn, sess
}
func TestDoltServerRunningUnixSocket(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("unix sockets not supported on Windows")
}
const defaultUnixSocketPath = "/tmp/mysql.sock"
// Running unix socket server
sc, serverConfig := startServer(t, false, "", defaultUnixSocketPath)
sc.WaitForStart()
require.True(t, strings.Contains(sqlserver.ConnectionString(serverConfig, "dolt"), "unix"))
// default unix socket connection works
localConn, localSess := newConnection(t, serverConfig)
rows, err := localSess.Query("select 1")
require.NoError(t, err)
assertResultsEqual(t, []sql.Row{{1}}, rows)
t.Run("connecting to local server with tcp connections", func(t *testing.T) {
// connect with port defined
serverConfigWithPortOnly := sqlserver.DefaultServerConfig().WithPort(3306)
conn1, sess1 := newConnection(t, serverConfigWithPortOnly)
rows1, err := sess1.Query("select 1")
require.NoError(t, err)
assertResultsEqual(t, []sql.Row{{1}}, rows1)
// connect with host defined
serverConfigWithPortandHost := sqlserver.DefaultServerConfig().WithHost("127.0.0.1")
conn2, sess2 := newConnection(t, serverConfigWithPortandHost)
rows2, err := sess2.Query("select 1")
require.NoError(t, err)
assertResultsEqual(t, []sql.Row{{1}}, rows2)
// connect with port and host defined
serverConfigWithPortandHost1 := sqlserver.DefaultServerConfig().WithPort(3306).WithHost("0.0.0.0")
conn3, sess3 := newConnection(t, serverConfigWithPortandHost1)
rows3, err := sess3.Query("select 1")
require.NoError(t, err)
assertResultsEqual(t, []sql.Row{{1}}, rows3)
// close connections
require.NoError(t, conn3.Close())
require.NoError(t, conn2.Close())
require.NoError(t, conn1.Close())
})
require.NoError(t, localConn.Close())
// Stopping unix socket server
sc.StopServer()
err = sc.WaitForClose()
require.NoError(t, err)
require.NoFileExists(t, defaultUnixSocketPath)
// Running TCP socket server
tcpSc, tcpServerConfig := startServer(t, true, "0.0.0.0", "")
tcpSc.WaitForStart()
require.False(t, strings.Contains(sqlserver.ConnectionString(tcpServerConfig, "dolt"), "unix"))
t.Run("host and port specified, there should not be unix socket created", func(t *testing.T) {
// unix socket connection should fail
localServerConfig := sqlserver.DefaultServerConfig().WithSocket(defaultUnixSocketPath)
conn, sess := newConnection(t, localServerConfig)
_, err := sess.Query("select 1")
require.Error(t, err)
require.NoError(t, conn.Close())
// connection with the host and port define should work
conn1, sess1 := newConnection(t, tcpServerConfig)
rows1, err := sess1.Query("select 1")
require.NoError(t, err)
assertResultsEqual(t, []sql.Row{{1}}, rows1)
require.NoError(t, conn1.Close())
})
// Stopping TCP socket server
tcpSc.StopServer()
err = tcpSc.WaitForClose()
require.NoError(t, err)
}

View File

@@ -184,17 +184,20 @@ func TestJSONStructuralSharing(t *testing.T) {
val := MustNomsJSONWithVRW(vrw, sb.String())
json_refs := make(hash.HashSet)
err := types.WalkAddrs(types.JSON(val), vrw.Format(), func(h hash.Hash, _ bool) {
err := types.WalkAddrs(types.JSON(val), vrw.Format(), func(h hash.Hash, _ bool) error {
json_refs.Insert(h)
return nil
})
require.NoError(t, err)
tup, err := types.NewTuple(types.Format_Default, types.Int(12), types.JSON(val))
require.NoError(t, err)
tuple_refs := make(hash.HashSet)
types.WalkAddrs(tup, vrw.Format(), func(h hash.Hash, _ bool) {
err = types.WalkAddrs(tup, vrw.Format(), func(h hash.Hash, _ bool) error {
tuple_refs.Insert(h)
return nil
})
assert.NoError(t, err)
assert.Greater(t, len(json_refs), 0)
assert.Equal(t, len(json_refs), len(tuple_refs))

View File

@@ -71,6 +71,9 @@ type WritableFS interface {
// MoveFile will move a file from the srcPath in the filesystem to the destPath
MoveFile(srcPath, destPath string) error
// TempDir returns the path of a new temporary directory.
TempDir() string
}
// FSIterCB specifies the signature of the function that will be called for every item found while iterating.
@@ -111,3 +114,18 @@ func UnmarshalJSONFile(fs ReadableFS, path string, dest interface{}) error {
return json.Unmarshal(data, dest)
}
func CopyFile(srcPath, destPath string, srcFS, destFS Filesys) (err error) {
rd, err := srcFS.OpenForRead(srcPath)
if err != nil {
return err
}
wr, err := destFS.OpenForWrite(destPath, os.ModePerm)
if err != nil {
return err
}
_, err = io.Copy(wr, rd)
return
}

View File

@@ -103,6 +103,20 @@ func TestFilesystems(t *testing.T) {
dataRead, err = fs.ReadFile(movedFilePath)
require.NoError(t, err)
require.Equal(t, dataRead, data)
tmp := fs.TempDir()
require.NotEmpty(t, tmp)
fp2 := filepath.Join(tmp, "data.txt")
wrc, err := fs.OpenForWrite(fp2, os.ModePerm)
require.NoError(t, err)
require.NoError(t, wrc.Close())
// Test writing/reading random data to tmp file
err = fs.WriteFile(fp2, data)
require.NoError(t, err)
dataRead, err = fs.ReadFile(fp2)
require.NoError(t, err)
require.Equal(t, dataRead, data)
})
}
}

View File

@@ -16,8 +16,10 @@ package filesys
import (
"bytes"
"encoding/base32"
"errors"
"io"
"math/rand"
"os"
"path/filepath"
"strings"
@@ -535,6 +537,44 @@ func (fs *InMemFS) MoveFile(srcPath, destPath string) error {
return os.ErrNotExist
}
func (fs *InMemFS) CopyFile(srcPath, destPath string) error {
fs.rwLock.Lock()
defer fs.rwLock.Unlock()
srcPath = fs.getAbsPath(srcPath)
destPath = fs.getAbsPath(destPath)
if exists, destIsDir := fs.exists(destPath); exists && destIsDir {
return ErrIsDir
}
if obj, ok := fs.objs[srcPath]; ok {
if obj.isDir() {
return ErrIsDir
}
destDir := filepath.Dir(destPath)
destParentDir, err := fs.mkDirs(destDir)
if err != nil {
return err
}
destData := make([]byte, len(obj.(*memFile).data))
copy(destData, obj.(*memFile).data)
now := InMemNowFunc()
destObj := &memFile{destPath, destData, destParentDir, now}
fs.objs[destPath] = destObj
destParentDir.objs[destPath] = destObj
destParentDir.time = now
return nil
}
return os.ErrNotExist
}
// converts a path to an absolute path. If it's already an absolute path the input path will be returned unaltered
func (fs *InMemFS) Abs(path string) (string, error) {
path = fs.pathToNative(path)
@@ -559,6 +599,13 @@ func (fs *InMemFS) LastModified(path string) (t time.Time, exists bool) {
return time.Time{}, false
}
func (fs *InMemFS) TempDir() string {
buf := make([]byte, 16)
rand.Read(buf)
s := base32.HexEncoding.EncodeToString(buf)
return "/var/folders/gc/" + s + "/T/"
}
func (fs *InMemFS) pathToNative(path string) string {
if len(path) >= 1 {
if path[0] == '.' {

View File

@@ -318,3 +318,7 @@ func (fs *localFS) LastModified(path string) (t time.Time, exists bool) {
return stat.ModTime(), true
}
func (fs *localFS) TempDir() string {
return os.TempDir()
}

View File

@@ -34,6 +34,7 @@ const AddressMapFileID = "ADRM"
const CommitClosureFileID = "CMCL"
const TableSchemaFileID = "DSCH"
const ForeignKeyCollectionFileID = "DFKC"
const MergeArtifactsFileID = "ARTM"
const MessageTypesKind int = 27

View File

@@ -17,6 +17,7 @@ flatc -o $GEN_DIR --gen-onefile --filename-suffix "" --gen-mutable --go-namespac
commitclosure.fbs \
encoding.fbs \
foreign_key.fbs \
mergeartifacts.fbs \
prolly.fbs \
rootvalue.fbs \
schema.fbs \

View File

@@ -0,0 +1,47 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace serial;
table MergeArtifacts {
// sorted array of key items
// key items are encoded as TupleFormatAlpha
key_items:[ubyte] (required);
// items offets for |key_items|, zeroth offset omitted
key_offsets:[uint16] (required);
// offsets for each chunk address in |key_items|
key_address_offsets:[uint16];
// array of values items, ordered by paired key
// value items are encoded as TupleFormatAlpha
value_items:[ubyte];
// item offsets for |value_items|, zeroth offset omitted
value_offsets:[uint16];
// array of subtree addresses for internal tree nodes
address_array:[ubyte];
// array of uvarint encoded subtree counts
subtree_counts:[ubyte];
// total count of prolly tree
tree_count:uint64;
// prolly tree level, 0 for leaf nodes
tree_level:uint8;
}
// KEEP THIS IN SYNC WITH fileidentifiers.go
file_identifier "ARTM";
root_type MergeArtifacts;

View File

@@ -152,8 +152,9 @@ func (s *nomsShowTestSuite) TestNomsShowRaw() {
s.NoError(err)
numChildChunks := 0
err = types.WalkAddrs(l, vrw.Format(), func(_ hash.Hash, _ bool) {
err = types.WalkAddrs(l, vrw.Format(), func(_ hash.Hash, _ bool) error {
numChildChunks++
return nil
})
s.NoError(err)
s.True(numChildChunks > 0)

View File

@@ -51,6 +51,7 @@ import (
"fmt"
"regexp"
"strconv"
"strings"
"github.com/dolthub/dolt/go/store/d"
)
@@ -191,3 +192,17 @@ func (hs HashSet) Empty() {
delete(hs, h)
}
}
func (hs HashSet) String() string {
var sb strings.Builder
sb.Grow(len(hs)*34 + 100)
sb.WriteString("HashSet {\n")
for h := range hs {
sb.WriteString("\t")
sb.WriteString(h.String())
sb.WriteString("\n")
}
sb.WriteString("}\n")
return sb.String()
}

View File

@@ -130,11 +130,12 @@ func main() {
orderedChildren := hash.HashSlice{}
nextLevel := hash.HashSlice{}
for _, h := range current {
_ = types.WalkAddrs(currentValues[h], types.Format_Default, func(h hash.Hash, isleaf bool) {
_ = types.WalkAddrs(currentValues[h], types.Format_Default, func(h hash.Hash, isleaf bool) error {
orderedChildren = append(orderedChildren, h)
if !visited[h] && !isleaf {
nextLevel = append(nextLevel, h)
}
return nil
})
}

View File

@@ -54,7 +54,7 @@ type ArtifactMap struct {
// NewArtifactMap creates an artifact map based on |srcKeyDesc| which is the key descriptor for
// the corresponding row map.
func NewArtifactMap(node tree.Node, ns tree.NodeStore, srcKeyDesc val.TupleDesc) ArtifactMap {
keyDesc, valDesc := calcArtifactsDescriptors(srcKeyDesc)
keyDesc, valDesc := mergeArtifactsDescriptorsFromSource(srcKeyDesc)
tuples := orderedTree[val.Tuple, val.Tuple, val.TupleDesc]{
root: node,
ns: ns,
@@ -71,7 +71,9 @@ func NewArtifactMap(node tree.Node, ns tree.NodeStore, srcKeyDesc val.TupleDesc)
// NewArtifactMapFromTuples creates an artifact map based on |srcKeyDesc| which is the key descriptor for
// the corresponding row map and inserts the given |tups|. |tups| must be a key followed by a value.
func NewArtifactMapFromTuples(ctx context.Context, ns tree.NodeStore, srcKeyDesc val.TupleDesc, tups ...val.Tuple) (ArtifactMap, error) {
serializer := message.ProllyMapSerializer{Pool: ns.Pool()}
kd, vd := mergeArtifactsDescriptorsFromSource(srcKeyDesc)
serializer := message.MergeArtifactSerializer{KeyDesc: kd, Pool: ns.Pool()}
ch, err := tree.NewEmptyChunker(ctx, ns, serializer)
if err != nil {
return ArtifactMap{}, err
@@ -92,7 +94,17 @@ func NewArtifactMapFromTuples(ctx context.Context, ns tree.NodeStore, srcKeyDesc
return ArtifactMap{}, err
}
return NewArtifactMap(root, ns, srcKeyDesc), nil
tuples := orderedTree[val.Tuple, val.Tuple, val.TupleDesc]{
root: root,
ns: ns,
order: kd,
}
return ArtifactMap{
tuples: tuples,
srcKeyDesc: srcKeyDesc,
keyDesc: kd,
valDesc: vd,
}, nil
}
func (m ArtifactMap) Count() int {
@@ -147,7 +159,7 @@ func (m ArtifactMap) Editor() ArtifactsEditor {
artKD, artVD := m.Descriptors()
return ArtifactsEditor{
srcKeyDesc: m.srcKeyDesc,
Mut: MutableMap{
mut: MutableMap{
tuples: m.tuples.mutate(),
keyDesc: m.keyDesc,
valDesc: m.valDesc,
@@ -298,7 +310,7 @@ func MergeArtifactMaps(ctx context.Context, left, right, base ArtifactMap, cb tr
}
type ArtifactsEditor struct {
Mut MutableMap
mut MutableMap
srcKeyDesc val.TupleDesc
artKB, artVB *val.TupleBuilder
pool pool.BuffPool
@@ -315,7 +327,7 @@ func (wr ArtifactsEditor) Add(ctx context.Context, srcKey val.Tuple, theirRootIs
wr.artVB.PutJSON(0, meta)
value := wr.artVB.Build(wr.pool)
return wr.Mut.Put(ctx, key, value)
return wr.mut.Put(ctx, key, value)
}
type ErrMergeArtifactCollision struct {
@@ -333,14 +345,14 @@ func (e *ErrMergeArtifactCollision) Error() string {
// existing violation exists but has a different |meta.VInfo| value then
// ErrMergeArtifactCollision is a returned.
func (wr ArtifactsEditor) ReplaceConstraintViolation(ctx context.Context, srcKey val.Tuple, theirRootIsh hash.Hash, artType ArtifactType, meta ConstraintViolationMeta) error {
itr, err := wr.Mut.IterRange(ctx, PrefixRange(srcKey, wr.srcKeyDesc))
itr, err := wr.mut.IterRange(ctx, PrefixRange(srcKey, wr.srcKeyDesc))
if err != nil {
return err
}
aItr := artifactIterImpl{
itr: itr,
artKD: wr.Mut.keyDesc,
artVD: wr.Mut.valDesc,
artKD: wr.mut.keyDesc,
artVD: wr.mut.valDesc,
pool: wr.pool,
tb: val.NewTupleBuilder(wr.srcKeyDesc),
numPks: wr.srcKeyDesc.Count(),
@@ -395,11 +407,16 @@ func (wr ArtifactsEditor) ReplaceConstraintViolation(ctx context.Context, srcKey
}
func (wr ArtifactsEditor) Delete(ctx context.Context, key val.Tuple) error {
return wr.Mut.Delete(ctx, key)
return wr.mut.Delete(ctx, key)
}
func (wr ArtifactsEditor) Flush(ctx context.Context) (ArtifactMap, error) {
m, err := wr.Mut.Map(ctx)
s := message.MergeArtifactSerializer{
KeyDesc: wr.artKB.Desc,
Pool: wr.NodeStore().Pool(),
}
m, err := wr.mut.flushWithSerializer(ctx, s)
if err != nil {
return ArtifactMap{}, err
}
@@ -407,11 +424,15 @@ func (wr ArtifactsEditor) Flush(ctx context.Context) (ArtifactMap, error) {
return ArtifactMap{
tuples: m.tuples,
srcKeyDesc: wr.srcKeyDesc,
keyDesc: wr.Mut.keyDesc,
valDesc: wr.Mut.valDesc,
keyDesc: wr.mut.keyDesc,
valDesc: wr.mut.valDesc,
}, nil
}
func (wr ArtifactsEditor) NodeStore() tree.NodeStore {
return wr.mut.NodeStore()
}
// ConflictArtifactIter iters all the conflicts in ArtifactMap.
type ConflictArtifactIter struct {
itr artifactTypeIter
@@ -558,7 +579,7 @@ type Artifact struct {
Metadata []byte
}
func calcArtifactsDescriptors(srcKd val.TupleDesc) (kd, vd val.TupleDesc) {
func mergeArtifactsDescriptorsFromSource(srcKd val.TupleDesc) (kd, vd val.TupleDesc) {
// artifact key consists of keys of source schema, followed by target branch
// commit hash, and artifact type.

View File

@@ -0,0 +1,207 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import (
"context"
"encoding/binary"
"fmt"
fb "github.com/google/flatbuffers/go"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/val"
)
const (
// These constants are mirrored from serial.MergeArtifacts.KeyOffsets()
// and serial.MergeArtifacts.ValueOffsets() respectively.
// They are only as stable as the flatbuffers schema that define them.
mergeArtifactKeyOffsetsVOffset = 6
mergeArtifactValueOffsetsVOffset = 12
)
var mergeArtifactFileID = []byte(serial.MergeArtifactsFileID)
type MergeArtifactSerializer struct {
KeyDesc val.TupleDesc
Pool pool.BuffPool
}
var _ Serializer = MergeArtifactSerializer{}
func (s MergeArtifactSerializer) Serialize(keys, values [][]byte, subtrees []uint64, level int) serial.Message {
var (
keyTups, keyOffs fb.UOffsetT
valTups, valOffs fb.UOffsetT
keyAddrOffs fb.UOffsetT
refArr, cardArr fb.UOffsetT
)
keySz, valSz, bufSz := estimateMergeArtifactSize(keys, values, subtrees, s.KeyDesc.AddressFieldCount())
b := getFlatbufferBuilder(s.Pool, bufSz)
// serialize keys and offsets
keyTups = writeItemBytes(b, keys, keySz)
serial.MergeArtifactsStartKeyOffsetsVector(b, len(keys)-1)
keyOffs = writeItemOffsets(b, keys, keySz)
if level == 0 {
// serialize value tuples for leaf nodes
valTups = writeItemBytes(b, values, valSz)
serial.MergeArtifactsStartValueOffsetsVector(b, len(values)-1)
valOffs = writeItemOffsets(b, values, valSz)
// serialize offsets of chunk addresses within |keyTups|
if s.KeyDesc.AddressFieldCount() > 0 {
serial.MergeArtifactsStartKeyAddressOffsetsVector(b, countAddresses(keys, s.KeyDesc))
keyAddrOffs = writeAddressOffsets(b, keys, keySz, s.KeyDesc)
}
} else {
// serialize child refs and subtree counts for internal nodes
refArr = writeItemBytes(b, values, valSz)
cardArr = writeCountArray(b, subtrees)
}
// populate the node's vtable
serial.MergeArtifactsStart(b)
serial.MergeArtifactsAddKeyItems(b, keyTups)
serial.MergeArtifactsAddKeyOffsets(b, keyOffs)
if level == 0 {
serial.MergeArtifactsAddValueItems(b, valTups)
serial.MergeArtifactsAddValueOffsets(b, valOffs)
serial.MergeArtifactsAddTreeCount(b, uint64(len(keys)))
serial.MergeArtifactsAddKeyAddressOffsets(b, keyAddrOffs)
} else {
serial.MergeArtifactsAddAddressArray(b, refArr)
serial.MergeArtifactsAddSubtreeCounts(b, cardArr)
serial.MergeArtifactsAddTreeCount(b, sumSubtrees(subtrees))
}
serial.MergeArtifactsAddTreeLevel(b, uint8(level))
return serial.FinishMessage(b, serial.MergeArtifactsEnd(b), mergeArtifactFileID)
}
func getMergeArtifactKeys(msg serial.Message) (keys val.SlicedBuffer) {
ma := serial.GetRootAsMergeArtifacts(msg, serial.MessagePrefixSz)
keys.Buf = ma.KeyItemsBytes()
keys.Offs = getMergeArtifactKeyOffsets(ma)
return
}
func getMergeArtifactValues(msg serial.Message) (values val.SlicedBuffer) {
ma := serial.GetRootAsMergeArtifacts(msg, serial.MessagePrefixSz)
values.Buf = ma.ValueItemsBytes()
values.Offs = getMergeArtifactValueOffsets(ma)
return
}
func walkMergeArtifactAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error {
ma := serial.GetRootAsMergeArtifacts(msg, serial.MessagePrefixSz)
arr := ma.AddressArrayBytes()
for i := 0; i < len(arr)/hash.ByteLen; i++ {
addr := hash.New(arr[i*addrSize : (i+1)*addrSize])
if err := cb(ctx, addr); err != nil {
return err
}
}
cnt := ma.KeyAddressOffsetsLength()
arr2 := ma.KeyItemsBytes()
for i := 0; i < cnt; i++ {
o := ma.KeyAddressOffsets(i)
addr := hash.New(arr2[o : o+addrSize])
if err := cb(ctx, addr); err != nil {
return err
}
}
assertFalse((arr != nil) && (arr2 != nil))
return nil
}
func getMergeArtifactCount(msg serial.Message) uint16 {
ma := serial.GetRootAsMergeArtifacts(msg, serial.MessagePrefixSz)
if ma.KeyItemsLength() == 0 {
return 0
}
// zeroth offset ommitted from array
return uint16(ma.KeyOffsetsLength() + 1)
}
func getMergeArtifactTreeLevel(msg serial.Message) int {
ma := serial.GetRootAsMergeArtifacts(msg, serial.MessagePrefixSz)
return int(ma.TreeLevel())
}
func getMergeArtifactTreeCount(msg serial.Message) int {
ma := serial.GetRootAsMergeArtifacts(msg, serial.MessagePrefixSz)
return int(ma.TreeCount())
}
func getMergeArtifactSubtrees(msg serial.Message) []uint64 {
counts := make([]uint64, getMergeArtifactCount(msg))
ma := serial.GetRootAsMergeArtifacts(msg, serial.MessagePrefixSz)
return decodeVarints(ma.SubtreeCountsBytes(), counts)
}
func getMergeArtifactKeyOffsets(ma *serial.MergeArtifacts) []byte {
sz := ma.KeyOffsetsLength() * 2
tab := ma.Table()
vec := tab.Offset(mergeArtifactKeyOffsetsVOffset)
start := int(tab.Vector(fb.UOffsetT(vec)))
stop := start + sz
return tab.Bytes[start:stop]
}
func getMergeArtifactValueOffsets(ma *serial.MergeArtifacts) []byte {
sz := ma.ValueOffsetsLength() * 2
tab := ma.Table()
vec := tab.Offset(mergeArtifactValueOffsetsVOffset)
start := int(tab.Vector(fb.UOffsetT(vec)))
stop := start + sz
return tab.Bytes[start:stop]
}
// estimateMergeArtifact>Size returns the exact Size of the tuple vectors for keys and values,
// and an estimate of the overall Size of the final flatbuffer.
func estimateMergeArtifactSize(keys, values [][]byte, subtrees []uint64, keyAddrs int) (int, int, int) {
var keySz, valSz, bufSz int
for i := range keys {
keySz += len(keys[i])
valSz += len(values[i])
}
refCntSz := len(subtrees) * binary.MaxVarintLen64
// constraints enforced upstream
if keySz > int(MaxVectorOffset) {
panic(fmt.Sprintf("key vector exceeds Size limit ( %d > %d )", keySz, MaxVectorOffset))
}
if valSz > int(MaxVectorOffset) {
panic(fmt.Sprintf("value vector exceeds Size limit ( %d > %d )", valSz, MaxVectorOffset))
}
// todo(andy): better estimates
bufSz += keySz + valSz // tuples
bufSz += refCntSz // subtree counts
bufSz += len(keys)*2 + len(values)*2 // offsets
bufSz += 8 + 1 + 1 + 1 // metadata
bufSz += 72 // vtable (approx)
bufSz += 100 // padding?
bufSz += keyAddrs * len(keys) * 2
bufSz += serial.MessagePrefixSz
return keySz, valSz, bufSz
}

View File

@@ -39,6 +39,12 @@ func GetKeysAndValues(msg serial.Message) (keys, values val.SlicedBuffer, cnt ui
cnt = getAddressMapCount(msg)
return
}
if id == serial.MergeArtifactsFileID {
keys = getMergeArtifactKeys(msg)
values = getMergeArtifactValues(msg)
cnt = getMergeArtifactCount(msg)
return
}
if id == serial.CommitClosureFileID {
keys = getCommitClosureKeys(msg)
values = getCommitClosureValues(msg)
@@ -56,6 +62,8 @@ func WalkAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.
return walkProllyMapAddresses(ctx, msg, cb)
case serial.AddressMapFileID:
return walkAddressMapAddresses(ctx, msg, cb)
case serial.MergeArtifactsFileID:
return walkMergeArtifactAddresses(ctx, msg, cb)
case serial.CommitClosureFileID:
return walkCommitClosureAddresses(ctx, msg, cb)
default:
@@ -70,6 +78,8 @@ func GetTreeLevel(msg serial.Message) int {
return getProllyMapTreeLevel(msg)
case serial.AddressMapFileID:
return getAddressMapTreeLevel(msg)
case serial.MergeArtifactsFileID:
return getMergeArtifactTreeLevel(msg)
case serial.CommitClosureFileID:
return getCommitClosureTreeLevel(msg)
default:
@@ -84,6 +94,8 @@ func GetTreeCount(msg serial.Message) int {
return getProllyMapTreeCount(msg)
case serial.AddressMapFileID:
return getAddressMapTreeCount(msg)
case serial.MergeArtifactsFileID:
return getMergeArtifactTreeCount(msg)
case serial.CommitClosureFileID:
return getCommitClosureTreeCount(msg)
default:
@@ -98,6 +110,8 @@ func GetSubtrees(msg serial.Message) []uint64 {
return getProllyMapSubtrees(msg)
case serial.AddressMapFileID:
return getAddressMapSubtrees(msg)
case serial.MergeArtifactsFileID:
return getMergeArtifactSubtrees(msg)
case serial.CommitClosureFileID:
return getCommitClosureSubtrees(msg)
default:

View File

@@ -52,7 +52,7 @@ func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64,
refArr, cardArr fb.UOffsetT
)
keySz, valSz, bufSz := estimateProllyMapSize(keys, values, subtrees, len(s.ValDesc.Addrs))
keySz, valSz, bufSz := estimateProllyMapSize(keys, values, subtrees, s.ValDesc.AddressFieldCount())
b := getFlatbufferBuilder(s.Pool, bufSz)
// serialize keys and offsets
@@ -65,9 +65,10 @@ func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64,
valTups = writeItemBytes(b, values, valSz)
serial.ProllyTreeNodeStartValueOffsetsVector(b, len(values)-1)
valOffs = writeItemOffsets(b, values, valSz)
if len(s.ValDesc.Addrs) > 0 {
serial.ProllyTreeNodeStartValueAddressOffsetsVector(b, len(values)*len(s.ValDesc.Addrs))
valAddrOffs = writeValAddrOffsets(b, values, valSz, s.ValDesc)
// serialize offsets of chunk addresses within |valTups|
if s.ValDesc.AddressFieldCount() > 0 {
serial.ProllyTreeNodeStartValueAddressOffsetsVector(b, countAddresses(values, s.ValDesc))
valAddrOffs = writeAddressOffsets(b, values, valSz, s.ValDesc)
}
} else {
// serialize child refs and subtree counts for internal nodes
@@ -212,7 +213,7 @@ func estimateProllyMapSize(keys, values [][]byte, subtrees []uint64, valAddrsCnt
bufSz += 8 + 1 + 1 + 1 // metadata
bufSz += 72 // vtable (approx)
bufSz += 100 // padding?
bufSz += valAddrsCnt * len(values)
bufSz += valAddrsCnt * len(values) * 2
bufSz += serial.MessagePrefixSz
return keySz, valSz, bufSz

View File

@@ -60,25 +60,39 @@ func writeItemOffsets(b *fb.Builder, items [][]byte, sumSz int) fb.UOffsetT {
return b.EndVector(cnt)
}
// writeAddrOffests returns offsets into the values array that correspond to addr root
// hashes that themselves have subtrees
func writeValAddrOffsets(b *fb.Builder, items [][]byte, sumSz int, valDesc val.TupleDesc) fb.UOffsetT {
// countAddresses returns the number of chunk addresses stored within |items|.
func countAddresses(items [][]byte, td val.TupleDesc) (cnt int) {
for i := len(items) - 1; i >= 0; i-- {
val.IterAddressFields(td, func(j int, t val.Type) {
// get offset of address withing |tup|
addr := val.Tuple(items[i]).GetField(j)
if len(addr) > 0 && !hash.New(addr).IsEmpty() {
cnt++
}
return
})
}
return
}
// writeAddressOffsets serializes an array of uint16 offsets representing address offsets within an array of items.
func writeAddressOffsets(b *fb.Builder, items [][]byte, sumSz int, td val.TupleDesc) fb.UOffsetT {
var cnt int
var off = sumSz
for i := len(items) - 1; i >= 0; i-- {
tup := val.Tuple(items[i])
off -= len(tup) // start of tuple
for _, j := range valDesc.Addrs {
// get index into value tuple pointing at address
o, _ := tup.GetOffset(j)
a := tup.GetField(j)
if len(a) == 0 || hash.New(a).IsEmpty() {
continue
val.IterAddressFields(td, func(j int, t val.Type) {
addr := val.Tuple(items[i]).GetField(j)
if len(addr) == 0 || hash.New(addr).IsEmpty() {
return
}
// get offset of address withing |tup|
o, _ := tup.GetOffset(j)
o += off // offset is tuple start plus field start
b.PrependUint16(uint16(o))
cnt++
}
})
}
return b.EndVector(cnt)
}

View File

@@ -50,13 +50,20 @@ func newMutableMap(m Map) MutableMap {
// Map materializes all pending and applied mutations in the MutableMap.
func (mut MutableMap) Map(ctx context.Context) (Map, error) {
serializer := message.ProllyMapSerializer{
Pool: mut.NodeStore().Pool(),
ValDesc: mut.valDesc,
}
return mut.flushWithSerializer(ctx, serializer)
}
func (mut MutableMap) flushWithSerializer(ctx context.Context, s message.Serializer) (Map, error) {
if err := mut.ApplyPending(ctx); err != nil {
return Map{}, err
}
tr := mut.tuples.tree
serializer := message.ProllyMapSerializer{Pool: tr.ns.Pool(), ValDesc: mut.valDesc}
root, err := tree.ApplyMutations(ctx, tr.ns, tr.root, serializer, mut.tuples.mutations(), tr.compareItems)
tr := mut.tuples.tree
root, err := tree.ApplyMutations(ctx, tr.ns, tr.root, s, mut.tuples.mutations(), tr.compareItems)
if err != nil {
return Map{}, err
}

View File

@@ -627,7 +627,7 @@ func (m Map) HumanReadableString() string {
}
// VisitMapLevelOrder writes hashes of internal node chunks to a writer
// delimited with a newline character and returns the number or chunks written and the total number of
// delimited with a newline character and returns the number of chunks written and the total number of
// bytes written or an error if encountered
func VisitMapLevelOrder(m Map, cb func(h hash.Hash) (int64, error)) (int64, int64, error) {
chunkCount := int64(0)

View File

@@ -242,9 +242,8 @@ func WalkAddrsForNBF(nbf *NomsBinFormat) func(chunks.Chunk, func(h hash.Hash, is
}
}
func WalkAddrs(v Value, nbf *NomsBinFormat, cb func(h hash.Hash, isleaf bool)) error {
func WalkAddrs(v Value, nbf *NomsBinFormat, cb func(h hash.Hash, isleaf bool) error) error {
return v.walkRefs(nbf, func(r Ref) error {
cb(r.TargetHash(), r.Height() == 1)
return nil
return cb(r.TargetHash(), r.Height() == 1)
})
}

View File

@@ -361,6 +361,8 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
fallthrough
case serial.AddressMapFileID:
fallthrough
case serial.MergeArtifactsFileID:
fallthrough
case serial.CommitClosureFileID:
return message.WalkAddresses(context.TODO(), serial.Message(sm), func(ctx context.Context, addr hash.Hash) error {
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)

View File

@@ -95,7 +95,8 @@ func PanicIfDangling(ctx context.Context, unresolved hash.HashSet, cs chunks.Chu
d.PanicIfError(err)
if len(absent) != 0 {
d.Panic("Found dangling references to %v", absent)
s := absent.String()
d.Panic("Found dangling references to %s", s)
}
}

View File

@@ -41,25 +41,28 @@ const (
type ByteSize uint16
const (
int8Size ByteSize = 1
uint8Size ByteSize = 1
int16Size ByteSize = 2
uint16Size ByteSize = 2
int32Size ByteSize = 4
uint32Size ByteSize = 4
int64Size ByteSize = 8
uint64Size ByteSize = 8
float32Size ByteSize = 4
float64Size ByteSize = 8
bit64Size ByteSize = 8
hash128Size ByteSize = 16
yearSize ByteSize = 1
dateSize ByteSize = 4
timeSize ByteSize = 8
datetimeSize ByteSize = 8
enumSize ByteSize = 2
setSize ByteSize = 8
addrSize ByteSize = hash.ByteLen
int8Size ByteSize = 1
uint8Size ByteSize = 1
int16Size ByteSize = 2
uint16Size ByteSize = 2
int32Size ByteSize = 4
uint32Size ByteSize = 4
int64Size ByteSize = 8
uint64Size ByteSize = 8
float32Size ByteSize = 4
float64Size ByteSize = 8
bit64Size ByteSize = 8
hash128Size ByteSize = 16
yearSize ByteSize = 1
dateSize ByteSize = 4
timeSize ByteSize = 8
datetimeSize ByteSize = 8
enumSize ByteSize = 2
setSize ByteSize = 8
bytesAddrEnc ByteSize = hash.ByteLen
commitAddrEnc ByteSize = hash.ByteLen
stringAddrEnc ByteSize = hash.ByteLen
jsonAddrEnc ByteSize = hash.ByteLen
)
type Encoding byte
@@ -133,10 +136,10 @@ func sizeFromType(t Type) (ByteSize, bool) {
return float32Size, true
case Float64Enc:
return float64Size, true
case Bit64Enc:
return bit64Size, true
case Hash128Enc:
return hash128Size, true
case BytesAddrEnc:
return addrSize, true
case YearEnc:
return yearSize, true
case DateEnc:
@@ -149,8 +152,14 @@ func sizeFromType(t Type) (ByteSize, bool) {
return enumSize, true
case SetEnc:
return setSize, true
case Bit64Enc:
return bit64Size, true
case BytesAddrEnc:
return bytesAddrEnc, true
case CommitAddrEnc:
return commitAddrEnc, true
case StringAddrEnc:
return stringAddrEnc, true
case JSONAddrEnc:
return jsonAddrEnc, true
default:
return 0, false
}
@@ -576,12 +585,12 @@ func writeRaw(buf, val []byte) {
}
func writeAddr(buf []byte, v []byte) {
expectSize(buf, addrSize)
expectSize(buf, hash.ByteLen)
copy(buf, v)
}
func readAddr(val []byte) hash.Hash {
expectSize(val, addrSize)
expectSize(val, hash.ByteLen)
return hash.New(val)
}

View File

@@ -147,7 +147,7 @@ func (tup Tuple) GetOffset(i int) (int, bool) {
start = readUint16(offs[pos : pos+2])
}
return int(start), start == stop
return int(start), start != stop
}
// GetField returns the value for field |i|.

View File

@@ -340,9 +340,9 @@ func (tb *TupleBuilder) PutJSONAddr(i int, v hash.Hash) {
}
func (tb *TupleBuilder) putAddr(i int, v hash.Hash) {
tb.fields[i] = tb.buf[tb.pos : tb.pos+addrSize]
tb.fields[i] = tb.buf[tb.pos : tb.pos+hash.ByteLen]
writeAddr(tb.fields[i], v[:])
tb.pos += addrSize
tb.pos += hash.ByteLen
}
func (tb *TupleBuilder) ensureCapacity(sz ByteSize) {

View File

@@ -32,7 +32,6 @@ type TupleDesc struct {
Types []Type
cmp TupleComparator
fast fixedAccess
Addrs []int
}
// NewTupleDescriptor makes a TupleDescriptor from |types|.
@@ -45,31 +44,30 @@ func NewTupleDescriptorWithComparator(cmp TupleComparator, types ...Type) (td Tu
if len(types) > MaxTupleFields {
panic("tuple field maxIdx exceeds maximum")
}
for _, typ := range types {
if typ.Enc == NullEnc {
panic("invalid encoding")
}
}
var addrIdxs []int
for i, t := range types {
switch t.Enc {
case BytesAddrEnc, StringAddrEnc, JSONAddrEnc:
addrIdxs = append(addrIdxs, i)
}
}
td = TupleDesc{
Types: types,
cmp: cmp,
fast: makeFixedAccess(types),
Addrs: addrIdxs,
}
return
}
func IterAddressFields(td TupleDesc, cb func(int, Type)) {
for i, typ := range td.Types {
switch typ.Enc {
case BytesAddrEnc, StringAddrEnc,
JSONAddrEnc, CommitAddrEnc:
cb(i, typ)
}
}
}
type fixedAccess [][2]ByteSize
func makeFixedAccess(types []Type) (acc fixedAccess) {
@@ -90,6 +88,13 @@ func makeFixedAccess(types []Type) (acc fixedAccess) {
return
}
func (td TupleDesc) AddressFieldCount() (n int) {
IterAddressFields(td, func(int, Type) {
n++
})
return
}
// PrefixDesc returns a descriptor for the first n types.
func (td TupleDesc) PrefixDesc(n int) TupleDesc {
return NewTupleDescriptor(td.Types[:n]...)

View File

@@ -0,0 +1,42 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package val
import (
"testing"
"unsafe"
"github.com/stretchr/testify/assert"
)
func TestTupleDescriptorSize(t *testing.T) {
sz := unsafe.Sizeof(TupleDesc{})
assert.Equal(t, 64, int(sz))
}
func TestTupleDescriptorAddressTypes(t *testing.T) {
types := []Type{
{Enc: BytesAddrEnc},
{Enc: CommitAddrEnc},
{Enc: StringAddrEnc},
{Enc: JSONAddrEnc},
}
td := NewTupleDescriptor(types...)
assert.Equal(t, 4, td.AddressFieldCount())
IterAddressFields(td, func(i int, typ Type) {
assert.Equal(t, types[i], typ)
})
}

View File

@@ -3,7 +3,6 @@ load $BATS_TEST_DIRNAME/helper/common.bash
remotesrv_pid=
setup() {
skip_nbf_dolt_1
skiponwindows "tests are flaky on Windows"
setup_common
cd $BATS_TMPDIR
@@ -152,6 +151,7 @@ setup_merge() {
}
@test "garbage_collection: leave merge commit" {
skip_nbf_dolt_1
setup_merge
dolt merge other

View File

@@ -69,6 +69,12 @@ skip_nbf_dolt_1() {
fi
}
skip_nbf_dolt_dev() {
if [ "$DOLT_DEFAULT_BIN_FORMAT" = "__DOLT_DEV__" ]; then
skip "skipping test for nomsBinFormat __DOLT_DEV__"
fi
}
setup_common() {
setup_no_dolt_init
dolt init

View File

@@ -0,0 +1,133 @@
#!/usr/bin/env bats
load $BATS_TEST_DIRNAME/helper/common.bash
setup() {
skip_nbf_dolt_1
skip_nbf_dolt_dev
TARGET_NBF="__DOLT_DEV__"
setup_common
}
teardown() {
teardown_common
}
function checksum_table {
QUERY="SELECT GROUP_CONCAT(column_name) FROM information_schema.columns WHERE table_name = '$1'"
COLUMNS=$( dolt sql -q "$QUERY" -r csv | tail -n1 | sed 's/"//g' )
dolt sql -q "SELECT CAST(SUM(CRC32(CONCAT($COLUMNS))) AS UNSIGNED) FROM $1 AS OF '$2';" -r csv | tail -n1
}
@test "migrate: smoke test" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
SQL
CHECKSUM=$(checksum_table test head)
run cat ./.dolt/noms/manifest
[[ "$output" =~ "__LD_1__" ]] || false
[[ ! "$output" =~ "$TARGET_NBF" ]] || false
dolt migrate
run cat ./.dolt/noms/manifest
[[ "$output" =~ "$TARGET_NBF" ]] || false
[[ ! "$output" =~ "__LD_1__" ]] || false
run checksum_table test head
[[ "$output" =~ "$CHECKSUM" ]] || false
run dolt sql -q "SELECT count(*) FROM dolt_commits" -r csv
[ $status -eq 0 ]
[[ "$output" =~ "2" ]] || false
}
@test "migrate: manifest backup" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
SQL
dolt migrate
run cat ./.dolt/noms/manifest.bak
[[ "$output" =~ "__LD_1__" ]] || false
[[ ! "$output" =~ "$TARGET_NBF" ]] || false
}
@test "migrate: multiple branches" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
SQL
dolt branch one
dolt branch two
dolt sql <<SQL
CALL dcheckout('one');
INSERT INTO test VALUES (1,1,1);
CALL dcommit('-am', 'row (1,1,1)');
CALL dcheckout('two');
INSERT INTO test VALUES (2,2,2);
CALL dcommit('-am', 'row (2,2,2)');
CALL dmerge('one');
SQL
MAIN=$(checksum_table test main)
ONE=$(checksum_table test one)
TWO=$(checksum_table test two)
dolt migrate
run cat ./.dolt/noms/manifest
[[ "$output" =~ "$TARGET_NBF" ]] || false
run checksum_table test main
[[ "$output" =~ "$MAIN" ]] || false
run checksum_table test one
[[ "$output" =~ "$ONE" ]] || false
run checksum_table test two
[[ "$output" =~ "$TWO" ]] || false
run dolt sql -q "SELECT count(*) FROM dolt_commits" -r csv
[ $status -eq 0 ]
[[ "$output" =~ "4" ]] || false
}
@test "migrate: tag and working set" {
dolt sql <<SQL
CREATE TABLE test (pk int primary key, c0 int, c1 int);
INSERT INTO test VALUES (0,0,0);
CALL dadd('-A');
CALL dcommit('-am', 'added table test');
CALL dtag('tag1', 'head');
INSERT INTO test VALUES (1,1,1);
CALL dcommit('-am', 'added rows');
INSERT INTO test VALUES (2,2,2);
SQL
HEAD=$(checksum_table test head)
PREV=$(checksum_table test head~1)
TAG=$(checksum_table test tag1)
[ $TAG -eq $PREV ]
dolt migrate
run cat ./.dolt/noms/manifest
[[ "$output" =~ "$TARGET_NBF" ]] || false
run checksum_table test head
[[ "$output" =~ "$HEAD" ]] || false
run checksum_table test head~1
[[ "$output" =~ "$PREV" ]] || false
run checksum_table test tag1
[[ "$output" =~ "$TAG" ]] || false
}

View File

@@ -546,12 +546,12 @@ SQL
"
server_query repo1 1 "SELECT * FROM test" "pk\n0\n1\n2"
multi_query repo1 1 "
SELECT DOLT_CHECKOUT('feature-branch');
SELECT DOLT_COMMIT('-a', '-m', 'Insert 3');
"
multi_query repo1 1 "
INSERT INTO test VALUES (500000);
INSERT INTO test VALUES (500001);
@@ -562,7 +562,7 @@ SQL
SELECT DOLT_MERGE('feature-branch');
SELECT DOLT_COMMIT('-a', '-m', 'Finish up Merge');
"
server_query repo1 1 "SELECT * FROM test order by pk" "pk\n0\n1\n2\n3\n21\n60"
run dolt status
@@ -814,7 +814,7 @@ SQL
multi_query repo1 1 '
USE `repo1/feature-branch`;
CREATE TABLE test (
CREATE TABLE test (
pk int,
c1 int,
PRIMARY KEY (pk)
@@ -1155,7 +1155,7 @@ END""")
[ -d test3 ]
[ ! -d test2 ]
# make sure the databases exist on restart
stop_sql_server
start_sql_server
@@ -1172,7 +1172,7 @@ END""")
server_query "" 1 "create database test1"
server_query "" 1 "create database test2"
server_query "" 1 "create database test3"
server_query "" 1 "show databases" "Database\ninformation_schema\ntest1\ntest2\ntest3"
server_query "test1" 1 "create table a(x int)"
server_query "test1" 1 "insert into a values (1), (2)"
@@ -1185,7 +1185,7 @@ END""")
server_query "test3" 1 "create table a(x int)"
server_query "test3" 1 "insert into a values (5), (6)"
run server_query "test3" 1 "select dolt_commit('-a', '-m', 'new table a')"
run server_query "test1" 1 "select dolt_checkout('-b', 'newbranch')"
server_query "test1/newbranch" 1 "select * from a" "x\n1\n2"
@@ -1208,7 +1208,7 @@ END""")
run server_query "test2/newbranch" 1 "select * from a"
[ "$status" -ne 0 ]
[[ "$output" =~ "database not found: test2/newbranch" ]] || false
server_query "test3" 1 "select * from a" "x\n5\n6"
}
@@ -1248,7 +1248,7 @@ END""")
run server_query "test1" 1 "select dolt_commit('-a', '-m', 'new table a')"
[ -d db_dir/test1 ]
cd db_dir/test1
run dolt log
[ "$status" -eq 0 ]
@@ -1265,7 +1265,7 @@ END""")
[ -d db_dir/test3 ]
[ ! -d db_dir/test1 ]
# make sure the databases exist on restart
stop_sql_server
start_sql_server_with_args --host 0.0.0.0 --user dolt --data-dir=db_dir
@@ -1279,7 +1279,7 @@ END""")
mkdir dir_exists
touch file_exists
start_sql_server
server_query "" 1 "create database test1"
# Error on creation, already exists
@@ -1287,7 +1287,7 @@ END""")
# Files / dirs in the way
server_query "" 1 "create database dir_exists" "" "exists"
server_query "" 1 "create database file_exists" "" "exists"
server_query "" 1 "create database file_exists" "" "exists"
}
@test "sql-server: create database with existing repo" {
@@ -1295,7 +1295,7 @@ END""")
cd repo1
start_sql_server
server_query "" 1 "create database test1"
server_query "repo1" 1 "show databases" "Database\ninformation_schema\nrepo1\ntest1"
server_query "test1" 1 "create table a(x int)"
@@ -1438,3 +1438,54 @@ databases:
[ "$status" -eq 1 ]
[[ "$output" =~ "database is locked to writes" ]] || false
}
@test "sql-server: server fails to start up if there is already a file in the socket file path" {
skiponwindows "unix socket is not available on Windows"
cd repo2
touch mysql.sock
run pwd
REPO_NAME=$output
let PORT="$$ % (65536-1024) + 1024"
dolt sql-server --port=$PORT --socket="$REPO_NAME/mysql.sock" --user dolt > log.txt 2>&1 &
SERVER_PID=$!
run wait_for_connection $PORT 5000
[ "$status" -eq 1 ]
run grep 'address already in use' log.txt
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 1 ]
}
@test "sql-server: start server with yaml config with socket file path defined" {
skiponwindows "unix socket is not available on Windows"
cd repo2
DEFAULT_DB="repo2"
let PORT="$$ % (65536-1024) + 1024"
echo "
log_level: debug
user:
name: dolt
listener:
host: localhost
port: $PORT
max_connections: 10
socket: /tmp/mysql.sock
behavior:
autocommit: true" > server.yaml
dolt sql-server --config server.yaml > log.txt 2>&1 &
SERVER_PID=$!
wait_for_connection $PORT 5000
server_query repo2 1 "select 1 as col1" "col1\n1"
run grep '\"/tmp/mysql.sock\"' log.txt
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 1 ]
}