backend/pkg/fileStorage/repository/aws_s3.go

248 lines
6.6 KiB
Go
Raw Normal View History

2025-11-12 06:50:35 +00:00
package repository
import (
"backend/pkg/fileStorage/config"
s3Storage "backend/pkg/fileStorage/domain/aws"
"backend/pkg/fileStorage/domain/repository"
"backend/pkg/fileStorage/utils"
errs "backend/pkg/library/errors"
"bytes"
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"io"
"net/http"
"os"
"path"
"strings"
"time"
)
type AwsS3FileStorageRepositoryParam struct {
Conf *config.Config
Logger errs.Logger
}
type AwsS3FileStorageRepository struct {
AwsS3FileStorageRepositoryParam
}
func MustAwsS3FileStorageRepo(param AwsS3FileStorageRepositoryParam) repository.FileStorageRepository {
return &AwsS3FileStorageRepository{
param,
}
}
// 每次上傳都用一個新的 Session
func (repo *AwsS3FileStorageRepository) getAwsSession() (*session.Session, error) {
conf := &aws.Config{
Region: aws.String(repo.Conf.AmazonS3Settings.Region),
Credentials: credentials.NewStaticCredentials(
repo.Conf.AmazonS3Settings.AccessKey,
repo.Conf.AmazonS3Settings.SecretKey,
"",
),
}
awsSession, err := session.NewSession(conf)
if err != nil {
return nil, err
}
return awsSession, nil
}
func (repo *AwsS3FileStorageRepository) Move(_ context.Context, objectPath string, destinationPath string) error {
awsSession, _ := repo.getAwsSession()
svc := s3.New(awsSession)
copyObjectInput := &s3.CopyObjectInput{
Key: aws.String(destinationPath),
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
CopySource: aws.String(path.Join(repo.Conf.AmazonS3Settings.Bucket, objectPath)),
}
deleteObjectInput := &s3.DeleteObjectInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: aws.String(objectPath),
}
if _, err := svc.CopyObject(copyObjectInput); err != nil {
return err
}
if _, err := svc.DeleteObject(deleteObjectInput); err != nil {
return err
}
return nil
}
func (repo *AwsS3FileStorageRepository) Delete(_ context.Context, objectPath string) error {
awsSession, _ := repo.getAwsSession()
svc := s3.New(awsSession)
deleteObjectInput := &s3.DeleteObjectInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: aws.String(objectPath),
}
deleteObjOutput, err := svc.DeleteObject(deleteObjectInput)
if err != nil {
return err
}
if deleteObjOutput.DeleteMarker != nil && deleteObjOutput.VersionId != nil {
repo.Logger.Info(fmt.Sprintf("s3 - delete object, delete marker: %t, VersionId: %s", *deleteObjOutput.DeleteMarker, *deleteObjOutput.VersionId))
}
return nil
}
func (repo *AwsS3FileStorageRepository) Exists(_ context.Context, objectPath string) (bool, error) {
awsSession, _ := repo.getAwsSession()
downloader := s3.New(awsSession)
_, err := downloader.HeadObject(&s3.HeadObjectInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: aws.String(objectPath),
})
if err != nil {
return false, err
}
return true, nil
}
func (repo *AwsS3FileStorageRepository) DeleteDirectory(_ context.Context, directoryPath string) error {
if strings.HasPrefix(directoryPath, "/") {
directoryPath = directoryPath[1:]
}
awsSession, _ := repo.getAwsSession()
svc := s3.New(awsSession)
listObjectsInput := &s3.ListObjectsV2Input{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Prefix: aws.String(directoryPath),
}
listObjectsOutput, err := svc.ListObjectsV2(listObjectsInput)
if err != nil {
return err
}
for _, object := range listObjectsOutput.Contents {
deleteObjectInput := &s3.DeleteObjectInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: object.Key,
}
go func(inp *s3.DeleteObjectInput) {
_, _ = svc.DeleteObject(inp)
}(deleteObjectInput)
}
return nil
}
func (repo *AwsS3FileStorageRepository) UploadWithTTL(_ context.Context, content io.Reader, objectPath string, expires *time.Time) error {
awsSession, _ := repo.getAwsSession()
uploader := s3manager.NewUploader(awsSession)
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: aws.String(objectPath),
Body: content,
ACL: aws.String(s3Storage.S3AclSetting),
ContentDisposition: aws.String("attachment"),
Expires: expires,
})
if err != nil {
return err
}
return nil
}
func (repo *AwsS3FileStorageRepository) UploadFromData(_ context.Context, data []byte, objectPath string, contentType string) error {
reader := bytes.NewReader(data)
awsSession, _ := repo.getAwsSession()
uploader := s3manager.NewUploader(awsSession)
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: aws.String(objectPath),
Body: reader,
ACL: aws.String(s3Storage.S3AclSetting),
ContentType: aws.String(contentType),
})
if err != nil {
return err
}
return nil
}
func (repo *AwsS3FileStorageRepository) UploadFromPath(_ context.Context, localFilePath string, objectPath string) error {
file, err := os.Open(localFilePath)
if err != nil {
return err
}
defer file.Close()
fileInfo, _ := file.Stat()
buffer := make([]byte, fileInfo.Size())
_, _ = file.Read(buffer)
awsSession, err := repo.getAwsSession()
if err != nil {
return err
}
uploader := s3manager.NewUploader(awsSession)
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: aws.String(objectPath),
Body: bytes.NewReader(buffer),
ContentType: aws.String(http.DetectContentType(buffer)),
ACL: aws.String(s3Storage.S3AclSetting),
ContentDisposition: aws.String("attachment"),
})
if err != nil {
return err
}
return nil
}
func (repo *AwsS3FileStorageRepository) GetPublicURL(_ context.Context, objectPath string) string {
dirPart := strings.Split(objectPath, string(os.PathSeparator))
return utils.URLJoin(repo.Conf.AmazonS3Settings.CloudFrontURI, dirPart...)
}
func (repo *AwsS3FileStorageRepository) Download(_ context.Context, objectPath string) ([]byte, error) {
awsSession, _ := repo.getAwsSession()
downloader := s3manager.NewDownloader(awsSession, func(d *s3manager.Downloader) {
d.PartSize = 64 * 1024 * 1024 // 每部分 64MB
d.Concurrency = 4
d.BufferProvider = s3manager.NewPooledBufferedWriterReadFromProvider(1024 * 1024 * 8)
})
buff := &aws.WriteAtBuffer{}
_, err := downloader.Download(buff, &s3.GetObjectInput{
Bucket: aws.String(repo.Conf.AmazonS3Settings.Bucket),
Key: aws.String(objectPath),
})
data := buff.Bytes()
if err != nil {
return nil, err
}
return data, nil
}