Optimize the backup process to stream the dump data instead of buffering it all in memory

This commit is contained in:
Luis Eduardo Jeréz Girón
2024-08-02 19:45:43 -06:00
parent c992dfd8f2
commit d83048058f
4 changed files with 56 additions and 85 deletions
+42 -25
View File
@@ -1,10 +1,12 @@
package postgres
import (
"archive/zip"
"bytes"
"fmt"
"io"
"os/exec"
"github.com/eduardolat/pgbackweb/internal/util/fileutil"
"github.com/orsinium-labs/enum"
)
@@ -117,10 +119,10 @@ type DumpParams struct {
}
// Dump runs the pg_dump command with the given parameters. It returns the SQL
// dump as a byte slice.
// dump as an io.Reader.
func (Client) Dump(
version PGVersion, connString string, params ...DumpParams,
) ([]byte, error) {
) io.Reader {
pickedParams := DumpParams{}
if len(params) > 0 {
pickedParams = params[0]
@@ -146,35 +148,50 @@ func (Client) Dump(
args = append(args, "--no-comments")
}
errorBuffer := &bytes.Buffer{}
reader, writer := io.Pipe()
cmd := exec.Command(version.Value.pgDump, args...)
output, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf(
"error running pg_dump v%s: %s",
version.Value.version, output,
)
}
cmd.Stdout = writer
cmd.Stderr = errorBuffer
return output, nil
go func() {
defer writer.Close()
if err := cmd.Run(); err != nil {
writer.CloseWithError(fmt.Errorf(
"error running pg_dump v%s: %s",
version.Value.version, errorBuffer.String(),
))
}
}()
return reader
}
// DumpZip runs the pg_dump command with the given parameters and returns the
// ZIP-compressed SQL dump as a byte slice.
// ZIP-compressed SQL dump as an io.Reader.
func (c *Client) DumpZip(
version PGVersion, connString string, params ...DumpParams,
) ([]byte, error) {
dump, err := c.Dump(version, connString, params...)
if err != nil {
return nil, err
}
) io.Reader {
dumpReader := c.Dump(version, connString, params...)
reader, writer := io.Pipe()
output, err := fileutil.CreateZip([]fileutil.ZipFile{{
Name: "dump.sql",
Bytes: dump,
}})
if err != nil {
return nil, fmt.Errorf("error creating zip file: %w", err)
}
go func() {
defer writer.Close()
return output, nil
zipWriter := zip.NewWriter(writer)
defer zipWriter.Close()
fileWriter, err := zipWriter.Create("dump.sql")
if err != nil {
writer.CloseWithError(fmt.Errorf("error creating zip file: %w", err))
return
}
if _, err := io.Copy(fileWriter, dumpReader); err != nil {
writer.CloseWithError(fmt.Errorf("error writing to zip file: %w", err))
return
}
}()
return reader
}
+11 -10
View File
@@ -1,14 +1,15 @@
package s3
import (
"bytes"
"fmt"
"io"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/eduardolat/pgbackweb/internal/util/strutil"
)
@@ -56,33 +57,33 @@ func (Client) Ping(
return nil
}
// Upload uploads a file to S3
// Upload uploads a file to S3 from a reader
func (Client) Upload(
accessKey, secretKey, region, endpoint, bucketName, key string,
fileContent []byte,
) (string, error) {
fileReader io.Reader,
) error {
s3Client, err := createS3Client(
accessKey, secretKey, region, endpoint,
)
if err != nil {
return "", err
return err
}
reader := bytes.NewReader(fileContent)
key = strutil.RemoveLeadingSlash(key)
contentType := strutil.GetContentTypeFromFileName(key)
_, err = s3Client.PutObject(&s3.PutObjectInput{
uploader := s3manager.NewUploaderWithClient(s3Client)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: aws.ReadSeekCloser(reader),
Body: fileReader,
ContentType: aws.String(contentType),
})
if err != nil {
return "", fmt.Errorf("failed to upload file to S3: %w", err)
return fmt.Errorf("failed to upload file to S3: %w", err)
}
return key, nil
return nil
}
// Delete deletes a file from S3
+3 -12
View File
@@ -86,7 +86,7 @@ func (s *Service) RunExecution(ctx context.Context, backupID uuid.UUID) error {
})
}
dumpBytes, err := s.ints.PGClient.DumpZip(
dumpReader := s.ints.PGClient.DumpZip(
pgVersion, back.DecryptedDatabaseConnectionString, postgres.DumpParams{
DataOnly: back.BackupOptDataOnly,
SchemaOnly: back.BackupOptSchemaOnly,
@@ -96,15 +96,6 @@ func (s *Service) RunExecution(ctx context.Context, backupID uuid.UUID) error {
NoComments: back.BackupOptNoComments,
},
)
if err != nil {
logError(err)
return updateExec(dbgen.ExecutionsServiceUpdateExecutionParams{
ID: ex.ID,
Status: sql.NullString{Valid: true, String: "failed"},
Message: sql.NullString{Valid: true, String: err.Error()},
FinishedAt: sql.NullTime{Valid: true, Time: time.Now()},
})
}
date := time.Now().Format(timeutil.LayoutSlashYYYYMMDD)
file := fmt.Sprintf(
@@ -114,10 +105,10 @@ func (s *Service) RunExecution(ctx context.Context, backupID uuid.UUID) error {
)
path := strutil.CreatePath(false, back.BackupDestDir, date, file)
_, err = s.ints.S3Client.Upload(
err = s.ints.S3Client.Upload(
back.DecryptedDestinationAccessKey, back.DecryptedDestinationSecretKey,
back.DestinationRegion, back.DestinationEndpoint, back.DestinationBucketName,
path, dumpBytes,
path, dumpReader,
)
if err != nil {
logError(err)
-38
View File
@@ -1,38 +0,0 @@
package fileutil
import (
"archive/zip"
"bytes"
)
// ZipFile describes one entry of a zip archive: the name the entry is stored
// under and the raw bytes that make up its content.
type ZipFile struct {
	Name  string
	Bytes []byte
}

// CreateZip builds a zip archive in memory containing one entry per element
// of files, written in the order given, and returns the finished archive as a
// byte slice.
//
// If an entry cannot be created or written, or the archive cannot be
// finalized, the returned slice is nil and the underlying error is returned
// unchanged.
func CreateZip(files []ZipFile) ([]byte, error) {
	var archive bytes.Buffer
	zw := zip.NewWriter(&archive)

	for i := range files {
		entry, err := zw.Create(files[i].Name)
		if err == nil {
			_, err = entry.Write(files[i].Bytes)
		}
		if err != nil {
			// The archive is being discarded, so the result of closing the
			// writer is irrelevant; report the failure that stopped us.
			_ = zw.Close()
			return nil, err
		}
	}

	// Close writes the central directory; without it the archive is invalid.
	if err := zw.Close(); err != nil {
		return nil, err
	}

	return archive.Bytes(), nil
}