Files
phylum/internal/storage/minio_storage.go
T
2024-03-18 10:50:13 +05:30

112 lines
2.6 KiB
Go

package storage
import (
"context"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"hash"
"io"
"os"
"path/filepath"
"github.com/google/uuid"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
type minioStorage struct {
client *minio.Client
tmp string
bucketName string
prefix string
}
func newMinioStorage(
endpoint,
keyId,
accessKey,
tmp,
bucketName,
prefix string) (Storage, error) {
if endpoint == "" {
return nil, errors.New("minio endpoint not provided")
}
if keyId == "" {
return nil, errors.New("minio key id not provided")
}
if accessKey == "" {
return nil, errors.New("minio access key not provided")
}
if tmp == "" {
return nil, errors.New("minio tmp not provided")
}
if bucketName == "" {
return nil, errors.New("minio bucket name not provided")
}
err := os.MkdirAll(tmp, 0750)
if err != nil {
return nil, err
}
client, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(keyId, accessKey, ""),
Secure: true,
})
if err != nil {
return nil, err
}
return minioStorage{client: client, tmp: tmp, bucketName: bucketName, prefix: prefix}, nil
}
func (s minioStorage) OpenRead(id uuid.UUID, start, length int64) (io.ReadCloser, error) {
// TODO: Range
return s.client.GetObject(context.Background(), s.bucketName, s.prefix+id.String(), minio.GetObjectOptions{})
}
func (s minioStorage) OpenWrite(id uuid.UUID, callback func(int, string) error) (io.WriteCloser, error) {
path := filepath.Join(s.tmp, id.String())
file, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0640)
if err != nil {
return nil, err
}
return &hasher{dest: file, sum: md5.New(), closeCallback: func(len int, sum hash.Hash, err error) error {
if err != nil {
return err
}
s.client.FPutObject(context.Background(), s.bucketName, s.prefix+id.String(), path, minio.PutObjectOptions{Progress: &progressReader{len: len, callback: func() {
os.Remove(path)
}}})
etag := hex.EncodeToString(sum.Sum(nil))
return callback(len, etag)
}}, nil
}
func (s minioStorage) Delete(id uuid.UUID) error {
err := s.client.RemoveObject(context.Background(), s.bucketName, s.prefix+id.String(), minio.RemoveObjectOptions{})
return err
}
func (l minioStorage) String() string {
return fmt.Sprintf("minio (endpoint: %s, bucket: %s)", l.client.EndpointURL(), l.bucketName)
}
type progressReader struct {
cur, len int
callback func()
}
func (r *progressReader) Read(p []byte) (n int, err error) {
max := r.len - r.cur
size := len(p)
if size > max {
size = max
}
r.cur += size
if r.cur == r.len && size == 0 {
r.callback()
}
return size, nil
}