287 lines
7.9 KiB
Go
287 lines
7.9 KiB
Go
|
package repository
|
||
|
|
||
|
import (
|
||
|
"code.30cm.net/digimon/app-cloudep-product-service/pkg/domain"
|
||
|
"code.30cm.net/digimon/app-cloudep-product-service/pkg/domain/entity"
|
||
|
"code.30cm.net/digimon/app-cloudep-product-service/pkg/domain/repository"
|
||
|
mgo "code.30cm.net/digimon/library-go/mongo"
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"github.com/zeromicro/go-zero/core/stores/cache"
|
||
|
"github.com/zeromicro/go-zero/core/stores/mon"
|
||
|
"go.mongodb.org/mongo-driver/bson"
|
||
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
||
|
"go.mongodb.org/mongo-driver/mongo"
|
||
|
"go.mongodb.org/mongo-driver/mongo/options"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type ProductStatisticsRepositoryParam struct {
|
||
|
Conf *mgo.Conf
|
||
|
CacheConf cache.CacheConf
|
||
|
DBOpts []mon.Option
|
||
|
CacheOpts []cache.Option
|
||
|
}
|
||
|
|
||
|
type ProductStatisticsRepository struct {
|
||
|
DB mgo.DocumentDBWithCacheUseCase
|
||
|
}
|
||
|
|
||
|
func NewProductStatisticsRepository(param ProductStatisticsRepositoryParam) repository.ProductStatisticsRepo {
|
||
|
e := entity.ProductStatistics{}
|
||
|
documentDB, err := mgo.MustDocumentDBWithCache(
|
||
|
param.Conf,
|
||
|
e.CollectionName(),
|
||
|
param.CacheConf,
|
||
|
param.DBOpts,
|
||
|
param.CacheOpts,
|
||
|
)
|
||
|
if err != nil {
|
||
|
panic(err)
|
||
|
}
|
||
|
|
||
|
return &ProductStatisticsRepository{
|
||
|
DB: documentDB,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) Create(ctx context.Context, data *entity.ProductStatistics) error {
|
||
|
if data.ID.IsZero() {
|
||
|
now := time.Now().UTC().UnixNano()
|
||
|
data.ID = primitive.NewObjectID()
|
||
|
data.CreatedAt = now
|
||
|
data.UpdatedAt = now
|
||
|
}
|
||
|
rk := domain.GetProductStatisticsRK(data.ID.Hex())
|
||
|
_, err := repo.DB.InsertOne(ctx, rk, data)
|
||
|
|
||
|
productKey := domain.GetProductStatisticsRK(data.ProductID)
|
||
|
_ = repo.DB.SetCache(productKey, data)
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) GetByID(ctx context.Context, id string) (*entity.ProductStatistics, error) {
|
||
|
oid, err := primitive.ObjectIDFromHex(id)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
var result *entity.ProductStatistics
|
||
|
err = repo.DB.FindOne(ctx, domain.GetProductStatisticsRK(id), &result, bson.M{"_id": oid})
|
||
|
switch {
|
||
|
case err == nil:
|
||
|
return result, nil
|
||
|
case errors.Is(err, mon.ErrNotFound):
|
||
|
return nil, ErrNotFound
|
||
|
default:
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) GetByProductID(ctx context.Context, productID string) (*entity.ProductStatistics, error) {
|
||
|
var result *entity.ProductStatistics
|
||
|
err := repo.DB.FindOne(ctx, domain.GetProductStatisticsRK(productID), &result, bson.M{"product_id": productID})
|
||
|
switch {
|
||
|
case err == nil:
|
||
|
return result, nil
|
||
|
case errors.Is(err, mon.ErrNotFound):
|
||
|
return nil, ErrNotFound
|
||
|
default:
|
||
|
return nil, err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) IncOrders(ctx context.Context, productID string, count int64) error {
|
||
|
filter := bson.M{"product_id": productID}
|
||
|
update := bson.M{"$inc": bson.M{"total_orders": count}}
|
||
|
|
||
|
rk := domain.GetProductStatisticsRK(productID)
|
||
|
_, err := repo.DB.UpdateOne(ctx, rk, filter, update)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to decrease stock: %w", err)
|
||
|
}
|
||
|
|
||
|
id, err := repo.getIDByProductID(ctx, productID)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to get product_id by id: %w", err)
|
||
|
}
|
||
|
|
||
|
repo.clearCache(ctx, id, productID)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) DecOrders(ctx context.Context, productID string, count int64) error {
|
||
|
filter := bson.M{"product_id": productID, "total_orders": bson.M{"$gte": count}}
|
||
|
update := bson.M{"$inc": bson.M{"total_orders": -count}}
|
||
|
|
||
|
rk := domain.GetProductStatisticsRK(productID)
|
||
|
_, err := repo.DB.UpdateOne(ctx, rk, filter, update)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to decrease stock: %w", err)
|
||
|
}
|
||
|
|
||
|
id, err := repo.getIDByProductID(ctx, productID)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to get product_id by id: %w", err)
|
||
|
}
|
||
|
|
||
|
repo.clearCache(ctx, id, productID)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) UpdateAverageRating(ctx context.Context, productID string, averageRating float64) error {
|
||
|
filter := bson.M{"product_id": productID}
|
||
|
now := time.Now().UnixNano()
|
||
|
update := bson.M{
|
||
|
"$set": bson.M{
|
||
|
"average_rating": averageRating,
|
||
|
"average_rating_time": now,
|
||
|
"updated_at": now,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
_, err := repo.DB.UpdateOne(ctx, domain.GetProductStatisticsRK(productID), filter, update)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
id, err := repo.getIDByProductID(ctx, productID)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to get product_id by id: %w", err)
|
||
|
}
|
||
|
|
||
|
repo.clearCache(ctx, id, productID)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) IncFansCount(ctx context.Context, productID string, fansCount uint64) error {
|
||
|
filter := bson.M{"product_id": productID}
|
||
|
now := time.Now().UnixNano()
|
||
|
update := bson.M{
|
||
|
"$inc": bson.M{"fans_count": fansCount},
|
||
|
"$set": bson.M{
|
||
|
"fans_count_update_time": now,
|
||
|
"updated_at": now,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
rk := domain.GetProductStatisticsRK(productID)
|
||
|
_, err := repo.DB.UpdateOne(ctx, rk, filter, update)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to increment fans count: %w", err)
|
||
|
}
|
||
|
|
||
|
id, err := repo.getIDByProductID(ctx, productID)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to get product_id by id: %w", err)
|
||
|
}
|
||
|
|
||
|
repo.clearCache(ctx, id, productID)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) DecFansCount(ctx context.Context, productID string, fansCount uint64) error {
|
||
|
// 只允許在 fans_count 大於或等於欲扣減值時進行扣減
|
||
|
filter := bson.M{"product_id": productID, "fans_count": bson.M{"$gte": fansCount}}
|
||
|
now := time.Now().UnixNano()
|
||
|
update := bson.M{
|
||
|
"$inc": bson.M{"fans_count": -int64(fansCount)},
|
||
|
"$set": bson.M{
|
||
|
"fans_count_update_time": now,
|
||
|
"updated_at": now,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
rk := domain.GetProductStatisticsRK(productID)
|
||
|
_, err := repo.DB.UpdateOne(ctx, rk, filter, update)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to decrement fans count: %w", err)
|
||
|
}
|
||
|
|
||
|
id, err := repo.getIDByProductID(ctx, productID)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to get product_id by id: %w", err)
|
||
|
}
|
||
|
|
||
|
repo.clearCache(ctx, id, productID)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) Delete(ctx context.Context, id string) error {
|
||
|
oid, err := primitive.ObjectIDFromHex(id)
|
||
|
if err != nil {
|
||
|
return ErrInvalidObjectID
|
||
|
}
|
||
|
productID, err := repo.getProductIDByID(ctx, id)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to get product_id by id: %w", err)
|
||
|
}
|
||
|
|
||
|
filter := bson.M{"_id": oid}
|
||
|
_, err = repo.DB.DeleteOne(ctx, domain.GetProductStatisticsRK(id), filter)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
repo.clearCache(ctx, id, productID)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) Index20250317001UP(ctx context.Context) (*mongo.Cursor, error) {
|
||
|
// 等價於 db.account.createIndex({"product_id": 1})
|
||
|
repo.DB.PopulateIndex(ctx, "product_id", 1, true)
|
||
|
|
||
|
return repo.DB.GetClient().Indexes().List(ctx)
|
||
|
}
|
||
|
|
||
|
// 快取輔助函數
|
||
|
// clearCache 同時刪除 product_id 與 _id 兩個 cache key
|
||
|
func (repo *ProductStatisticsRepository) clearCache(ctx context.Context, id, productID string) {
|
||
|
keys := []string{
|
||
|
domain.GetProductStatisticsRK(productID),
|
||
|
domain.GetProductStatisticsRK(id),
|
||
|
}
|
||
|
for _, key := range keys {
|
||
|
_ = repo.DB.DelCache(ctx, key)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) getIDByProductID(ctx context.Context, productID string) (string, error) {
|
||
|
filter := bson.M{"product_id": productID}
|
||
|
var e entity.ProductStatistics
|
||
|
projection := bson.M{"_id": 1}
|
||
|
opts := options.FindOne().SetProjection(projection)
|
||
|
err := repo.DB.GetClient().FindOne(ctx, &e, filter, opts)
|
||
|
if err != nil {
|
||
|
return "", fmt.Errorf("failed to set projection: %w", err)
|
||
|
}
|
||
|
|
||
|
return e.ID.Hex(), nil
|
||
|
}
|
||
|
|
||
|
func (repo *ProductStatisticsRepository) getProductIDByID(ctx context.Context, id string) (string, error) {
|
||
|
oid, err := primitive.ObjectIDFromHex(id)
|
||
|
if err != nil {
|
||
|
return "", err
|
||
|
}
|
||
|
|
||
|
filter := bson.M{"_id": oid}
|
||
|
var e entity.ProductStatistics
|
||
|
projection := bson.M{"product_id": 1}
|
||
|
opts := options.FindOne().SetProjection(projection)
|
||
|
err = repo.DB.GetClient().FindOne(ctx, &e, filter, opts)
|
||
|
if err != nil {
|
||
|
return "", fmt.Errorf("failed to set projection: %w", err)
|
||
|
}
|
||
|
|
||
|
return e.ProductID, nil
|
||
|
}
|