mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-10 08:49:43 -06:00
go/libraries/doltcore/remotesrv: Move logging to logrus.
This commit is contained in:
@@ -206,7 +206,7 @@ func Serve(
|
||||
var remoteSrv *remotesrv.Server
|
||||
if serverConfig.RemotesapiPort() != nil {
|
||||
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
|
||||
remoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, *serverConfig.RemotesapiPort())
|
||||
remoteSrv = sqle.NewRemoteSrvServer(logrus.NewEntry(lgr), remoteSrvSqlCtx, *serverConfig.RemotesapiPort())
|
||||
listeners, err := remoteSrv.Listeners()
|
||||
if err != nil {
|
||||
lgr.Warnf("error starting remotesapi server listeners on port %d; remotesapi will not be available: %v\n", *serverConfig.RemotesapiPort(), err)
|
||||
|
||||
@@ -18,11 +18,12 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
@@ -42,30 +43,34 @@ type RemoteChunkStore struct {
|
||||
bucket string
|
||||
expectedFiles fileDetails
|
||||
fs filesys.Filesys
|
||||
lgr *logrus.Entry
|
||||
remotesapi.UnimplementedChunkStoreServiceServer
|
||||
}
|
||||
|
||||
func NewHttpFSBackedChunkStore(httpHost string, csCache DBCache, expectedFiles fileDetails, fs filesys.Filesys) *RemoteChunkStore {
|
||||
func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCache, expectedFiles fileDetails, fs filesys.Filesys) *RemoteChunkStore {
|
||||
return &RemoteChunkStore{
|
||||
HttpHost: httpHost,
|
||||
csCache: csCache,
|
||||
bucket: "",
|
||||
expectedFiles: expectedFiles,
|
||||
fs: fs,
|
||||
lgr: lgr.WithFields(logrus.Fields{
|
||||
"service": "dolt.services.remotesapi.v1alpha1.ChunkStoreServiceServer",
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasChunksRequest) (*remotesapi.HasChunksResponse, error) {
|
||||
logger := getReqLogger("GRPC", "HasChunks")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "HasChunks")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "HasChunks")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName))
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
|
||||
hashes, hashToIndex := remotestorage.ParseByteSlices(req.Hashes)
|
||||
|
||||
@@ -109,16 +114,16 @@ func (rs *RemoteChunkStore) getRelativeStorePath(cs RemoteSrvStore) (string, err
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remotesapi.GetDownloadLocsRequest) (*remotesapi.GetDownloadLocsResponse, error) {
|
||||
logger := getReqLogger("GRPC", "GetDownloadLocations")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "GetDownloadLocations")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "GetDownloadLoctions")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName))
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
|
||||
hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes)
|
||||
|
||||
@@ -144,11 +149,11 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
|
||||
|
||||
url, err := rs.getDownloadUrl(logger, md, prefix+"/"+loc)
|
||||
if err != nil {
|
||||
log.Println("Failed to sign request", err)
|
||||
logger.Println("Failed to sign request", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
logger("The URL is " + url)
|
||||
logger.Println("The URL is", url)
|
||||
|
||||
getRange := &remotesapi.HttpGetRange{Url: url, Ranges: ranges}
|
||||
locs = append(locs, &remotesapi.DownloadLoc{Location: &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: getRange}})
|
||||
@@ -158,8 +163,8 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStoreService_StreamDownloadLocationsServer) error {
|
||||
logger := getReqLogger("GRPC", "StreamDownloadLocations")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "StreamDownloadLocations")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
md, _ := metadata.FromIncomingContext(stream.Context())
|
||||
|
||||
@@ -177,11 +182,11 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
|
||||
if !proto.Equal(req.RepoId, repoID) {
|
||||
repoID = req.RepoId
|
||||
cs = rs.getStore(repoID, "StreamDownloadLoctions")
|
||||
cs = rs.getStore(logger, repoID)
|
||||
if cs == nil {
|
||||
return status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
logger(fmt.Sprintf("found repo %s/%s", repoID.Org, repoID.RepoName))
|
||||
logger.Printf("found repo %s/%s", repoID.Org, repoID.RepoName)
|
||||
|
||||
prefix, err = rs.getRelativeStorePath(cs)
|
||||
if err != nil {
|
||||
@@ -205,11 +210,11 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
|
||||
url, err := rs.getDownloadUrl(logger, md, prefix+"/"+loc)
|
||||
if err != nil {
|
||||
log.Println("Failed to sign request", err)
|
||||
logger.Println("Failed to sign request", err)
|
||||
return err
|
||||
}
|
||||
|
||||
logger("The URL is " + url)
|
||||
logger.Println("The URL is", url)
|
||||
|
||||
getRange := &remotesapi.HttpGetRange{Url: url, Ranges: ranges}
|
||||
locs = append(locs, &remotesapi.DownloadLoc{Location: &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: getRange}})
|
||||
@@ -221,7 +226,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getDownloadUrl(logger func(string), md metadata.MD, path string) (string, error) {
|
||||
func (rs *RemoteChunkStore) getDownloadUrl(logger *logrus.Entry, md metadata.MD, path string) (string, error) {
|
||||
host := rs.HttpHost
|
||||
if strings.HasPrefix(rs.HttpHost, ":") && rs.HttpHost != ":80" {
|
||||
hosts := md.Get(":authority")
|
||||
@@ -258,16 +263,16 @@ func parseTableFileDetails(req *remotesapi.GetUploadLocsRequest) []*remotesapi.T
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotesapi.GetUploadLocsRequest) (*remotesapi.GetUploadLocsResponse, error) {
|
||||
logger := getReqLogger("GRPC", "GetUploadLocations")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "GetUploadLocations")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "GetWriteChunkUrls")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName))
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
|
||||
org := req.RepoId.Org
|
||||
repoName := req.RepoId.RepoName
|
||||
@@ -285,34 +290,34 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
|
||||
loc := &remotesapi.UploadLoc_HttpPost{HttpPost: &remotesapi.HttpPostTableFile{Url: url}}
|
||||
locs = append(locs, &remotesapi.UploadLoc{TableFileHash: h[:], Location: loc})
|
||||
|
||||
logger(fmt.Sprintf("sending upload location for chunk %s: %s", h.String(), url))
|
||||
logger.Printf("sending upload location for chunk %s: %s", h.String(), url)
|
||||
}
|
||||
|
||||
return &remotesapi.GetUploadLocsResponse{Locs: locs}, nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getUploadUrl(logger func(string), org, repoName string, tfd *remotesapi.TableFileDetails) (string, error) {
|
||||
func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, org, repoName string, tfd *remotesapi.TableFileDetails) (string, error) {
|
||||
fileID := hash.New(tfd.Id).String()
|
||||
rs.expectedFiles.Put(fileID, tfd)
|
||||
return fmt.Sprintf("http://%s/%s/%s/%s", rs.HttpHost, org, repoName, fileID), nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRequest) (*remotesapi.RebaseResponse, error) {
|
||||
logger := getReqLogger("GRPC", "Rebase")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "Rebase")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "Rebase")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName))
|
||||
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
|
||||
err := cs.Rebase(ctx)
|
||||
|
||||
if err != nil {
|
||||
logger(fmt.Sprintf("error occurred during processing of Rebace rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err))
|
||||
logger.Printf("error occurred during processing of Rebace rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err)
|
||||
return nil, status.Errorf(codes.Internal, "failed to rebase: %v", err)
|
||||
}
|
||||
|
||||
@@ -320,10 +325,10 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootRequest) (*remotesapi.RootResponse, error) {
|
||||
logger := getReqLogger("GRPC", "Root")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "Root")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "Root")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -332,7 +337,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
|
||||
h, err := cs.Root(ctx)
|
||||
|
||||
if err != nil {
|
||||
logger(fmt.Sprintf("error occurred during processing of Root rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err))
|
||||
logger.Printf("error occurred during processing of Root rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err)
|
||||
return nil, status.Error(codes.Internal, "Failed to get root")
|
||||
}
|
||||
|
||||
@@ -340,16 +345,16 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRequest) (*remotesapi.CommitResponse, error) {
|
||||
logger := getReqLogger("GRPC", "Commit")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "Commit")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "Commit")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName))
|
||||
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
|
||||
//should validate
|
||||
updates := make(map[string]int)
|
||||
@@ -360,7 +365,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
err := cs.AddTableFilesToManifest(ctx, updates)
|
||||
|
||||
if err != nil {
|
||||
logger(fmt.Sprintf("error occurred updating the manifest: %s", err.Error()))
|
||||
logger.Printf("error occurred updating the manifest: %s", err.Error())
|
||||
return nil, status.Errorf(codes.Internal, "manifest update error: %v", err)
|
||||
}
|
||||
|
||||
@@ -371,19 +376,19 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
ok, err = cs.Commit(ctx, currHash, lastHash)
|
||||
|
||||
if err != nil {
|
||||
logger(fmt.Sprintf("error occurred during processing of Commit of %s/%s last %s curr: %s details: %v", req.RepoId.Org, req.RepoId.RepoName, lastHash.String(), currHash.String(), err))
|
||||
logger.Printf("error occurred during processing of Commit of %s/%s last %s curr: %s details: %v", req.RepoId.Org, req.RepoId.RepoName, lastHash.String(), currHash.String(), err)
|
||||
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("committed %s/%s moved from %s -> %s", req.RepoId.Org, req.RepoId.RepoName, currHash.String(), lastHash.String()))
|
||||
logger.Printf("committed %s/%s moved from %s -> %s", req.RepoId.Org, req.RepoId.RepoName, currHash.String(), lastHash.String())
|
||||
return &remotesapi.CommitResponse{Success: ok}, nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi.GetRepoMetadataRequest) (*remotesapi.GetRepoMetadataResponse, error) {
|
||||
logger := getReqLogger("GRPC", "GetRepoMetadata")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "GetRepoMetadata")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getOrCreateStore(req.RepoId, "GetRepoMetadata", req.ClientRepoFormat.NbfVersion)
|
||||
cs := rs.getOrCreateStore(logger, req.RepoId, req.ClientRepoFormat.NbfVersion)
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
@@ -406,16 +411,16 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.ListTableFilesRequest) (*remotesapi.ListTableFilesResponse, error) {
|
||||
logger := getReqLogger("GRPC", "ListTableFiles")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "ListTableFiles")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "ListTableFiles")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName))
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
|
||||
root, tables, appendixTables, err := cs.Sources(ctx)
|
||||
|
||||
@@ -445,7 +450,7 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
|
||||
}
|
||||
|
||||
func getTableFileInfo(
|
||||
logger func(string),
|
||||
logger *logrus.Entry,
|
||||
md metadata.MD,
|
||||
rs *RemoteChunkStore,
|
||||
tableList []nbs.TableFile,
|
||||
@@ -474,16 +479,16 @@ func getTableFileInfo(
|
||||
|
||||
// AddTableFiles updates the remote manifest with new table files without modifying the root hash.
|
||||
func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.AddTableFilesRequest) (*remotesapi.AddTableFilesResponse, error) {
|
||||
logger := getReqLogger("GRPC", "Commit")
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(rs.lgr, "AddTableFiles")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(req.RepoId, "Commit")
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName))
|
||||
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
|
||||
// should validate
|
||||
updates := make(map[string]int)
|
||||
@@ -494,25 +499,25 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
err := cs.AddTableFilesToManifest(ctx, updates)
|
||||
|
||||
if err != nil {
|
||||
logger(fmt.Sprintf("error occurred updating the manifest: %s", err.Error()))
|
||||
logger.Printf("error occurred updating the manifest: %s", err.Error())
|
||||
return nil, status.Error(codes.Internal, "manifest update error")
|
||||
}
|
||||
|
||||
return &remotesapi.AddTableFilesResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getStore(repoId *remotesapi.RepoId, rpcName string) RemoteSrvStore {
|
||||
return rs.getOrCreateStore(repoId, rpcName, types.Format_Default.VersionString())
|
||||
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoId *remotesapi.RepoId) RemoteSrvStore {
|
||||
return rs.getOrCreateStore(logger, repoId, types.Format_Default.VersionString())
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(repoId *remotesapi.RepoId, rpcName, nbfVerStr string) RemoteSrvStore {
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoId *remotesapi.RepoId, nbfVerStr string) RemoteSrvStore {
|
||||
org := repoId.Org
|
||||
repoName := repoId.RepoName
|
||||
|
||||
cs, err := rs.csCache.Get(org, repoName, nbfVerStr)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Failed to retrieve chunkstore for %s/%s\n", org, repoName)
|
||||
logger.Printf("Failed to retrieve chunkstore for %s/%s\n", org, repoName)
|
||||
}
|
||||
|
||||
return cs
|
||||
@@ -520,17 +525,17 @@ func (rs *RemoteChunkStore) getOrCreateStore(repoId *remotesapi.RepoId, rpcName,
|
||||
|
||||
var requestId int32
|
||||
|
||||
func incReqId() int32 {
|
||||
return atomic.AddInt32(&requestId, 1)
|
||||
func incReqId() int {
|
||||
return int(atomic.AddInt32(&requestId, 1))
|
||||
}
|
||||
|
||||
func getReqLogger(method, callName string) func(string) {
|
||||
callId := fmt.Sprintf("%s(%05d)", method, incReqId())
|
||||
log.Println(callId, "new request for:", callName)
|
||||
|
||||
return func(msg string) {
|
||||
log.Println(callId, "-", msg)
|
||||
}
|
||||
func getReqLogger(lgr *logrus.Entry, method string) *logrus.Entry {
|
||||
lgr = lgr.WithFields(logrus.Fields{
|
||||
"method": method,
|
||||
"request_num": strconv.Itoa(incReqId()),
|
||||
})
|
||||
lgr.Println("starting request")
|
||||
return lgr
|
||||
}
|
||||
|
||||
type ReadOnlyChunkStore struct {
|
||||
|
||||
@@ -29,8 +29,9 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
@@ -66,15 +67,24 @@ type filehandler struct {
|
||||
expectedFiles fileDetails
|
||||
fs filesys.Filesys
|
||||
readOnly bool
|
||||
lgr *logrus.Entry
|
||||
}
|
||||
|
||||
func newFileHandler(dbCache DBCache, expectedFiles fileDetails, fs filesys.Filesys, readOnly bool) filehandler {
|
||||
return filehandler{dbCache, expectedFiles, fs, readOnly}
|
||||
func newFileHandler(lgr *logrus.Entry, dbCache DBCache, expectedFiles fileDetails, fs filesys.Filesys, readOnly bool) filehandler {
|
||||
return filehandler{
|
||||
dbCache,
|
||||
expectedFiles,
|
||||
fs,
|
||||
readOnly,
|
||||
lgr.WithFields(logrus.Fields{
|
||||
"service": "dolt.services.remotesapi.v1alpha1.HttpFileServer",
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
|
||||
logger := getReqLogger("HTTP_"+req.Method, req.RequestURI)
|
||||
defer func() { logger("finished") }()
|
||||
logger := getReqLogger(fh.lgr, req.Method+"_"+req.RequestURI)
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
path := strings.TrimLeft(req.URL.Path, "/")
|
||||
|
||||
@@ -83,25 +93,25 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
|
||||
case http.MethodGet:
|
||||
path = filepath.Clean(path)
|
||||
if strings.HasPrefix(path, "../") || strings.Contains(path, "/../") || strings.HasSuffix(path, "/..") {
|
||||
logger("bad request with .. for path " + path)
|
||||
logger.Println("bad request with .. for path", path)
|
||||
respWr.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
i := strings.LastIndex(path, "/")
|
||||
if i == -1 {
|
||||
logger("bad request with -1 LastIndex of '/' for path " + path)
|
||||
logger.Println("bad request with -1 LastIndex of '/' for path ", path)
|
||||
respWr.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
_, ok := hash.MaybeParse(path[i+1:])
|
||||
if !ok {
|
||||
logger("bad request with unparseable last path component " + path[i+1:])
|
||||
logger.Println("bad request with unparseable last path component", path[i+1:])
|
||||
respWr.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
abs, err := fh.fs.Abs(path)
|
||||
if err != nil {
|
||||
logger(fmt.Sprintf("could not get absolute path: %s", err.Error()))
|
||||
logger.Printf("could not get absolute path: %s", err.Error())
|
||||
respWr.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
@@ -115,7 +125,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
|
||||
|
||||
tokens := strings.Split(path, "/")
|
||||
if len(tokens) != 3 {
|
||||
logger(fmt.Sprintf("response to: %v method: %v http response code: %v", req.RequestURI, req.Method, http.StatusNotFound))
|
||||
logger.Printf("response to: %v method: %v http response code: %v", req.RequestURI, req.Method, http.StatusNotFound)
|
||||
respWr.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
@@ -132,7 +142,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func readTableFile(logger func(string), path string, respWr http.ResponseWriter, req *http.Request) int {
|
||||
func readTableFile(logger *logrus.Entry, path string, respWr http.ResponseWriter, req *http.Request) int {
|
||||
rangeStr := req.Header.Get("Range")
|
||||
|
||||
var r io.ReadCloser
|
||||
@@ -140,21 +150,21 @@ func readTableFile(logger func(string), path string, respWr http.ResponseWriter,
|
||||
var fileErr error
|
||||
{
|
||||
if rangeStr == "" {
|
||||
logger("going to read entire file")
|
||||
logger.Println("going to read entire file")
|
||||
r, readSize, fileErr = getFileReader(path)
|
||||
} else {
|
||||
offset, length, err := offsetAndLenFromRange(rangeStr)
|
||||
if err != nil {
|
||||
logger(err.Error())
|
||||
logger.Println(err.Error())
|
||||
return http.StatusBadRequest
|
||||
}
|
||||
logger(fmt.Sprintf("going to read file at offset %d, length %d", offset, length))
|
||||
logger.Printf("going to read file at offset %d, length %d", offset, length)
|
||||
readSize = length
|
||||
r, fileErr = getFileReaderAt(path, offset, length)
|
||||
}
|
||||
}
|
||||
if fileErr != nil {
|
||||
logger(fileErr.Error())
|
||||
logger.Println(fileErr.Error())
|
||||
if errors.Is(fileErr, os.ErrNotExist) {
|
||||
return http.StatusNotFound
|
||||
} else if errors.Is(fileErr, ErrReadOutOfBounds) {
|
||||
@@ -166,24 +176,24 @@ func readTableFile(logger func(string), path string, respWr http.ResponseWriter,
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to close file at path %s: %w", path, err)
|
||||
logger(err.Error())
|
||||
logger.Println(err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
logger(fmt.Sprintf("opened file at path %s, going to read %d bytes", path, readSize))
|
||||
logger.Printf("opened file at path %s, going to read %d bytes", path, readSize)
|
||||
|
||||
n, err := io.Copy(respWr, r)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to write data to response writer: %w", err)
|
||||
logger(err.Error())
|
||||
logger.Println(err.Error())
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
if n != readSize {
|
||||
logger(fmt.Sprintf("wanted to write %d bytes from file (%s) but only wrote %d", readSize, path, n))
|
||||
logger.Printf("wanted to write %d bytes from file (%s) but only wrote %d", readSize, path, n)
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
|
||||
logger(fmt.Sprintf("wrote %d bytes", n))
|
||||
logger.Printf("wrote %d bytes", n)
|
||||
|
||||
return -1
|
||||
}
|
||||
@@ -223,25 +233,25 @@ func (u *uploadreader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeTableFile(ctx context.Context, logger func(string), dbCache DBCache, expectedFiles fileDetails, org, repo, fileId string, request *http.Request) int {
|
||||
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, expectedFiles fileDetails, org, repo, fileId string, request *http.Request) int {
|
||||
_, ok := hash.MaybeParse(fileId)
|
||||
|
||||
if !ok {
|
||||
logger(fileId + " is not a valid hash")
|
||||
logger.Println(fileId, "is not a valid hash")
|
||||
return http.StatusBadRequest
|
||||
}
|
||||
|
||||
tfd, ok := expectedFiles.Get(fileId)
|
||||
if !ok {
|
||||
logger("bad request for " + fileId + ": tfd not found")
|
||||
logger.Println("bad request for ", fileId, ": tfd not found")
|
||||
return http.StatusBadRequest
|
||||
}
|
||||
|
||||
logger(fileId + " is valid")
|
||||
logger.Println(fileId, "is valid")
|
||||
|
||||
cs, err := dbCache.Get(org, repo, types.Format_Default.VersionString())
|
||||
if err != nil {
|
||||
logger("failed to get " + org + "/" + repo + " repository: " + err.Error())
|
||||
logger.Println("failed to get", org+"/"+repo, "repository:", err.Error())
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
|
||||
@@ -259,35 +269,20 @@ func writeTableFile(ctx context.Context, logger func(string), dbCache DBCache, e
|
||||
|
||||
if err != nil {
|
||||
if errors.Is(err, errBodyLengthTFDMismatch) {
|
||||
logger("bad write file request for " + fileId + ": body length mismatch")
|
||||
logger.Println("bad write file request for", fileId, ": body length mismatch")
|
||||
return http.StatusBadRequest
|
||||
}
|
||||
if errors.Is(err, errBodyHashTFDMismatch) {
|
||||
logger("bad write file request for " + fileId + ": body hash mismatch")
|
||||
logger.Println("bad write file request for", fileId, ": body hash mismatch")
|
||||
return http.StatusBadRequest
|
||||
}
|
||||
logger("failed to read body " + err.Error())
|
||||
logger.Println("failed to read body", err.Error())
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
|
||||
return http.StatusOK
|
||||
}
|
||||
|
||||
func writeLocal(logger func(string), org, repo, fileId string, data []byte) error {
|
||||
path := filepath.Join(org, repo, fileId)
|
||||
|
||||
err := os.WriteFile(path, data, os.ModePerm)
|
||||
|
||||
if err != nil {
|
||||
logger(fmt.Sprintf("failed to write file %s", path))
|
||||
return err
|
||||
}
|
||||
|
||||
logger("Successfully wrote object to storage")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func offsetAndLenFromRange(rngStr string) (int64, int64, error) {
|
||||
if rngStr == "" {
|
||||
return -1, -1, nil
|
||||
|
||||
@@ -17,12 +17,12 @@ package remotesrv
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
"google.golang.org/grpc"
|
||||
@@ -46,7 +46,7 @@ func (s *Server) GracefulStop() {
|
||||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func NewServer(httpHost string, httpPort, grpcPort int, fs filesys.Filesys, dbCache DBCache, readOnly bool) *Server {
|
||||
func NewServer(lgr *logrus.Entry, httpHost string, httpPort, grpcPort int, fs filesys.Filesys, dbCache DBCache, readOnly bool) *Server {
|
||||
s := new(Server)
|
||||
s.stopChan = make(chan struct{})
|
||||
|
||||
@@ -55,13 +55,13 @@ func NewServer(httpHost string, httpPort, grpcPort int, fs filesys.Filesys, dbCa
|
||||
s.wg.Add(2)
|
||||
s.grpcPort = grpcPort
|
||||
s.grpcSrv = grpc.NewServer(grpc.MaxRecvMsgSize(128 * 1024 * 1024))
|
||||
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(httpHost, dbCache, expectedFiles, fs)
|
||||
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(lgr, httpHost, dbCache, expectedFiles, fs)
|
||||
if readOnly {
|
||||
chnkSt = ReadOnlyChunkStore{chnkSt}
|
||||
}
|
||||
remotesapi.RegisterChunkStoreServiceServer(s.grpcSrv, chnkSt)
|
||||
|
||||
var handler http.Handler = newFileHandler(dbCache, expectedFiles, fs, readOnly)
|
||||
var handler http.Handler = newFileHandler(lgr, dbCache, expectedFiles, fs, readOnly)
|
||||
if httpPort == grpcPort {
|
||||
handler = grpcMultiplexHandler(s.grpcSrv, handler)
|
||||
} else {
|
||||
@@ -114,9 +114,9 @@ func (s *Server) Serve(listeners Listeners) {
|
||||
if listeners.grpc != nil {
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
log.Println("Starting grpc server on port", s.grpcPort)
|
||||
logrus.Println("Starting grpc server on port", s.grpcPort)
|
||||
err := s.grpcSrv.Serve(listeners.grpc)
|
||||
log.Println("grpc server exited. error:", err)
|
||||
logrus.Println("grpc server exited. error:", err)
|
||||
}()
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
@@ -127,9 +127,9 @@ func (s *Server) Serve(listeners Listeners) {
|
||||
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
log.Println("Starting http server on port ", s.httpPort)
|
||||
logrus.Println("Starting http server on port", s.httpPort)
|
||||
err := s.httpSrv.Serve(listeners.http)
|
||||
log.Println("http server exited. exit error:", err)
|
||||
logrus.Println("http server exited. exit error:", err)
|
||||
}()
|
||||
go func() {
|
||||
defer s.wg.Done()
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"errors"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
|
||||
@@ -50,7 +51,7 @@ func (s remotesrvStore) Get(org, repo, nbfVerStr string) (remotesrv.RemoteSrvSto
|
||||
return rss, nil
|
||||
}
|
||||
|
||||
func NewRemoteSrvServer(ctx *sql.Context, port int) *remotesrv.Server {
|
||||
func NewRemoteSrvServer(lgr *logrus.Entry, ctx *sql.Context, port int) *remotesrv.Server {
|
||||
sess := dsess.DSessFromSess(ctx.Session)
|
||||
return remotesrv.NewServer("", port, port, sess.Provider().FileSystem(), remotesrvStore{ctx}, true)
|
||||
return remotesrv.NewServer(lgr, "", port, port, sess.Provider().FileSystem(), remotesrvStore{ctx}, true)
|
||||
}
|
||||
|
||||
@@ -22,6 +22,8 @@ import (
|
||||
"os"
|
||||
"os/signal"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
|
||||
@@ -80,7 +82,7 @@ func main() {
|
||||
dbCache = NewLocalCSCache(fs)
|
||||
}
|
||||
|
||||
server := remotesrv.NewServer(*httpHostParam, *httpPortParam, *grpcPortParam, fs, dbCache, *readOnlyParam)
|
||||
server := remotesrv.NewServer(logrus.NewEntry(logrus.StandardLogger()), *httpHostParam, *httpPortParam, *grpcPortParam, fs, dbCache, *readOnlyParam)
|
||||
listeners, err := server.Listeners()
|
||||
if err != nil {
|
||||
log.Fatalf("error starting remotesrv Server listeners: %v\n", err)
|
||||
|
||||
Reference in New Issue
Block a user