265 lines
8.2 KiB
Go
265 lines
8.2 KiB
Go
|
package repository
|
||
|
|
||
|
import (
|
||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/entity"
|
||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/repository"
|
||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
|
||
|
"context"
|
||
|
"database/sql"
|
||
|
"fmt"
|
||
|
"github.com/shopspring/decimal"
|
||
|
"gorm.io/gorm"
|
||
|
"gorm.io/gorm/clause"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// userWalletManager 代表一個使用者在某資產上的錢包服務,
|
||
|
// 負責讀取/寫入資料庫並在記憶體暫存變動
|
||
|
type userWalletManager struct {
|
||
|
db *gorm.DB
|
||
|
uid, asset string // 使用者識別碼, 資產代號 (如 BTC、ETH、TWD)
|
||
|
cacheWallets map[wallet.Types]entity.Wallet // 暫存各類型錢包
|
||
|
cacheOrderBal map[int64]decimal.Decimal // 暫存訂單後餘額
|
||
|
pendingTxs []entity.WalletTransaction // 暫存待落庫的錢包交易紀錄
|
||
|
}
|
||
|
|
||
|
// NewUserWalletManager 建立一個新的 UserWalletManager
|
||
|
func NewUserWalletManager(db *gorm.DB, uid, asset string) repository.UserWalletManager {
|
||
|
return &userWalletManager{
|
||
|
db: db,
|
||
|
uid: uid,
|
||
|
asset: asset,
|
||
|
cacheWallets: make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes)),
|
||
|
cacheOrderBal: make(map[int64]decimal.Decimal),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// InitWallets 為新使用者建立所有類型錢包,並寫入資料庫與本地緩存
|
||
|
func (repo *userWalletManager) InitWallets(ctx context.Context, brand string) ([]entity.Wallet, error) {
|
||
|
ws := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||
|
for _, t := range wallet.AllTypes {
|
||
|
ws = append(ws,
|
||
|
entity.Wallet{
|
||
|
Brand: brand,
|
||
|
UID: repo.uid,
|
||
|
Asset: repo.asset,
|
||
|
Balance: decimal.Zero,
|
||
|
Type: t,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
if err := repo.db.WithContext(ctx).Create(&ws).Error; err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
for _, w := range ws {
|
||
|
repo.cacheWallets[w.Type] = w
|
||
|
}
|
||
|
|
||
|
return ws, nil
|
||
|
}
|
||
|
|
||
|
// FetchAllWallets 查詢資料庫所有錢包類型餘額,不鎖表,並更新本地緩存
|
||
|
func (repo *userWalletManager) FetchAllWallets(ctx context.Context) ([]entity.Wallet, error) {
|
||
|
out := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||
|
|
||
|
if err := repo.db.WithContext(ctx).
|
||
|
Select("id, asset, balance, type").
|
||
|
Where("uid = ? AND asset = ?", repo.uid, repo.asset).
|
||
|
Find(&out).Error; err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
for _, w := range out {
|
||
|
repo.cacheWallets[w.Type] = w
|
||
|
}
|
||
|
|
||
|
return out, nil
|
||
|
}
|
||
|
|
||
|
// FetchWalletsByTypes 查詢指定類型錢包餘額,不鎖表
|
||
|
func (repo *userWalletManager) FetchWalletsByTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) {
|
||
|
out := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||
|
if err := repo.db.WithContext(ctx).
|
||
|
Select("id, asset, balance, type").
|
||
|
Where("uid = ? AND asset = ?", repo.uid, repo.asset).
|
||
|
Where("type IN ?", kinds).
|
||
|
Find(&out).Error; err != nil {
|
||
|
return nil, translateNotFound(err)
|
||
|
}
|
||
|
for _, w := range out {
|
||
|
repo.cacheWallets[w.Type] = w
|
||
|
}
|
||
|
|
||
|
return out, nil
|
||
|
}
|
||
|
|
||
|
// LockWalletsByTypes 查詢並鎖定指定類型錢包 (FOR UPDATE)
|
||
|
func (repo *userWalletManager) LockWalletsByTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) {
|
||
|
out := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||
|
|
||
|
if err := repo.db.WithContext(ctx).
|
||
|
Where("uid = ? AND asset = ?", repo.uid, repo.asset).
|
||
|
Where("type IN ?", kinds).
|
||
|
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||
|
Find(&out).Error; err != nil {
|
||
|
return nil, translateNotFound(err)
|
||
|
}
|
||
|
for _, w := range out {
|
||
|
repo.cacheWallets[w.Type] = w
|
||
|
}
|
||
|
|
||
|
return out, nil
|
||
|
}
|
||
|
|
||
|
// GetCachedBalance 從本地緩存讀取指定類型錢包餘額
|
||
|
func (repo *userWalletManager) GetCachedBalance(kind wallet.Types) decimal.Decimal {
|
||
|
if w, ok := repo.cacheWallets[kind]; ok {
|
||
|
return w.Balance
|
||
|
}
|
||
|
|
||
|
return decimal.Zero
|
||
|
}
|
||
|
|
||
|
// CreditWallet 增加本地緩存錢包餘額,並記錄一筆 WalletTransaction
|
||
|
func (repo *userWalletManager) CreditWallet(kind wallet.Types, orderID string, amount decimal.Decimal) error {
|
||
|
w, ok := repo.cacheWallets[kind]
|
||
|
if !ok {
|
||
|
return repository.ErrRecordNotFound
|
||
|
}
|
||
|
|
||
|
w.Balance = w.Balance.Add(amount)
|
||
|
if w.Balance.LessThan(decimal.Zero) {
|
||
|
return repository.ErrBalanceInsufficient
|
||
|
}
|
||
|
repo.cacheWallets[kind] = w
|
||
|
repo.pendingTxs = append(repo.pendingTxs, entity.WalletTransaction{
|
||
|
OrderID: orderID,
|
||
|
UID: repo.uid,
|
||
|
WalletType: kind,
|
||
|
Asset: repo.asset,
|
||
|
Amount: amount,
|
||
|
Balance: w.Balance,
|
||
|
})
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// DebitWallet 扣減本地緩存錢包餘額,並記錄一筆 WalletTransaction
|
||
|
func (repo *userWalletManager) DebitWallet(kind wallet.Types, orderID string, amount decimal.Decimal) error {
|
||
|
return repo.CreditWallet(kind, orderID, amount.Neg())
|
||
|
}
|
||
|
|
||
|
// BuildWalletTransactions 填充所有暫存 WalletTransaction 的共用欄位,回傳可落庫之切片
|
||
|
func (repo *userWalletManager) BuildWalletTransactions(txID int64, brand string, biz wallet.BusinessName) []entity.WalletTransaction {
|
||
|
for i := range repo.pendingTxs {
|
||
|
repo.pendingTxs[i].TransactionID = txID
|
||
|
repo.pendingTxs[i].Brand = brand
|
||
|
repo.pendingTxs[i].BusinessType = biz.ToInt8()
|
||
|
}
|
||
|
return repo.pendingTxs
|
||
|
}
|
||
|
|
||
|
// CommitWalletBalances 將本地緩存的最終錢包餘額一次性寫回 wallet 表
|
||
|
func (repo *userWalletManager) CommitWalletBalances(ctx context.Context) error {
|
||
|
return repo.db.Transaction(func(tx *gorm.DB) error {
|
||
|
for _, w := range repo.cacheWallets {
|
||
|
if err := tx.WithContext(ctx).
|
||
|
Model(&entity.Wallet{}).
|
||
|
Where("id = ?", w.ID).
|
||
|
UpdateColumns(map[string]interface{}{
|
||
|
"balance": w.Balance,
|
||
|
"update_at": time.Now().Unix(),
|
||
|
}).Error; err != nil {
|
||
|
|
||
|
return fmt.Errorf("更新錢包 %d 失敗: %w", w.ID, err)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
|
||
|
}
|
||
|
|
||
|
// FetchOrderTx 從 transaction 表讀取一筆訂單交易,並緩存其後餘額
|
||
|
func (repo *userWalletManager) FetchOrderTx(ctx context.Context, orderID string) (entity.Transaction, error) {
|
||
|
var t entity.Transaction
|
||
|
if err := repo.db.WithContext(ctx).
|
||
|
Where("order_id = ?", orderID).
|
||
|
Take(&t).Error; err != nil {
|
||
|
return t, translateNotFound(err)
|
||
|
}
|
||
|
repo.cacheOrderBal[t.ID] = t.PostTransferBalance
|
||
|
|
||
|
return t, nil
|
||
|
}
|
||
|
|
||
|
// LockOrderTx 同 FetchOrderTx 但加上 FOR UPDATE
|
||
|
func (repo *userWalletManager) LockOrderTx(ctx context.Context, orderID string) (entity.Transaction, error) {
|
||
|
var t entity.Transaction
|
||
|
|
||
|
if err := repo.db.WithContext(ctx).
|
||
|
Where("order_id = ?", orderID).
|
||
|
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||
|
Take(&t).Error; err != nil {
|
||
|
|
||
|
return t, translateNotFound(err)
|
||
|
}
|
||
|
repo.cacheOrderBal[t.ID] = t.PostTransferBalance
|
||
|
|
||
|
return t, nil
|
||
|
}
|
||
|
|
||
|
// CreditOrderBalance 增加本地緩存的訂單後餘額
|
||
|
func (repo *userWalletManager) CreditOrderBalance(txID int64, amount decimal.Decimal) error {
|
||
|
b, ok := repo.cacheOrderBal[txID]
|
||
|
if !ok {
|
||
|
return repository.ErrRecordNotFound
|
||
|
}
|
||
|
nb := b.Add(amount)
|
||
|
if nb.LessThan(decimal.Zero) {
|
||
|
return repository.ErrBalanceInsufficient
|
||
|
}
|
||
|
repo.cacheOrderBal[txID] = nb
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// DebitOrderBalance 扣減本地緩存的訂單後餘額
|
||
|
func (repo *userWalletManager) DebitOrderBalance(txID int64, amount decimal.Decimal) error {
|
||
|
return repo.CreditOrderBalance(txID, amount.Neg())
|
||
|
}
|
||
|
|
||
|
// CommitOrderBalances 將本地暫存的訂單後餘額寫回 transaction.post_transfer_balance
|
||
|
func (repo *userWalletManager) CommitOrderBalances(ctx context.Context) error {
|
||
|
return repo.db.Transaction(func(tx *gorm.DB) error {
|
||
|
for id, bal := range repo.cacheOrderBal {
|
||
|
if err := tx.WithContext(ctx).
|
||
|
Model(&entity.Transaction{}).
|
||
|
Where("id = ?", id).
|
||
|
UpdateColumn("post_transfer_balance", bal).Error; err != nil {
|
||
|
return fmt.Errorf("更新訂單 %d 後餘額失敗: %w", id, err)
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
|
||
|
}
|
||
|
|
||
|
// HasAvailableWallet 檢查是否已有「可用餘額」類型的錢包
|
||
|
func (repo *userWalletManager) HasAvailableWallet(ctx context.Context) (bool, error) {
|
||
|
var ok bool
|
||
|
|
||
|
err := repo.db.WithContext(ctx).
|
||
|
Model(&entity.Wallet{}).
|
||
|
Select("1").
|
||
|
Where("uid = ? AND asset = ? AND type = ?", repo.uid, repo.asset, wallet.TypeAvailable).
|
||
|
Limit(1).
|
||
|
Scan(&ok).Error
|
||
|
return ok, err
|
||
|
}
|
||
|
|
||
|
// Reset 清空本地所有緩存
|
||
|
func (repo *userWalletManager) Reset() {
|
||
|
repo.cacheWallets = make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes))
|
||
|
repo.cacheOrderBal = make(map[int64]decimal.Decimal)
|
||
|
repo.pendingTxs = nil
|
||
|
}
|