feat: add transaction
This commit is contained in:
parent
eacf85a532
commit
14082dc5f5
|
@ -0,0 +1,48 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/entity"
|
||||||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
|
||||||
|
"context"
|
||||||
|
"github.com/shopspring/decimal"
|
||||||
|
)
|
||||||
|
|
||||||
|
// UserWalletManager 負責單一使用者對單一資產的錢包操作:
|
||||||
|
// · 建立初始錢包
|
||||||
|
// · 查詢/鎖定餘額
|
||||||
|
// · 本地暫存變動(錢包層面與訂單層面)
|
||||||
|
// · 最後一次性寫回資料庫
|
||||||
|
type UserWalletManager interface {
|
||||||
|
// InitWallets 為新使用者建立所有類型錢包,並寫入資料庫與本地緩存
|
||||||
|
InitWallets(ctx context.Context, brand string) ([]entity.Wallet, error)
|
||||||
|
// FetchAllWallets 查詢資料庫所有錢包類型餘額,不鎖表,並更新本地緩存
|
||||||
|
FetchAllWallets(ctx context.Context) ([]entity.Wallet, error)
|
||||||
|
// FetchWalletsByTypes 查詢指定類型錢包餘額,不鎖表
|
||||||
|
FetchWalletsByTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error)
|
||||||
|
// LockWalletsByTypes 查詢並鎖定指定類型錢包 (FOR UPDATE)
|
||||||
|
LockWalletsByTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error)
|
||||||
|
// GetCachedBalance 從本地緩存讀取指定類型錢包餘額
|
||||||
|
GetCachedBalance(kind wallet.Types) decimal.Decimal
|
||||||
|
// CreditWallet 增加本地緩存錢包餘額,並記錄一筆 WalletTransaction
|
||||||
|
CreditWallet(kind wallet.Types, orderID string, amount decimal.Decimal) error
|
||||||
|
// DebitWallet 扣減本地緩存錢包餘額,並記錄一筆 WalletTransaction
|
||||||
|
DebitWallet(kind wallet.Types, orderID string, amount decimal.Decimal) error
|
||||||
|
// BuildWalletTransactions 填充所有暫存 WalletTransaction 的共用欄位,回傳可落庫之切片
|
||||||
|
BuildWalletTransactions(txID int64, brand string, biz wallet.BusinessName) []entity.WalletTransaction
|
||||||
|
// CommitWalletBalances 將本地緩存的最終錢包餘額一次性寫回 wallet 表
|
||||||
|
CommitWalletBalances(ctx context.Context) error
|
||||||
|
// FetchOrderTx 從 transaction 表讀取一筆訂單交易,並緩存其後餘額
|
||||||
|
FetchOrderTx(ctx context.Context, orderID string) (entity.Transaction, error)
|
||||||
|
// LockOrderTx 同 FetchOrderTx 但加上 FOR UPDATE
|
||||||
|
LockOrderTx(ctx context.Context, orderID string) (entity.Transaction, error)
|
||||||
|
// CreditOrderBalance 增加本地緩存的訂單後餘額
|
||||||
|
CreditOrderBalance(txID int64, amount decimal.Decimal) error
|
||||||
|
// DebitOrderBalance 扣減本地緩存的訂單後餘額
|
||||||
|
DebitOrderBalance(txID int64, amount decimal.Decimal) error
|
||||||
|
// CommitOrderBalances 將本地暫存的訂單後餘額寫回 transaction.post_transfer_balance
|
||||||
|
CommitOrderBalances(ctx context.Context) error
|
||||||
|
// HasAvailableWallet 檢查是否已有「可用餘額」類型的錢包
|
||||||
|
HasAvailableWallet(ctx context.Context) (bool, error)
|
||||||
|
// Reset 清空本地所有緩存
|
||||||
|
Reset()
|
||||||
|
}
|
|
@ -87,9 +87,9 @@ type UserWalletService interface {
|
||||||
// HasAvailableBalance 確認此使用者此資產是否已有可用餘額錢包
|
// HasAvailableBalance 確認此使用者此資產是否已有可用餘額錢包
|
||||||
HasAvailableBalance(ctx context.Context) (bool, error)
|
HasAvailableBalance(ctx context.Context) (bool, error)
|
||||||
// GetOrderBalance 查詢某筆交易(訂單),詳情寫入本地暫存
|
// GetOrderBalance 查詢某筆交易(訂單),詳情寫入本地暫存
|
||||||
GetOrderBalance(ctx context.Context, txID int64) (entity.Transaction, error)
|
GetOrderBalance(ctx context.Context, orderID string) (entity.Transaction, error)
|
||||||
// GetOrderBalanceForUpdate 查詢某筆交易(訂單),詳情寫入本地暫存 (FOR UPDATE)
|
// GetOrderBalanceForUpdate 查詢某筆交易(訂單),詳情寫入本地暫存 (FOR UPDATE)
|
||||||
GetOrderBalanceForUpdate(ctx context.Context, txID int64) (entity.Transaction, error)
|
GetOrderBalanceForUpdate(ctx context.Context, orderID string) (entity.Transaction, error)
|
||||||
// ClearCache 清空本地所有暫存
|
// ClearCache 清空本地所有暫存
|
||||||
ClearCache()
|
ClearCache()
|
||||||
}
|
}
|
||||||
|
|
|
@ -211,10 +211,10 @@ func (s *WalletService) HasAvailableBalance(ctx context.Context) (bool, error) {
|
||||||
return exists, nil
|
return exists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WalletService) GetOrderBalance(ctx context.Context, txID int64) (entity.Transaction, error) {
|
func (s *WalletService) GetOrderBalance(ctx context.Context, orderID string) (entity.Transaction, error) {
|
||||||
var t entity.Transaction
|
var t entity.Transaction
|
||||||
err := s.db.WithContext(ctx).
|
err := s.db.WithContext(ctx).
|
||||||
Where("id = ?", txID).
|
Where("order_id = ?", orderID).
|
||||||
Take(&t).Error
|
Take(&t).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return entity.Transaction{}, translateNotFound(err)
|
return entity.Transaction{}, translateNotFound(err)
|
||||||
|
@ -224,10 +224,10 @@ func (s *WalletService) GetOrderBalance(ctx context.Context, txID int64) (entity
|
||||||
return t, nil
|
return t, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *WalletService) GetOrderBalanceForUpdate(ctx context.Context, txID int64) (entity.Transaction, error) {
|
func (s *WalletService) GetOrderBalanceForUpdate(ctx context.Context, orderID string) (entity.Transaction, error) {
|
||||||
var t entity.Transaction
|
var t entity.Transaction
|
||||||
err := s.db.WithContext(ctx).
|
err := s.db.WithContext(ctx).
|
||||||
Where("id = ?", txID).
|
Where("order_id = ?", orderID).
|
||||||
Clauses(clause.Locking{Strength: "UPDATE"}).
|
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||||||
Take(&t).Error
|
Take(&t).Error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -248,5 +248,6 @@ func translateNotFound(err error) error {
|
||||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
return repository.ErrRecordNotFound
|
return repository.ErrRecordNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,264 @@
|
||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/entity"
|
||||||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/repository"
|
||||||
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"github.com/shopspring/decimal"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
"gorm.io/gorm/clause"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// userWalletManager 代表一個使用者在某資產上的錢包服務,
|
||||||
|
// 負責讀取/寫入資料庫並在記憶體暫存變動
|
||||||
|
type userWalletManager struct {
|
||||||
|
db *gorm.DB
|
||||||
|
uid, asset string // 使用者識別碼, 資產代號 (如 BTC、ETH、TWD)
|
||||||
|
cacheWallets map[wallet.Types]entity.Wallet // 暫存各類型錢包
|
||||||
|
cacheOrderBal map[int64]decimal.Decimal // 暫存訂單後餘額
|
||||||
|
pendingTxs []entity.WalletTransaction // 暫存待落庫的錢包交易紀錄
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewUserWalletManager 建立一個新的 UserWalletManager
|
||||||
|
func NewUserWalletManager(db *gorm.DB, uid, asset string) repository.UserWalletManager {
|
||||||
|
return &userWalletManager{
|
||||||
|
db: db,
|
||||||
|
uid: uid,
|
||||||
|
asset: asset,
|
||||||
|
cacheWallets: make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes)),
|
||||||
|
cacheOrderBal: make(map[int64]decimal.Decimal),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// InitWallets 為新使用者建立所有類型錢包,並寫入資料庫與本地緩存
|
||||||
|
func (repo *userWalletManager) InitWallets(ctx context.Context, brand string) ([]entity.Wallet, error) {
|
||||||
|
ws := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||||||
|
for _, t := range wallet.AllTypes {
|
||||||
|
ws = append(ws,
|
||||||
|
entity.Wallet{
|
||||||
|
Brand: brand,
|
||||||
|
UID: repo.uid,
|
||||||
|
Asset: repo.asset,
|
||||||
|
Balance: decimal.Zero,
|
||||||
|
Type: t,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := repo.db.WithContext(ctx).Create(&ws).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, w := range ws {
|
||||||
|
repo.cacheWallets[w.Type] = w
|
||||||
|
}
|
||||||
|
|
||||||
|
return ws, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchAllWallets 查詢資料庫所有錢包類型餘額,不鎖表,並更新本地緩存
|
||||||
|
func (repo *userWalletManager) FetchAllWallets(ctx context.Context) ([]entity.Wallet, error) {
|
||||||
|
out := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||||||
|
|
||||||
|
if err := repo.db.WithContext(ctx).
|
||||||
|
Select("id, asset, balance, type").
|
||||||
|
Where("uid = ? AND asset = ?", repo.uid, repo.asset).
|
||||||
|
Find(&out).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, w := range out {
|
||||||
|
repo.cacheWallets[w.Type] = w
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchWalletsByTypes 查詢指定類型錢包餘額,不鎖表
|
||||||
|
func (repo *userWalletManager) FetchWalletsByTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) {
|
||||||
|
out := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||||||
|
if err := repo.db.WithContext(ctx).
|
||||||
|
Select("id, asset, balance, type").
|
||||||
|
Where("uid = ? AND asset = ?", repo.uid, repo.asset).
|
||||||
|
Where("type IN ?", kinds).
|
||||||
|
Find(&out).Error; err != nil {
|
||||||
|
return nil, translateNotFound(err)
|
||||||
|
}
|
||||||
|
for _, w := range out {
|
||||||
|
repo.cacheWallets[w.Type] = w
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockWalletsByTypes 查詢並鎖定指定類型錢包 (FOR UPDATE)
|
||||||
|
func (repo *userWalletManager) LockWalletsByTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) {
|
||||||
|
out := make([]entity.Wallet, 0, len(wallet.AllTypes))
|
||||||
|
|
||||||
|
if err := repo.db.WithContext(ctx).
|
||||||
|
Where("uid = ? AND asset = ?", repo.uid, repo.asset).
|
||||||
|
Where("type IN ?", kinds).
|
||||||
|
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||||||
|
Find(&out).Error; err != nil {
|
||||||
|
return nil, translateNotFound(err)
|
||||||
|
}
|
||||||
|
for _, w := range out {
|
||||||
|
repo.cacheWallets[w.Type] = w
|
||||||
|
}
|
||||||
|
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCachedBalance 從本地緩存讀取指定類型錢包餘額
|
||||||
|
func (repo *userWalletManager) GetCachedBalance(kind wallet.Types) decimal.Decimal {
|
||||||
|
if w, ok := repo.cacheWallets[kind]; ok {
|
||||||
|
return w.Balance
|
||||||
|
}
|
||||||
|
|
||||||
|
return decimal.Zero
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreditWallet 增加本地緩存錢包餘額,並記錄一筆 WalletTransaction
|
||||||
|
func (repo *userWalletManager) CreditWallet(kind wallet.Types, orderID string, amount decimal.Decimal) error {
|
||||||
|
w, ok := repo.cacheWallets[kind]
|
||||||
|
if !ok {
|
||||||
|
return repository.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Balance = w.Balance.Add(amount)
|
||||||
|
if w.Balance.LessThan(decimal.Zero) {
|
||||||
|
return repository.ErrBalanceInsufficient
|
||||||
|
}
|
||||||
|
repo.cacheWallets[kind] = w
|
||||||
|
repo.pendingTxs = append(repo.pendingTxs, entity.WalletTransaction{
|
||||||
|
OrderID: orderID,
|
||||||
|
UID: repo.uid,
|
||||||
|
WalletType: kind,
|
||||||
|
Asset: repo.asset,
|
||||||
|
Amount: amount,
|
||||||
|
Balance: w.Balance,
|
||||||
|
})
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DebitWallet 扣減本地緩存錢包餘額,並記錄一筆 WalletTransaction
|
||||||
|
func (repo *userWalletManager) DebitWallet(kind wallet.Types, orderID string, amount decimal.Decimal) error {
|
||||||
|
return repo.CreditWallet(kind, orderID, amount.Neg())
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildWalletTransactions 填充所有暫存 WalletTransaction 的共用欄位,回傳可落庫之切片
|
||||||
|
func (repo *userWalletManager) BuildWalletTransactions(txID int64, brand string, biz wallet.BusinessName) []entity.WalletTransaction {
|
||||||
|
for i := range repo.pendingTxs {
|
||||||
|
repo.pendingTxs[i].TransactionID = txID
|
||||||
|
repo.pendingTxs[i].Brand = brand
|
||||||
|
repo.pendingTxs[i].BusinessType = biz.ToInt8()
|
||||||
|
}
|
||||||
|
return repo.pendingTxs
|
||||||
|
}
|
||||||
|
|
||||||
|
// CommitWalletBalances 將本地緩存的最終錢包餘額一次性寫回 wallet 表
|
||||||
|
func (repo *userWalletManager) CommitWalletBalances(ctx context.Context) error {
|
||||||
|
return repo.db.Transaction(func(tx *gorm.DB) error {
|
||||||
|
for _, w := range repo.cacheWallets {
|
||||||
|
if err := tx.WithContext(ctx).
|
||||||
|
Model(&entity.Wallet{}).
|
||||||
|
Where("id = ?", w.ID).
|
||||||
|
UpdateColumns(map[string]interface{}{
|
||||||
|
"balance": w.Balance,
|
||||||
|
"update_at": time.Now().Unix(),
|
||||||
|
}).Error; err != nil {
|
||||||
|
|
||||||
|
return fmt.Errorf("更新錢包 %d 失敗: %w", w.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchOrderTx 從 transaction 表讀取一筆訂單交易,並緩存其後餘額
|
||||||
|
func (repo *userWalletManager) FetchOrderTx(ctx context.Context, orderID string) (entity.Transaction, error) {
|
||||||
|
var t entity.Transaction
|
||||||
|
if err := repo.db.WithContext(ctx).
|
||||||
|
Where("order_id = ?", orderID).
|
||||||
|
Take(&t).Error; err != nil {
|
||||||
|
return t, translateNotFound(err)
|
||||||
|
}
|
||||||
|
repo.cacheOrderBal[t.ID] = t.PostTransferBalance
|
||||||
|
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LockOrderTx 同 FetchOrderTx 但加上 FOR UPDATE
|
||||||
|
func (repo *userWalletManager) LockOrderTx(ctx context.Context, orderID string) (entity.Transaction, error) {
|
||||||
|
var t entity.Transaction
|
||||||
|
|
||||||
|
if err := repo.db.WithContext(ctx).
|
||||||
|
Where("order_id = ?", orderID).
|
||||||
|
Clauses(clause.Locking{Strength: "UPDATE"}).
|
||||||
|
Take(&t).Error; err != nil {
|
||||||
|
|
||||||
|
return t, translateNotFound(err)
|
||||||
|
}
|
||||||
|
repo.cacheOrderBal[t.ID] = t.PostTransferBalance
|
||||||
|
|
||||||
|
return t, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreditOrderBalance 增加本地緩存的訂單後餘額
|
||||||
|
func (repo *userWalletManager) CreditOrderBalance(txID int64, amount decimal.Decimal) error {
|
||||||
|
b, ok := repo.cacheOrderBal[txID]
|
||||||
|
if !ok {
|
||||||
|
return repository.ErrRecordNotFound
|
||||||
|
}
|
||||||
|
nb := b.Add(amount)
|
||||||
|
if nb.LessThan(decimal.Zero) {
|
||||||
|
return repository.ErrBalanceInsufficient
|
||||||
|
}
|
||||||
|
repo.cacheOrderBal[txID] = nb
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DebitOrderBalance 扣減本地緩存的訂單後餘額
|
||||||
|
func (repo *userWalletManager) DebitOrderBalance(txID int64, amount decimal.Decimal) error {
|
||||||
|
return repo.CreditOrderBalance(txID, amount.Neg())
|
||||||
|
}
|
||||||
|
|
||||||
|
// CommitOrderBalances 將本地暫存的訂單後餘額寫回 transaction.post_transfer_balance
|
||||||
|
func (repo *userWalletManager) CommitOrderBalances(ctx context.Context) error {
|
||||||
|
return repo.db.Transaction(func(tx *gorm.DB) error {
|
||||||
|
for id, bal := range repo.cacheOrderBal {
|
||||||
|
if err := tx.WithContext(ctx).
|
||||||
|
Model(&entity.Transaction{}).
|
||||||
|
Where("id = ?", id).
|
||||||
|
UpdateColumn("post_transfer_balance", bal).Error; err != nil {
|
||||||
|
return fmt.Errorf("更新訂單 %d 後餘額失敗: %w", id, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasAvailableWallet 檢查是否已有「可用餘額」類型的錢包
|
||||||
|
func (repo *userWalletManager) HasAvailableWallet(ctx context.Context) (bool, error) {
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
err := repo.db.WithContext(ctx).
|
||||||
|
Model(&entity.Wallet{}).
|
||||||
|
Select("1").
|
||||||
|
Where("uid = ? AND asset = ? AND type = ?", repo.uid, repo.asset, wallet.TypeAvailable).
|
||||||
|
Limit(1).
|
||||||
|
Scan(&ok).Error
|
||||||
|
return ok, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset 清空本地所有緩存
|
||||||
|
func (repo *userWalletManager) Reset() {
|
||||||
|
repo.cacheWallets = make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes))
|
||||||
|
repo.cacheOrderBal = make(map[int64]decimal.Decimal)
|
||||||
|
repo.pendingTxs = nil
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
|
@ -1077,13 +1077,13 @@ func TestWalletService_GetOrderBalance(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
txID int64
|
orderID string
|
||||||
want want
|
want want
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "found existing transaction",
|
name: "found existing transaction",
|
||||||
txID: 1,
|
orderID: "order1",
|
||||||
want: want{
|
want: want{
|
||||||
tx: entity.Transaction{
|
tx: entity.Transaction{
|
||||||
ID: 1,
|
ID: 1,
|
||||||
|
@ -1106,15 +1106,15 @@ func TestWalletService_GetOrderBalance(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "not found returns ErrRecordNotFound",
|
name: "not found returns ErrRecordNotFound",
|
||||||
txID: 999,
|
orderID: "order2",
|
||||||
want: want{errIsNF: true},
|
want: want{errIsNF: true},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
got, err := svc.GetOrderBalance(context.Background(), tt.txID)
|
got, err := svc.GetOrderBalance(context.Background(), tt.orderID)
|
||||||
if tt.want.errIsNF {
|
if tt.want.errIsNF {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
assert.True(t, errors.Is(err, repository.ErrRecordNotFound))
|
assert.True(t, errors.Is(err, repository.ErrRecordNotFound))
|
||||||
|
@ -1191,13 +1191,13 @@ func TestWalletService_GetOrderBalanceForUpdate(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
name string
|
name string
|
||||||
txID int64
|
orderID string
|
||||||
want want
|
want want
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "found and locks",
|
name: "found and locks",
|
||||||
txID: 1,
|
orderID: "ordX",
|
||||||
want: want{
|
want: want{
|
||||||
tx: entity.Transaction{
|
tx: entity.Transaction{
|
||||||
ID: 1,
|
ID: 1,
|
||||||
|
@ -1220,15 +1220,15 @@ func TestWalletService_GetOrderBalanceForUpdate(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "missing row returns ErrRecordNotFound",
|
name: "missing row returns ErrRecordNotFound",
|
||||||
txID: 999,
|
orderID: "ordY",
|
||||||
want: want{errIsNF: true},
|
want: want{errIsNF: true},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range cases {
|
for _, tt := range cases {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
got, err := svc.GetOrderBalanceForUpdate(context.Background(), tt.txID)
|
got, err := svc.GetOrderBalanceForUpdate(context.Background(), tt.orderID)
|
||||||
if tt.want.errIsNF {
|
if tt.want.errIsNF {
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -6,6 +6,9 @@ import (
|
||||||
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
|
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
|
||||||
"code.30cm.net/digimon/library-go/errs"
|
"code.30cm.net/digimon/library-go/errs"
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
|
"github.com/go-sql-driver/mysql"
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -58,11 +61,57 @@ func (use *WalletUseCase) Deposit(ctx context.Context, tx usecase.WalletTransfer
|
||||||
|
|
||||||
tx.TxType = wallet.Deposit
|
tx.TxType = wallet.Deposit
|
||||||
|
|
||||||
|
uidAsset := uidAssetKey{
|
||||||
|
uid: tx.FromUID,
|
||||||
|
asset: tx.Asset,
|
||||||
|
}
|
||||||
|
|
||||||
|
exists := use.checkWalletExistence(uidAsset)
|
||||||
|
|
||||||
|
withLockAvailable := func(ctx context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
|
||||||
|
if !exists {
|
||||||
|
checkWallet, err := w.HasAvailableBalance(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 錢包不存在要做新增
|
||||||
|
if !checkWallet {
|
||||||
|
if _, err := w.InitializeWallets(ctx, tx.Brand); err != nil {
|
||||||
|
var mysqlErr *mysql.MySQLError
|
||||||
|
// 解析是否被其他 transaction insert 了,是的話嘗試取得 insert 後的鎖,不是的話需要直接回傳錯誤
|
||||||
|
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"err": err,
|
||||||
|
"uid": tx.FromUID,
|
||||||
|
}).Warn("Deposit.Create.Wallet")
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 因為是透過 transaction 新增,所以不用上鎖
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
exists = true
|
||||||
|
use.markWalletAsExisting(uidAsset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 確認有 wallet 再 lock for update,避免 deadlock
|
||||||
|
_, err := w.GetBalancesForUpdate(ctx, []wallet.Types{wallet.TypeAvailable})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
if err := use.ProcessTransaction(
|
if err := use.ProcessTransaction(
|
||||||
ctx, tx, userWalletFlow{
|
ctx, tx, userWalletFlow{
|
||||||
UID: tx.FromUID,
|
UID: tx.FromUID,
|
||||||
Asset: tx.Asset,
|
Asset: tx.Asset,
|
||||||
Actions: []walletActionOption{use.withLockAvailable(), use.withAddAvailable()},
|
Actions: []walletActionOption{withLockAvailable, use.withAddAvailable()},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -70,19 +119,117 @@ func (use *WalletUseCase) Deposit(ctx context.Context, tx usecase.WalletTransfer
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DepositUnconfirmed 增加限制餘額
|
||||||
|
// 1. 新增一筆充值限制交易
|
||||||
|
// 2. 錢包新增限制餘額
|
||||||
|
// 3. 錢包變化新增一筆增加限制餘額資料
|
||||||
func (use *WalletUseCase) DepositUnconfirmed(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
func (use *WalletUseCase) DepositUnconfirmed(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
||||||
//TODO implement me
|
// 確認錢包新增或減少的餘額是否正確
|
||||||
panic("implement me")
|
if !tx.Amount.IsPositive() {
|
||||||
|
return errs.InvalidRange("failed to get correct amount")
|
||||||
|
}
|
||||||
|
|
||||||
|
uidAsset := uidAssetKey{
|
||||||
|
uid: tx.FromUID,
|
||||||
|
asset: tx.Asset,
|
||||||
|
}
|
||||||
|
|
||||||
|
exists := use.checkWalletExistence(uidAsset)
|
||||||
|
tx.TxType = wallet.DepositUnconfirmed
|
||||||
|
|
||||||
|
withLockUnconfirmed := func(ctx context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
|
||||||
|
if !exists {
|
||||||
|
checkWallet, err := w.HasAvailableBalance(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 錢包不存在要做新增
|
||||||
|
if !checkWallet {
|
||||||
|
if _, err := w.InitializeWallets(ctx, tx.Brand); err != nil {
|
||||||
|
var mysqlErr *mysql.MySQLError
|
||||||
|
// 解析是否被其他 transaction insert 了,是的話嘗試取得 insert 後的鎖,不是的話需要直接回傳錯誤
|
||||||
|
if errors.As(err, &mysqlErr) && mysqlErr.Number == 1062 {
|
||||||
|
logrus.WithFields(logrus.Fields{
|
||||||
|
"err": err,
|
||||||
|
"uid": tx.FromUID,
|
||||||
|
}).Warn("Deposit.Create.Wallet")
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 因為是透過 transaction 新增,所以不用上鎖
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
exists = true
|
||||||
|
use.markWalletAsExisting(uidAsset)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 確認有 wallet 再 lock for update,避免 deadlock
|
||||||
|
_, err := w.GetBalancesForUpdate(ctx, []wallet.Types{wallet.TypeUnconfirmed})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := use.ProcessTransaction(
|
||||||
|
ctx, tx, userWalletFlow{
|
||||||
|
UID: tx.FromUID,
|
||||||
|
Asset: tx.Asset,
|
||||||
|
Actions: []walletActionOption{withLockUnconfirmed, use.withAddUnconfirmed()},
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Freeze 凍結
|
||||||
|
// 1. 新增一筆凍結交易
|
||||||
|
// 2. 錢包減少可用餘額
|
||||||
|
// 3. 錢包增加凍結餘額
|
||||||
|
// 4. 錢包變化新增一筆減少可用餘額資料
|
||||||
|
// 5. 錢包變化新增一筆增加凍結餘額資料
|
||||||
|
// 6. 訂單錢包新增一筆資料,餘額是凍結金額
|
||||||
func (use *WalletUseCase) Freeze(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
func (use *WalletUseCase) Freeze(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
||||||
//TODO implement me
|
// 確認錢包新增或減少的餘額是否正確
|
||||||
panic("implement me")
|
if !tx.Amount.IsPositive() {
|
||||||
|
return errs.InvalidRange("failed to get correct amount")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.TxType = wallet.DepositUnconfirmed
|
||||||
|
|
||||||
|
return use.ProcessTransaction(ctx, tx, userWalletFlow{
|
||||||
|
UID: tx.FromUID,
|
||||||
|
Asset: tx.Asset,
|
||||||
|
Actions: []walletActionOption{use.withLockAvailableAndFreeze(), use.withSubAvailable(), use.withAddFreeze()},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AppendFreeze 追加凍結金額
|
||||||
|
// 1. 新增一筆凍結交易
|
||||||
|
// 2. 錢包減少可用餘額
|
||||||
|
// 3. 錢包增加凍結餘額
|
||||||
|
// 4. 錢包變化新增一筆減少可用餘額資料
|
||||||
|
// 5. 錢包變化新增一筆增加凍結餘額資料
|
||||||
|
// 6. 原凍結金額上追加凍結金額
|
||||||
func (use *WalletUseCase) AppendFreeze(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
func (use *WalletUseCase) AppendFreeze(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
||||||
//TODO implement me
|
// 確認錢包新增或減少的餘額是否正確
|
||||||
panic("implement me")
|
if !tx.Amount.IsPositive() {
|
||||||
|
return errs.InvalidRange("failed to get correct amount")
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.TxType = wallet.DepositUnconfirmed
|
||||||
|
|
||||||
|
return use.ProcessTransaction(ctx, tx, userWalletFlow{
|
||||||
|
UID: tx.FromUID,
|
||||||
|
Asset: tx.Asset,
|
||||||
|
Actions: []walletActionOption{use.withLockAvailableAndFreeze(), use.withSubAvailable(), use.withAddFreeze()},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (use *WalletUseCase) UnFreeze(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
func (use *WalletUseCase) UnFreeze(ctx context.Context, tx usecase.WalletTransferRequest) error {
|
||||||
|
|
|
@ -103,6 +103,123 @@ func (use *WalletUseCase) withAddFreeze() walletActionOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// withAddUnconfirmed 增加用戶限制餘額
|
||||||
|
func (use *WalletUseCase) withAddUnconfirmed() walletActionOption {
|
||||||
|
return func(_ context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
|
||||||
|
err := w.IncreaseBalance(wallet.TypeUnconfirmed, tx.ReferenceOrderID, tx.Amount)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// withLockAvailableAndFreeze 用戶可用與凍結餘額
|
||||||
|
func (use *WalletUseCase) withLockAvailableAndFreeze() walletActionOption {
|
||||||
|
return func(ctx context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
|
||||||
|
uidAsset := uidAssetKey{
|
||||||
|
uid: tx.FromUID,
|
||||||
|
asset: tx.Asset,
|
||||||
|
}
|
||||||
|
|
||||||
|
if !use.checkWalletExistence(uidAsset) {
|
||||||
|
// 找不到錢包存不存在
|
||||||
|
wStatus, err := w.HasAvailableBalance(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to check wallet: %w", err)
|
||||||
|
}
|
||||||
|
// 錢包不存在要做新增
|
||||||
|
if !wStatus {
|
||||||
|
//// 是合約模擬交易或帳變且錢包不存在才建立錢包
|
||||||
|
//if !(tx.Business == wa.ContractSimulationBusinessTypeBusinessName || tx.BusinessType == domain.DistributionBusinessTypeBusinessName) {
|
||||||
|
// // 新增錢包有命中 UK 不需要額外上鎖
|
||||||
|
// return use.translateError(err)
|
||||||
|
//}
|
||||||
|
|
||||||
|
if _, err := w.InitializeWallets(ctx, tx.Brand); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
use.markWalletAsExisting(uidAsset)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := w.GetBalancesForUpdate(ctx, []wallet.Types{wallet.TypeAvailable, wallet.TypeFreeze})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// appendFreeze 追加用戶原凍結餘額
|
||||||
|
func (use *WalletUseCase) withAppendFreeze() walletActionOption {
|
||||||
|
return func(ctx context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
|
||||||
|
order, err := w.GetOrderBalance(ctx, tx.ReferenceOrderID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 以id來做lock更可以確保只lock到該筆,而不會因為index關係lock到多筆導致死鎖
|
||||||
|
// 而且先不lock把資料先拉出來判斷餘額是否足夠,在不足夠時可以直接return而不用lock減少開銷
|
||||||
|
order, err = w.GetOrderBalanceForUpdate(ctx, tx.ReferenceOrderID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Asset = order.Asset
|
||||||
|
tx.FromUID = order.UID
|
||||||
|
|
||||||
|
//w.IncreaseBalance()
|
||||||
|
//if err := w.AddOrder(order.ID, tx.Amount); err != nil {
|
||||||
|
// return use.translateError(err)
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//if err := wallet.AddFreeze(tx.BusinessType, tx.Amount); err != nil {
|
||||||
|
// return use.translateError(err)
|
||||||
|
//}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//// withSubFreeze 減少用戶凍結餘額
|
||||||
|
//func (use *WalletUseCase) withSubFreeze() walletActionOption {
|
||||||
|
// return func(_ context.Context, tx *usecase.Transaction, wallet repository.UserWallet) error {
|
||||||
|
// if err := wallet.SubFreeze(tx.BusinessType, tx.Amount); err != nil {
|
||||||
|
// if errors.Is(err, repository.ErrBalanceInsufficient) {
|
||||||
|
// return usecase.BalanceInsufficientError{
|
||||||
|
// Amount: tx.Amount.Neg(),
|
||||||
|
// Balance: wallet.LocalBalance(domain.WalletFreezeType.ToBusiness(tx.BusinessType)),
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return use.translateError(err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
//
|
||||||
|
//// addFreeze 增加用戶凍結餘額
|
||||||
|
//func (use *walletUseCase) addFreeze() walletActionOption {
|
||||||
|
// return func(_ context.Context, tx *usecase.Transaction, wallet repository.UserWallet) error {
|
||||||
|
// if err := wallet.AddFreeze(tx.BusinessType, tx.Amount); err != nil {
|
||||||
|
// return use.translateError(err)
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // 訂單可以做解凍,解凍最大上限金額來自當初凍結金額,所以在每一筆tx可以設定Balance
|
||||||
|
// // 後續tx需要依據其他tx做交易時能有所依據
|
||||||
|
// tx.Balance = tx.Amount
|
||||||
|
//
|
||||||
|
// return nil
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
//// WithAppendFreeze 追加用戶原凍結餘額
|
//// WithAppendFreeze 追加用戶原凍結餘額
|
||||||
//func (use *WalletUseCase) withAppendFreeze() walletActionOption {
|
//func (use *WalletUseCase) withAppendFreeze() walletActionOption {
|
||||||
// return func(ctx context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
|
// return func(ctx context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
|
||||||
|
|
|
@ -39,10 +39,10 @@ func (use *WalletUseCase) ProcessTransaction(
|
||||||
|
|
||||||
// flows 會按照順序做.順序是重要的
|
// flows 會按照順序做.順序是重要的
|
||||||
for _, flow := range flows {
|
for _, flow := range flows {
|
||||||
// 1️⃣ 建立針對該使用者+資產的 UserWalletService
|
// 1 建立針對該使用者+資產的 UserWalletService
|
||||||
wSvc := repo.NewWalletService(db, flow.UID, flow.Asset)
|
wSvc := repo.NewWalletService(db, flow.UID, flow.Asset)
|
||||||
|
|
||||||
// 2️⃣ 依序執行所有定義好的錢包操作
|
// 2 依序執行所有定義好的錢包操作
|
||||||
for _, action := range flow.Actions {
|
for _, action := range flow.Actions {
|
||||||
if err := action(ctx, &req, wSvc); err != nil {
|
if err := action(ctx, &req, wSvc); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -52,7 +52,7 @@ func (use *WalletUseCase) ProcessTransaction(
|
||||||
wallets = append(wallets, wSvc)
|
wallets = append(wallets, wSvc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3️⃣ 準備寫入 Transaction 主檔
|
// 3 準備寫入 Transaction 主檔
|
||||||
txRecord := &entity.Transaction{
|
txRecord := &entity.Transaction{
|
||||||
OrderID: req.ReferenceOrderID,
|
OrderID: req.ReferenceOrderID,
|
||||||
TransactionID: uuid.New().String(),
|
TransactionID: uuid.New().String(),
|
||||||
|
@ -68,14 +68,14 @@ func (use *WalletUseCase) ProcessTransaction(
|
||||||
DueTime: 0,
|
DueTime: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4️⃣ TODO 計算 DueTime (T+N 結算時間)
|
// 4 TODO 計算 DueTime (T+N 結算時間)
|
||||||
|
|
||||||
// 5️⃣ 寫入 Transaction 主檔
|
// 5 寫入 Transaction 主檔
|
||||||
if err := use.TransactionRepo.Insert(ctx, txRecord); err != nil {
|
if err := use.TransactionRepo.Insert(ctx, txRecord); err != nil {
|
||||||
return fmt.Errorf("TransactionRepo.Insert 失敗: %w", err)
|
return fmt.Errorf("TransactionRepo.Insert 失敗: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 6️⃣ 聚合所有 wallet 內的交易歷程
|
// 6 聚合所有 wallet 內的交易歷程
|
||||||
var walletTxs []entity.WalletTransaction
|
var walletTxs []entity.WalletTransaction
|
||||||
for _, w := range wallets {
|
for _, w := range wallets {
|
||||||
walletTxs = append(
|
walletTxs = append(
|
||||||
|
@ -89,12 +89,12 @@ func (use *WalletUseCase) ProcessTransaction(
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 7️⃣ 批次寫入所有 WalletTransaction
|
// 7 批次寫入所有 WalletTransaction
|
||||||
if err := use.WalletTransactionRepo.Create(ctx, db, walletTxs); err != nil {
|
if err := use.WalletTransactionRepo.Create(ctx, db, walletTxs); err != nil {
|
||||||
return fmt.Errorf("WalletTransactionRepository.Create 失敗: %w", err)
|
return fmt.Errorf("WalletTransactionRepository.Create 失敗: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 8️⃣ 最後才真正把錢包的餘額更新到資料庫(同一事務)
|
// 8 最後才真正把錢包的餘額更新到資料庫(同一事務)
|
||||||
for _, wSvc := range wallets {
|
for _, wSvc := range wallets {
|
||||||
if err := wSvc.PersistBalances(ctx); err != nil {
|
if err := wSvc.PersistBalances(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in New Issue