diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 4d79052e9b..299a9de15e 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -132,9 +132,17 @@ func NewSqlEngine( pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook) config.ClusterController.ManageDatabaseProvider(pro) + // Create the engine + engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{ + IsReadOnly: config.IsReadOnly, + IsServerLocked: config.IsServerLocked, + }).WithBackgroundThreads(bThreads) + // Load in privileges from file, if it exists - persister := mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath) - data, err := persister.LoadData() + var persister cluster.MySQLDbPersister + persister = mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath) + persister = config.ClusterController.HookMySQLDbPersister(persister, engine.Analyzer.Catalog.MySQLDb) + data, err := persister.LoadData(ctx) if err != nil { return nil, err } @@ -145,11 +153,7 @@ func NewSqlEngine( return nil, err } - // Set up engine - engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{ - IsReadOnly: config.IsReadOnly, - IsServerLocked: config.IsServerLocked, - }).WithBackgroundThreads(bThreads) + // Setup the engine. engine.Analyzer.Catalog.MySQLDb.SetPersister(persister) engine.Analyzer.Catalog.MySQLDb.SetPlugins(map[string]mysql_db.PlaintextAuthPlugin{ diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index a1ecb9def3..7859c9315a 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -312,6 +312,7 @@ func Serve( startError = err return } + clusterController.RegisterGrpcServices(clusterRemoteSrv.GrpcServer()) listeners, err := clusterRemoteSrv.Listeners() if err != nil { diff --git a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go new file mode 100644 index 0000000000..b5431cdf41 --- /dev/null +++ b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go @@ -0,0 +1,237 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v4.22.0 +// source: dolt/services/replicationapi/v1alpha1/replication.proto + +package replicationapi + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type UpdateUsersAndGrantsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The contents of the *MySQLDb instance, as seen by a Persister + // implementation. + SerializedContents []byte `protobuf:"bytes,1,opt,name=serialized_contents,json=serializedContents,proto3" json:"serialized_contents,omitempty"` +} + +func (x *UpdateUsersAndGrantsRequest) Reset() { + *x = UpdateUsersAndGrantsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpdateUsersAndGrantsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateUsersAndGrantsRequest) ProtoMessage() {} + +func (x *UpdateUsersAndGrantsRequest) ProtoReflect() protoreflect.Message { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateUsersAndGrantsRequest.ProtoReflect.Descriptor instead. +func (*UpdateUsersAndGrantsRequest) Descriptor() ([]byte, []int) { + return file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescGZIP(), []int{0} +} + +func (x *UpdateUsersAndGrantsRequest) GetSerializedContents() []byte { + if x != nil { + return x.SerializedContents + } + return nil +} + +type UpdateUsersAndGrantsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *UpdateUsersAndGrantsResponse) Reset() { + *x = UpdateUsersAndGrantsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UpdateUsersAndGrantsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UpdateUsersAndGrantsResponse) ProtoMessage() {} + +func (x *UpdateUsersAndGrantsResponse) ProtoReflect() protoreflect.Message { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UpdateUsersAndGrantsResponse.ProtoReflect.Descriptor instead. +func (*UpdateUsersAndGrantsResponse) Descriptor() ([]byte, []int) { + return file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescGZIP(), []int{1} +} + +var File_dolt_services_replicationapi_v1alpha1_replication_proto protoreflect.FileDescriptor + +var file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDesc = []byte{ + 0x0a, 0x37, 0x64, 0x6f, 0x6c, 0x74, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2f, 0x76, + 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x25, 0x64, 0x6f, 0x6c, 0x74, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, + 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, + 0x22, 0x4e, 0x0a, 0x1b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, + 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x2f, 0x0a, 0x13, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x5f, 0x63, 0x6f, + 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x12, 0x73, 0x65, + 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x73, + 0x22, 0x1e, 0x0a, 0x1c, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, + 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x32, 0xb6, 0x01, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x9f, 0x01, 0x0a, 0x14, 0x55, 0x70, 0x64, 0x61, + 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, + 0x12, 0x42, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, + 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2e, + 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, + 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x43, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, + 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x5b, 0x5a, 0x59, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x6f, 0x6c, 0x74, 0x68, 0x75, 0x62, 0x2f, + 0x64, 0x6f, 0x6c, 0x74, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x2f, 0x64, 0x6f, 0x6c, 0x74, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, + 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2f, 0x76, + 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescOnce sync.Once + file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescData = file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDesc +) + +func file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescGZIP() []byte { + file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescOnce.Do(func() { + file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescData = protoimpl.X.CompressGZIP(file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescData) + }) + return file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescData +} + +var file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_dolt_services_replicationapi_v1alpha1_replication_proto_goTypes = []interface{}{ + (*UpdateUsersAndGrantsRequest)(nil), // 0: dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsRequest + (*UpdateUsersAndGrantsResponse)(nil), // 1: dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsResponse +} +var file_dolt_services_replicationapi_v1alpha1_replication_proto_depIdxs = []int32{ + 0, // 0: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateUsersAndGrants:input_type -> dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsRequest + 1, // 1: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateUsersAndGrants:output_type -> dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_dolt_services_replicationapi_v1alpha1_replication_proto_init() } +func file_dolt_services_replicationapi_v1alpha1_replication_proto_init() { + if File_dolt_services_replicationapi_v1alpha1_replication_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UpdateUsersAndGrantsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UpdateUsersAndGrantsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_dolt_services_replicationapi_v1alpha1_replication_proto_goTypes, + DependencyIndexes: file_dolt_services_replicationapi_v1alpha1_replication_proto_depIdxs, + MessageInfos: file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes, + }.Build() + File_dolt_services_replicationapi_v1alpha1_replication_proto = out.File + file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDesc = nil + file_dolt_services_replicationapi_v1alpha1_replication_proto_goTypes = nil + file_dolt_services_replicationapi_v1alpha1_replication_proto_depIdxs = nil +} diff --git a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go new file mode 100644 index 0000000000..15a4e0dda3 --- /dev/null +++ b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go @@ -0,0 +1,132 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v4.22.0 +// source: dolt/services/replicationapi/v1alpha1/replication.proto + +package replicationapi + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// ReplicationServiceClient is the client API for ReplicationService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ReplicationServiceClient interface { + // Users and grants in Dolt are stored in in a + // `github.com/dolthub/go-mysql-server/sql/mysql_db.*MySQLDb` instance. This + // method is called by a primary on a standby instance in order to set its + // in-effect users and grants. Its primary payload is the serialized contents + // of the `*MySQLDb` instance at the primary, such that it can be applied + // with OverwriteUsersAndGrantData. + UpdateUsersAndGrants(ctx context.Context, in *UpdateUsersAndGrantsRequest, opts ...grpc.CallOption) (*UpdateUsersAndGrantsResponse, error) +} + +type replicationServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewReplicationServiceClient(cc grpc.ClientConnInterface) ReplicationServiceClient { + return &replicationServiceClient{cc} +} + +func (c *replicationServiceClient) UpdateUsersAndGrants(ctx context.Context, in *UpdateUsersAndGrantsRequest, opts ...grpc.CallOption) (*UpdateUsersAndGrantsResponse, error) { + out := new(UpdateUsersAndGrantsResponse) + err := c.cc.Invoke(ctx, "/dolt.services.replicationapi.v1alpha1.ReplicationService/UpdateUsersAndGrants", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ReplicationServiceServer is the server API for ReplicationService service. +// All implementations must embed UnimplementedReplicationServiceServer +// for forward compatibility +type ReplicationServiceServer interface { + // Users and grants in Dolt are stored in in a + // `github.com/dolthub/go-mysql-server/sql/mysql_db.*MySQLDb` instance. This + // method is called by a primary on a standby instance in order to set its + // in-effect users and grants. Its primary payload is the serialized contents + // of the `*MySQLDb` instance at the primary, such that it can be applied + // with OverwriteUsersAndGrantData. + UpdateUsersAndGrants(context.Context, *UpdateUsersAndGrantsRequest) (*UpdateUsersAndGrantsResponse, error) + mustEmbedUnimplementedReplicationServiceServer() +} + +// UnimplementedReplicationServiceServer must be embedded to have forward compatible implementations. +type UnimplementedReplicationServiceServer struct { +} + +func (UnimplementedReplicationServiceServer) UpdateUsersAndGrants(context.Context, *UpdateUsersAndGrantsRequest) (*UpdateUsersAndGrantsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method UpdateUsersAndGrants not implemented") +} +func (UnimplementedReplicationServiceServer) mustEmbedUnimplementedReplicationServiceServer() {} + +// UnsafeReplicationServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ReplicationServiceServer will +// result in compilation errors. +type UnsafeReplicationServiceServer interface { + mustEmbedUnimplementedReplicationServiceServer() +} + +func RegisterReplicationServiceServer(s grpc.ServiceRegistrar, srv ReplicationServiceServer) { + s.RegisterService(&ReplicationService_ServiceDesc, srv) +} + +func _ReplicationService_UpdateUsersAndGrants_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(UpdateUsersAndGrantsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ReplicationServiceServer).UpdateUsersAndGrants(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/dolt.services.replicationapi.v1alpha1.ReplicationService/UpdateUsersAndGrants", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ReplicationServiceServer).UpdateUsersAndGrants(ctx, req.(*UpdateUsersAndGrantsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// ReplicationService_ServiceDesc is the grpc.ServiceDesc for ReplicationService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ReplicationService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "dolt.services.replicationapi.v1alpha1.ReplicationService", + HandlerType: (*ReplicationServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "UpdateUsersAndGrants", + Handler: _ReplicationService_UpdateUsersAndGrants_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "dolt/services/replicationapi/v1alpha1/replication.proto", +} diff --git a/go/libraries/doltcore/remotesrv/server.go b/go/libraries/doltcore/remotesrv/server.go index 7087ba4b3a..406e9e3c46 100644 --- a/go/libraries/doltcore/remotesrv/server.go +++ b/go/libraries/doltcore/remotesrv/server.go @@ -160,6 +160,12 @@ func (s *Server) Listeners() (Listeners, error) { return Listeners{http: httpListener, grpc: grpcListener}, nil } +// Can be used to register more services on the server. +// Should only be accessed before `Serve` is called. +func (s *Server) GrpcServer() *grpc.Server { + return s.grpcSrv +} + func (s *Server) Serve(listeners Listeners) { if listeners.grpc != nil { go func() { diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index bd8b047cf1..9e39e1ff49 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -31,11 +31,13 @@ import ( "time" "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/mysql_db" gmstypes "github.com/dolthub/go-mysql-server/sql/types" "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/doltcore/creds" "github.com/dolthub/dolt/go/libraries/doltcore/dbfactory" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" @@ -79,6 +81,10 @@ type Controller struct { grpcCreds credentials.PerRPCCredentials pub ed25519.PublicKey priv ed25519.PrivateKey + + mysqlDb *mysql_db.MySQLDb + mysqlDbPersister *replicatingPersister + mysqlDbReplicas []*mysqlDbReplica } type sqlvars interface { @@ -154,6 +160,19 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) ret.sinterceptor.keyProvider = ret.jwks ret.sinterceptor.jwtExpected = JWTExpectations() + clients, err := ret.replicationServiceClients(context.Background()) + if err != nil { + return nil, err + } + ret.mysqlDbReplicas = make([]*mysqlDbReplica, len(clients)) + for i := range ret.mysqlDbReplicas { + ret.mysqlDbReplicas[i] = &mysqlDbReplica{ + lgr: lgr.WithFields(logrus.Fields{}), + client: clients[i], + } + ret.mysqlDbReplicas[i].cond = sync.NewCond(&ret.mysqlDbReplicas[i].mu) + } + return ret, nil } @@ -164,11 +183,17 @@ func (c *Controller) Run() { defer wg.Done() c.jwks.Run() }() + wg.Add(1) + go func() { + defer wg.Done() + c.mysqlDbPersister.Run() + }() wg.Wait() } func (c *Controller) GracefulStop() error { c.jwks.GracefulStop() + c.mysqlDbPersister.GracefulStop() return nil } @@ -448,6 +473,10 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, opts roleTransition for _, h := range c.commithooks { h.setRole(c.role) } + + // TODO: For a graceful transition, this should true up the + // replicas the same as we do for replication hooks. + c.mysqlDbPersister.setRole(c.role) } _ = c.persistVariables() return roleTransitionResult{ @@ -521,6 +550,25 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server return args } +func (c *Controller) HookMySQLDbPersister(persister MySQLDbPersister, mysqlDb *mysql_db.MySQLDb) MySQLDbPersister { + if c != nil { + c.mysqlDb = mysqlDb + c.mysqlDbPersister = &replicatingPersister{ + base: persister, + replicas: c.mysqlDbReplicas, + } + c.mysqlDbPersister.setRole(c.role) + persister = c.mysqlDbPersister + } + return persister +} + +func (c *Controller) RegisterGrpcServices(srv *grpc.Server) { + replicationapi.RegisterReplicationServiceServer(srv, &replicationServiceServer{ + mysqlDb: c.mysqlDb, + }) +} + // TODO: make the deadline here configurable or something. const waitForHooksToReplicateTimeout = 10 * time.Second @@ -843,3 +891,48 @@ func (c *Controller) standbyRemotesJWKS() *jwtauth.MultiJWKS { } return jwtauth.NewMultiJWKS(c.lgr.WithFields(logrus.Fields{"component": "jwks-key-provider"}), urls, client) } + +type replicationServiceClient struct { + remote string + url string + client replicationapi.ReplicationServiceClient +} + +func (c *Controller) replicationServiceDialOptions() []grpc.DialOption { + var ret []grpc.DialOption + if c.tlsCfg == nil { + ret = append(ret, grpc.WithInsecure()) + } else { + ret = append(ret, grpc.WithTransportCredentials(credentials.NewTLS(c.tlsCfg))) + } + + ret = append(ret, grpc.WithStreamInterceptor(c.cinterceptor.Stream())) + ret = append(ret, grpc.WithUnaryInterceptor(c.cinterceptor.Unary())) + + ret = append(ret, grpc.WithPerRPCCredentials(c.grpcCreds)) + + return ret +} + +func (c *Controller) replicationServiceClients(ctx context.Context) ([]*replicationServiceClient, error) { + var ret []*replicationServiceClient + for _, r := range c.cfg.StandbyRemotes() { + urlStr := strings.Replace(r.RemoteURLTemplate(), dsess.URLTemplateDatabasePlaceholder, "", -1) + url, err := url.Parse(urlStr) + if err != nil { + return nil, fmt.Errorf("could not parse remote url template [%s] for remote %s: %w", r.RemoteURLTemplate(), r.Name(), err) + } + grpcTarget := "dns:" + url.Hostname() + ":" + url.Port() + cc, err := grpc.DialContext(ctx, grpcTarget, c.replicationServiceDialOptions()...) + if err != nil { + return nil, fmt.Errorf("could not dial grpc endpoint [%s] for remote %s: %w", grpcTarget, r.Name(), err) + } + client := replicationapi.NewReplicationServiceClient(cc) + ret = append(ret, &replicationServiceClient{ + remote: r.Name(), + url: grpcTarget, + client: client, + }) + } + return ret, nil +} diff --git a/go/libraries/doltcore/sqle/cluster/mysqldb_persister.go b/go/libraries/doltcore/sqle/cluster/mysqldb_persister.go new file mode 100644 index 0000000000..d4db5bad22 --- /dev/null +++ b/go/libraries/doltcore/sqle/cluster/mysqldb_persister.go @@ -0,0 +1,209 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + "sync" + "time" + + "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/mysql_db" + "github.com/sirupsen/logrus" + + replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1" +) + +type MySQLDbPersister interface { + mysql_db.MySQLDbPersistence + LoadData(context.Context) ([]byte, error) +} + +type replicatingPersister struct { + base MySQLDbPersister + + current []byte + version uint32 + replicas []*mysqlDbReplica + + mu sync.Mutex +} + +type mysqlDbReplica struct { + shutdown bool + role Role + + contents []byte + version uint32 + + replicatedVersion uint32 + nextAttempt time.Time + + client *replicationServiceClient + lgr *logrus.Entry + + mu sync.Mutex + cond *sync.Cond +} + +func (r *mysqlDbReplica) UpdateMySQLDb(ctx context.Context, contents []byte, version uint32) error { + r.mu.Lock() + defer r.mu.Unlock() + r.contents = contents + r.version = version + r.nextAttempt = time.Time{} + r.cond.Broadcast() + return nil +} + +func (r *mysqlDbReplica) Run() { + r.mu.Lock() + defer r.mu.Unlock() + r.lgr.Tracef("mysqlDbReplica[%s]: running", r.client.remote) + for !r.shutdown { + if r.role != RolePrimary { + r.cond.Wait() + continue + } + if r.version == 0 { + r.cond.Wait() + continue + } + if r.replicatedVersion == r.version { + r.cond.Wait() + continue + } + if r.nextAttempt.After(time.Now()) { + r.cond.Wait() + continue + } + _, err := r.client.client.UpdateUsersAndGrants(context.Background(), &replicationapi.UpdateUsersAndGrantsRequest{ + SerializedContents: r.contents, + }) + if err != nil { + r.lgr.Warnf("mysqlDbReplica[%s]: error replicating users and grants. backing off. %v", r.client.remote, err) + // TODO: Add backoff. + r.nextAttempt = time.Now().Add(1 * time.Second) + next := r.nextAttempt + go func() { + <-time.After(time.Until(next)) + r.mu.Lock() + defer r.mu.Unlock() + for !time.Now().After(next) { + } + r.cond.Broadcast() + }() + continue + } + r.lgr.Warnf("mysqlDbReplica[%s]: sucessfully replicated users and grants.", r.client.remote) + r.replicatedVersion = r.version + } +} + +func (r *mysqlDbReplica) GracefulStop() { + r.mu.Lock() + defer r.mu.Unlock() + r.shutdown = true + r.cond.Broadcast() +} + +func (r *mysqlDbReplica) setRole(role Role) { + r.mu.Lock() + defer r.mu.Unlock() + r.role = role + r.cond.Broadcast() +} + +func (p *replicatingPersister) setRole(role Role) { + for _, r := range p.replicas { + r.setRole(role) + } + p.mu.Lock() + // If we are transitioning to primary and we are already initialized, + // then we reload data so that we have the most recent persisted users + // and grants to replicate. + needsLoad := p.version != 0 && role == RolePrimary + p.mu.Unlock() + if needsLoad { + p.LoadData(context.Background()) + } +} + +func (p *replicatingPersister) Run() { + var wg sync.WaitGroup + for _, r := range p.replicas { + r := r + wg.Add(1) + func() { + defer wg.Done() + r.Run() + }() + } + wg.Wait() +} + +func (p *replicatingPersister) GracefulStop() { + for _, r := range p.replicas { + r.GracefulStop() + } +} + +func (p *replicatingPersister) Persist(ctx *sql.Context, data []byte) error { + err := p.base.Persist(ctx, data) + if err == nil { + p.mu.Lock() + p.current = data + p.version += 1 + defer p.mu.Unlock() + for _, r := range p.replicas { + r.UpdateMySQLDb(ctx, p.current, p.version) + } + } + return err +} + +func (p *replicatingPersister) LoadData(ctx context.Context) ([]byte, error) { + ret, err := p.base.LoadData(ctx) + if err == nil { + p.mu.Lock() + p.current = ret + p.version += 1 + defer p.mu.Unlock() + for _, r := range p.replicas { + r.UpdateMySQLDb(ctx, p.current, p.version) + } + } + return ret, err +} + +type replicationServiceServer struct { + replicationapi.UnimplementedReplicationServiceServer + mysqlDb *mysql_db.MySQLDb +} + +func (s *replicationServiceServer) UpdateUsersAndGrants(ctx context.Context, req *replicationapi.UpdateUsersAndGrantsRequest) (*replicationapi.UpdateUsersAndGrantsResponse, error) { + sqlCtx := sql.NewContext(ctx) + ed := s.mysqlDb.Editor() + defer ed.Close() + err := s.mysqlDb.OverwriteUsersAndGrantData(sqlCtx, ed, req.SerializedContents) + if err != nil { + return nil, err + } + err = s.mysqlDb.Persist(sqlCtx, ed) + if err != nil { + return nil, err + } + return &replicationapi.UpdateUsersAndGrantsResponse{}, nil +} diff --git a/go/libraries/doltcore/sqle/mysql_file_handler/file_handler.go b/go/libraries/doltcore/sqle/mysql_file_handler/file_handler.go index 4624a46f98..4e0b3383f4 100644 --- a/go/libraries/doltcore/sqle/mysql_file_handler/file_handler.go +++ b/go/libraries/doltcore/sqle/mysql_file_handler/file_handler.go @@ -15,6 +15,7 @@ package mysql_file_handler import ( + "context" "errors" "os" "sync" @@ -56,7 +57,7 @@ func (p *Persister) Persist(ctx *sql.Context, data []byte) error { } // LoadData reads the mysql.db file, returns nil if empty or not found -func (p Persister) LoadData() ([]byte, error) { +func (p Persister) LoadData(context.Context) ([]byte, error) { // do nothing if no filepath specified if len(p.privsFilePath) == 0 { return nil, nil diff --git a/integration-tests/go-sql-server-driver/main_test.go b/integration-tests/go-sql-server-driver/main_test.go index 5f8e0527f8..cace631a5e 100644 --- a/integration-tests/go-sql-server-driver/main_test.go +++ b/integration-tests/go-sql-server-driver/main_test.go @@ -28,6 +28,10 @@ func TestCluster(t *testing.T) { RunTestsFile(t, "tests/sql-server-cluster.yaml") } +func TestClusterUsersAndGrants(t *testing.T) { + RunTestsFile(t, "tests/sql-server-cluster-users-and-grants.yaml") +} + func TestRemotesAPI(t *testing.T) { RunTestsFile(t, "tests/sql-server-remotesapi.yaml") } @@ -35,7 +39,7 @@ func TestRemotesAPI(t *testing.T) { // TestSingle is a convenience method for running a single test from within an IDE. Unskip and set to the file and name // of the test you want to debug. See README.md in the `tests` directory for more debugging info. func TestSingle(t *testing.T) { - // t.Skip() + t.Skip() RunSingleTest(t, "tests/sql-server-cluster.yaml", "primary comes up and replicates to standby") } diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster-users-and-grants.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster-users-and-grants.yaml new file mode 100644 index 0000000000..faa003a243 --- /dev/null +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster-users-and-grants.yaml @@ -0,0 +1,145 @@ +tests: +- name: users and grants replicate + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'SET @@PERSIST.dolt_cluster_ack_writes_timeout_secs = 10' + - exec: 'create database repo1' + - exec: "use repo1" + - exec: 'create table vals (i int primary key)' + - exec: 'insert into vals values (0),(1),(2),(3),(4)' + - exec: 'create user "aaron"@"%" IDENTIFIED BY "aaronspassword"' + - exec: 'grant ALL ON *.* to "aaron"@"%"' + - exec: 'insert into vals values (5),(6),(7),(8),(9)' + - on: server1 + user: 'aaron' + password: 'aaronspassword' + queries: + - exec: "use repo1" + - exec: 'insert into vals values (10),(11),(12),(13),(14)' + - on: server2 + user: 'aaron' + password: 'aaronspassword' + queries: + - exec: "use repo1" + - query: 'select count(*) from vals' + result: + columns: ["count(*)"] + rows: [["15"]] +- name: users and grants applied to standby do not replicate +### TODO: This test should not be possible; being able to run create user on a standby is a bug. + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'SET @@PERSIST.dolt_cluster_ack_writes_timeout_secs = 10' + - exec: 'create database repo1' + - exec: "use repo1" + - exec: 'create table vals (i int primary key)' + - exec: 'insert into vals values (0),(1),(2),(3),(4)' + - exec: 'create user "aaron"@"%" IDENTIFIED BY "aaronspassword"' + - exec: 'grant ALL ON *.* to "aaron"@"%"' + - exec: 'insert into vals values (5),(6),(7),(8),(9)' + - on: server1 + user: 'aaron' + password: 'aaronspassword' + queries: + - exec: "use repo1" + - exec: 'insert into vals values (10),(11),(12),(13),(14)' + - on: server2 + user: 'aaron' + password: 'aaronspassword' + queries: + - exec: "use repo1" + - query: 'select count(*) from vals' + result: + columns: ["count(*)"] + rows: [["15"]] + - exec: 'create user "brian"@"%" IDENTIFIED BY "brianspassword"' + - exec: 'grant ALL ON *.* to "brian"@"%"' + - exec: 'select sleep(1) from dual' + - on: server1 + user: 'aaron' + password: 'aaronspassword' + queries: + - query: "select count(*) from mysql.user where User = 'brian'" + result: + columns: ["count(*)"] + rows: [["0"]] diff --git a/proto/Makefile b/proto/Makefile index eb7aff1ede..6ab524add4 100644 --- a/proto/Makefile +++ b/proto/Makefile @@ -22,12 +22,17 @@ REMOTESAPI_protos := \ dolt/services/remotesapi/v1alpha1/credentials.proto REMOTESAPI_pbgo_pkg_path := dolt/services/remotesapi/v1alpha1 +REPLICATIONAPI_protos := \ + dolt/services/replicationapi/v1alpha1/replication.proto +REPLICATIONAPI_pbgo_pkg_path := dolt/services/replicationapi/v1alpha1 + nonservice_protos := \ dolt/services/eventsapi/v1alpha1/event_constants.proto PBGO_pkgs := \ CLIENTEVENTS \ REMOTESAPI \ + REPLICATIONAPI \ EVENTSAPI all: diff --git a/proto/dolt/services/replicationapi/v1alpha1/replication.proto b/proto/dolt/services/replicationapi/v1alpha1/replication.proto new file mode 100644 index 0000000000..48169973cd --- /dev/null +++ b/proto/dolt/services/replicationapi/v1alpha1/replication.proto @@ -0,0 +1,38 @@ +// Copyright 2023 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package dolt.services.replicationapi.v1alpha1; + +option go_package = "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1;replicationapi"; + +service ReplicationService { + // Users and grants in Dolt are stored in in a + // `github.com/dolthub/go-mysql-server/sql/mysql_db.*MySQLDb` instance. This + // method is called by a primary on a standby instance in order to set its + // in-effect users and grants. Its primary payload is the serialized contents + // of the `*MySQLDb` instance at the primary, such that it can be applied + // with OverwriteUsersAndGrantData. + rpc UpdateUsersAndGrants(UpdateUsersAndGrantsRequest) returns (UpdateUsersAndGrantsResponse); +} + +message UpdateUsersAndGrantsRequest { + // The contents of the *MySQLDb instance, as seen by a Persister + // implementation. + bytes serialized_contents = 1; +} + +message UpdateUsersAndGrantsResponse { +}