feat: add wallet translation

This commit is contained in:
王性驊 2025-04-18 17:10:40 +08:00
parent 4f6262d489
commit 47fb3139c2
6 changed files with 1196 additions and 260 deletions

View File

@ -58,36 +58,32 @@ type BalanceQuery struct {
Kinds []wallet.Types // 錢包類型(如可用、凍結等) Kinds []wallet.Types // 錢包類型(如可用、凍結等)
} }
// UserWalletService 專注於某位使用者在單一資產下的錢包操作邏輯 // UserWalletService 定義了一個「單一使用者、單一資產」的錢包操作合約
type UserWalletService interface { type UserWalletService interface {
// Init 初始化錢包(如建立可用、凍結、未確認等錢包) // InitializeWallets 為新使用者初始化所有錢包類型並寫入資料庫
Init(ctx context.Context, uid, asset, brand string) ([]entity.Wallet, error) InitializeWallets(ctx context.Context, brand string) ([]entity.Wallet, error)
// All 查詢所有錢包餘額 // GetAllBalances 查詢此使用者此資產下所有錢包類型的當前餘額
All(ctx context.Context) ([]entity.Wallet, error) GetAllBalances(ctx context.Context) ([]entity.Wallet, error)
// Get 查詢單一或多種類型的餘額 // GetBalancesForTypes 查詢指定錢包類型的一組餘額(不加鎖)
Get(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) GetBalancesForTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error)
// GetWithLock 查詢鎖定後的錢包(交易使用) // GetBalancesForUpdate 查詢並鎖定指定錢包類型FOR UPDATE
GetWithLock(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) GetBalancesForUpdate(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error)
// LocalBalance 查詢記憶中的快取值(非查資料庫) // CurrentBalance 從本地緩存取得某種錢包類型的餘額
LocalBalance(kind wallet.Types) decimal.Decimal CurrentBalance(kind wallet.Types) decimal.Decimal
// LockByIDs 根據錢包 ID 鎖定(資料一致性用) // IncreaseBalance 增加指定錢包類型的餘額,並累積一筆交易紀錄
LockByIDs(ctx context.Context, ids []int64) ([]entity.Wallet, error) IncreaseBalance(kind wallet.Types, orderID string, amount decimal.Decimal) error
// CheckReady 檢查錢包是否已經存在並準備好(可用餘額的錢包) // DecreaseBalance 減少指定錢包類型的餘額(等同於 IncreaseBalance 的負數版本)
CheckReady(ctx context.Context) (bool, error) DecreaseBalance(kind wallet.Types, orderID string, amount decimal.Decimal) error
// Add 加值與扣款邏輯(含業務類別) // PrepareTransactions 為所有暫存交易紀錄填上 TXID/OrderID/Brand/BusinessType並回傳可落庫的切片
Add(kind wallet.Types, orderID string, amount decimal.Decimal) error PrepareTransactions(
Sub(kind wallet.Types, orderID string, amount decimal.Decimal) error
// AddTransaction 新增一筆交易紀錄(建立資料)
AddTransaction(txID int64, orderID string, brand string, business wallet.BusinessName, kind wallet.Types, amount decimal.Decimal)
Transactions(
txID int64, txID int64,
orderID string, orderID, brand string,
brand string,
businessType wallet.BusinessName, businessType wallet.BusinessName,
) []entity.WalletTransaction ) []entity.WalletTransaction
// PersistBalances 將本地緩存中所有錢包最終餘額批次寫入資料庫
// Commit 提交所有操作(更新錢包與新增交易紀錄) PersistBalances(ctx context.Context) error
Commit(ctx context.Context) error // PersistOrderBalances 將本地緩存中所有訂單相關餘額批次寫入 transaction 表
// CommitOrder 提交所有訂單 PersistOrderBalances(ctx context.Context) error
CommitOrder(ctx context.Context) error // HasAvailableBalance 確認此使用者此資產是否已有可用餘額錢包
HasAvailableBalance(ctx context.Context) (bool, error)
} }

View File

@ -14,152 +14,191 @@ import (
"time" "time"
) )
// 用戶某個幣種餘額 // WalletService 代表一個使用者在某資產上的錢包服務,
type userWallet struct { // 負責讀取/寫入資料庫並在記憶體暫存變動
type WalletService struct {
db *gorm.DB db *gorm.DB
uid string uid string // 使用者識別碼
asset string asset string // 資產代號 (如 BTC、ETH、TWD)
localBalances map[wallet.Types]entity.Wallet // 暫存各類型錢包當前餘額
// local wallet 相關計算的餘額存在這裡 localOrderBalances map[int64]decimal.Decimal // 暫存各訂單變動後的餘額
localWalletBalance map[wallet.Types]entity.Wallet transactions []entity.WalletTransaction // 暫存所有尚未落庫的錢包交易紀錄
// local order wallet 相關計算的餘額存在這裡
localOrderBalance map[int64]decimal.Decimal
// local wallet 內所有餘額變化紀錄
transactions []entity.WalletTransaction
} }
func NewUserWallet(db *gorm.DB, uid, asset string) repository.UserWalletService { // NewWalletService 建立一個 WalletService 實例
return &userWallet{ func NewWalletService(db *gorm.DB, uid, asset string) repository.UserWalletService {
return &WalletService{
db: db, db: db,
uid: uid, uid: uid,
asset: asset, asset: asset,
localWalletBalance: make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes)), localBalances: make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes)),
localOrderBalance: make(map[int64]decimal.Decimal, len(wallet.AllTypes)), localOrderBalances: make(map[int64]decimal.Decimal, len(wallet.AllTypes)),
} }
} }
func (repo *userWallet) Init(ctx context.Context, uid, asset, brand string) ([]entity.Wallet, error) { // InitializeWallets 啟動時為新使用者初始化所有類型錢包,並寫入資料庫
wallets := make([]entity.Wallet, 0, len(wallet.AllTypes)) func (s *WalletService) InitializeWallets(ctx context.Context, brand string) ([]entity.Wallet, error) {
var wallets []entity.Wallet
for _, t := range wallet.AllTypes { for _, t := range wallet.AllTypes {
balance := decimal.Zero
wallets = append(wallets, entity.Wallet{ wallets = append(wallets, entity.Wallet{
Brand: brand, Brand: brand,
UID: uid, UID: s.uid,
Asset: asset, Asset: s.asset,
Balance: balance, Balance: decimal.Zero,
Type: t, Type: t,
}) })
} }
if err := s.db.WithContext(ctx).Create(&wallets).Error; err != nil {
if err := repo.db.WithContext(ctx).Create(&wallets).Error; err != nil {
return nil, err return nil, err
} }
// 將初始化後的錢包資料寫入本地緩存
for _, v := range wallets { for _, w := range wallets {
repo.localWalletBalance[v.Type] = v s.localBalances[w.Type] = w
} }
return wallets, nil return wallets, nil
} }
func (repo *userWallet) All(ctx context.Context) ([]entity.Wallet, error) { // GetAllBalances 查詢該使用者某資產所有錢包類型當前餘額
func (s *WalletService) GetAllBalances(ctx context.Context) ([]entity.Wallet, error) {
var result []entity.Wallet var result []entity.Wallet
err := s.db.WithContext(ctx).
err := repo.buildCommonWhereSQL(repo.uid, repo.asset). Where("uid = ? AND asset = ?", s.uid, s.asset).
WithContext(ctx). Select("id, asset, balance, type").
Select("id, crypto, balance, type").
Find(&result).Error Find(&result).Error
if err != nil { if err != nil {
return []entity.Wallet{}, err return nil, err
} }
for _, w := range result {
for _, v := range result { s.localBalances[w.Type] = w
repo.localWalletBalance[v.Type] = v
} }
return result, nil return result, nil
} }
func (repo *userWallet) Get(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) { // GetBalancesForTypes 查詢指定類型的錢包餘額,不上鎖
var wallets []entity.Wallet func (s *WalletService) GetBalancesForTypes(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) {
var result []entity.Wallet
err := repo.buildCommonWhereSQL(repo.uid, repo.asset). err := s.db.WithContext(ctx).
WithContext(ctx). Where("uid = ? AND asset = ?", s.uid, s.asset).
Model(&entity.Wallet{}).
Select("id, crypto, balance, type").
Where("type IN ?", kinds). Where("type IN ?", kinds).
Find(&wallets).Error Select("id, asset, balance, type").
Find(&result).Error
if err != nil { if err != nil {
return []entity.Wallet{}, notFoundError(err) return nil, translateNotFound(err)
} }
for _, w := range result {
for _, w := range wallets { s.localBalances[w.Type] = w
repo.localWalletBalance[w.Type] = w
} }
return result, nil
return wallets, nil
} }
func (repo *userWallet) GetWithLock(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) { // GetBalancesForUpdate 查詢並鎖定指定類型的錢包 (FOR UPDATE)
var wallets []entity.Wallet func (s *WalletService) GetBalancesForUpdate(ctx context.Context, kinds []wallet.Types) ([]entity.Wallet, error) {
var result []entity.Wallet
err := repo.buildCommonWhereSQL(repo.uid, repo.asset). err := s.db.WithContext(ctx).
WithContext(ctx). Where("uid = ? AND asset = ?", s.uid, s.asset).
Model(&entity.Wallet{}).
Select("id, crypto, balance, type").
Where("type IN ?", kinds). Where("type IN ?", kinds).
Clauses(clause.Locking{Strength: "UPDATE"}). Clauses(clause.Locking{Strength: "UPDATE"}).
Find(&wallets).Error Select("id, asset, balance, type").
Find(&result).Error
if err != nil { if err != nil {
return []entity.Wallet{}, notFoundError(err) return nil, translateNotFound(err)
} }
for _, w := range result {
for _, w := range wallets { s.localBalances[w.Type] = w
repo.localWalletBalance[w.Type] = w
} }
return result, nil
return wallets, nil
} }
func (repo *userWallet) LocalBalance(kind wallet.Types) decimal.Decimal { // CurrentBalance 從緩存中取得某種類型錢包的當前餘額
w, ok := repo.localWalletBalance[kind] func (s *WalletService) CurrentBalance(kind wallet.Types) decimal.Decimal {
if !ok { if w, ok := s.localBalances[kind]; ok {
return decimal.Zero
}
return w.Balance return w.Balance
}
return decimal.Zero
} }
func (repo *userWallet) LockByIDs(ctx context.Context, ids []int64) ([]entity.Wallet, error) { // IncreaseBalance 在本地緩存新增餘額,並記錄一筆 WalletTransaction
var wallets []entity.Wallet func (s *WalletService) IncreaseBalance(kind wallet.Types, orderID string, amount decimal.Decimal) error {
w, ok := s.localBalances[kind]
if !ok {
return repository.ErrRecordNotFound
}
w.Balance = w.Balance.Add(amount)
if w.Balance.LessThan(decimal.Zero) {
return repository.ErrBalanceInsufficient
}
s.transactions = append(s.transactions, entity.WalletTransaction{
OrderID: orderID,
UID: s.uid,
WalletType: kind,
Asset: s.asset,
Amount: amount,
Balance: w.Balance,
})
s.localBalances[kind] = w
return nil
}
err := repo.db.WithContext(ctx). // DecreaseBalance 本質上是 IncreaseBalance 的負數版本
func (s *WalletService) DecreaseBalance(kind wallet.Types, orderID string, amount decimal.Decimal) error {
return s.IncreaseBalance(kind, orderID, amount.Neg())
}
// PrepareTransactions 為每筆暫存的 WalletTransaction 填入共用欄位 (txID, brand, businessType)
// 並回傳完整可落庫的切片
func (s *WalletService) PrepareTransactions(
txID int64,
orderID, brand string,
businessType wallet.BusinessName,
) []entity.WalletTransaction {
for i := range s.transactions {
s.transactions[i].TransactionID = txID
s.transactions[i].OrderID = orderID
s.transactions[i].Brand = brand
s.transactions[i].BusinessType = businessType.ToInt8()
}
return s.transactions
}
// PersistBalances 寫入本地緩存中所有錢包的最終餘額到資料庫
func (s *WalletService) PersistBalances(ctx context.Context) error {
return s.db.Transaction(func(tx *gorm.DB) error {
for _, w := range s.localBalances {
if err := tx.WithContext(ctx).
Model(&entity.Wallet{}). Model(&entity.Wallet{}).
Select("id, crypto, balance, type"). Where("id = ?", w.ID).
Where("id IN ?", ids). UpdateColumns(map[string]interface{}{
Clauses(clause.Locking{Strength: "UPDATE"}). "balance": w.Balance,
Find(&wallets).Error "update_at": time.Now().Unix(),
}).Error; err != nil {
if err != nil { return fmt.Errorf("更新錢包餘額失敗 (id=%d): %w", w.ID, err)
return []entity.Wallet{}, notFoundError(err)
} }
for _, w := range wallets {
repo.localWalletBalance[w.Type] = w
} }
return nil
return wallets, nil }, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
} }
func (repo *userWallet) CheckReady(ctx context.Context) (bool, error) { // PersistOrderBalances 寫入所有訂單錢包的最終餘額到 transaction 表
func (s *WalletService) PersistOrderBalances(ctx context.Context) error {
return s.db.Transaction(func(tx *gorm.DB) error {
for id, bal := range s.localOrderBalances {
if err := tx.WithContext(ctx).
Model(&entity.Transaction{}).
Where("id = ?", id).
Update("post_transfer_balance", bal).Error; err != nil {
return fmt.Errorf("更新訂單錢包餘額失敗 (id=%d): %w", id, err)
}
}
return nil
}, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
}
func (s *WalletService) HasAvailableBalance(ctx context.Context) (bool, error) {
var exists bool var exists bool
err := repo.buildCommonWhereSQL(repo.uid, repo.asset).WithContext(ctx). err := s.db.WithContext(ctx).
Model(&entity.Wallet{}). Model(&entity.Wallet{}).
Select("1"). Select("1").
Where("uid = ? AND asset = ?", s.uid, s.asset).
Where("type = ?", wallet.TypeAvailable). Where("type = ?", wallet.TypeAvailable).
Limit(1). Limit(1).
Scan(&exists).Error Scan(&exists).Error
@ -171,134 +210,10 @@ func (repo *userWallet) CheckReady(ctx context.Context) (bool, error) {
return exists, nil return exists, nil
} }
// Add 新增某種餘額餘額 // translateNotFound 將 GORM 的 RecordNotFound 轉為自訂錯誤
// 使用前 localWalletBalance 必須有資料,所以必須執行過 GetWithLock / All 才會有資料 func translateNotFound(err error) error {
func (repo *userWallet) Add(kind wallet.Types, orderID string, amount decimal.Decimal) error {
w, ok := repo.localWalletBalance[kind]
if !ok {
return repository.ErrRecordNotFound
}
w.Balance = w.Balance.Add(amount)
if w.Balance.LessThan(decimal.Zero) {
return repository.ErrBalanceInsufficient
}
repo.transactions = append(repo.transactions, entity.WalletTransaction{
OrderID: orderID,
UID: repo.uid,
WalletType: kind,
Asset: repo.asset,
Amount: amount,
Balance: w.Balance,
})
repo.localWalletBalance[kind] = w
return nil
}
func (repo *userWallet) Sub(kind wallet.Types, orderID string, amount decimal.Decimal) error {
return repo.Add(kind, orderID, decimal.Zero.Sub(amount))
}
// Transactions 為本次整筆交易 (txID) 給所有暫存的 WalletTransaction 設置共用欄位,
// 並回傳整批交易紀錄以便後續寫入資料庫。
func (repo *userWallet) Transactions(
txID int64,
orderID string,
brand string,
businessType wallet.BusinessName,
) []entity.WalletTransaction {
for i := range repo.transactions {
repo.transactions[i].TransactionID = txID
repo.transactions[i].OrderID = orderID
repo.transactions[i].Brand = brand
repo.transactions[i].BusinessType = businessType.ToInt8()
}
return repo.transactions
}
func (repo *userWallet) Commit(ctx context.Context) error {
// 事務隔離等級設定
rc := &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
}
err := repo.db.Transaction(func(tx *gorm.DB) error {
for _, w := range repo.localWalletBalance {
err := tx.WithContext(ctx).
Model(&entity.Wallet{}).
Where("id = ?", w.ID).
UpdateColumns(map[string]any{
"balance": w.Balance,
"update_time": time.Now().UTC().Unix(),
}).Error
if err != nil {
return fmt.Errorf("failed to update wallet id %d: %w", w.ID, err)
}
}
return nil // 所有更新成功才 return nil
}, rc)
if err != nil {
return fmt.Errorf("update uid: %s asset: %s error: %w", repo.uid, repo.asset, err)
}
return nil
}
func (repo *userWallet) GetTransactions() []entity.WalletTransaction {
return repo.transactions
}
func (repo *userWallet) CommitOrder(ctx context.Context) error {
rc := &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
}
err := repo.db.Transaction(func(tx *gorm.DB) error {
for id, balance := range repo.localOrderBalance {
err := tx.WithContext(ctx).
Model(&entity.Transaction{}).
Where("id = ?", id).
Update("balance", balance).Error
if err != nil {
return fmt.Errorf("failed to update order balance, id=%d, err=%w", id, err)
}
}
return nil // 所有更新成功才 return nil
}, rc)
if err != nil {
return fmt.Errorf("update uid: %s asset: %s error: %w", repo.uid, repo.asset, err)
}
return nil
}
func (repo *userWallet) AddTransaction(txID int64, orderID string, brand string, business wallet.BusinessName, kind wallet.Types, amount decimal.Decimal) {
//TODO implement me
panic("implement me")
}
// =============================================================================
func (repo *userWallet) buildCommonWhereSQL(uid, asset string) *gorm.DB {
return repo.db.Where("uid = ?", uid).
Where("asset = ?", asset)
}
func notFoundError(err error) error {
if errors.Is(err, gorm.ErrRecordNotFound) { if errors.Is(err, gorm.ErrRecordNotFound) {
return repository.ErrRecordNotFound return repository.ErrRecordNotFound
} }
return err return err
} }

File diff suppressed because it is too large Load Diff

View File

@ -46,11 +46,11 @@ func (repo *WalletRepository) Transaction(fn func(db *gorm.DB) error) error {
} }
func (repo *WalletRepository) Session(uid, asset string) repository.UserWalletService { func (repo *WalletRepository) Session(uid, asset string) repository.UserWalletService {
return NewUserWallet(repo.DB, uid, asset) return NewWalletService(repo.DB, uid, asset)
} }
func (repo *WalletRepository) SessionWithTx(db *gorm.DB, uid, asset string) repository.UserWalletService { func (repo *WalletRepository) SessionWithTx(db *gorm.DB, uid, asset string) repository.UserWalletService {
return NewUserWallet(db, uid, asset) return NewWalletService(db, uid, asset)
} }
func (repo *WalletRepository) InitWallets(ctx context.Context, param []repository.Wallet) error { func (repo *WalletRepository) InitWallets(ctx context.Context, param []repository.Wallet) error {

View File

@ -31,7 +31,7 @@ func (use *WalletUseCase) withLockAvailable() walletActionOption {
if !use.checkWalletExistence(uidAsset) { if !use.checkWalletExistence(uidAsset) {
// 找不到錢包存不存在 // 找不到錢包存不存在
wStatus, err := w.CheckReady(ctx) wStatus, err := w.HasAvailableBalance(ctx)
if err != nil { if err != nil {
return fmt.Errorf("failed to check wallet: %w", err) return fmt.Errorf("failed to check wallet: %w", err)
} }
@ -43,7 +43,7 @@ func (use *WalletUseCase) withLockAvailable() walletActionOption {
// return use.translateError(err) // return use.translateError(err)
//} //}
if _, err := w.Init(ctx, tx.FromUID, tx.Asset, tx.Brand); err != nil { if _, err := w.InitializeWallets(ctx, tx.Brand); err != nil {
return err return err
} }
@ -53,7 +53,7 @@ func (use *WalletUseCase) withLockAvailable() walletActionOption {
use.markWalletAsExisting(uidAsset) use.markWalletAsExisting(uidAsset)
} }
_, err := w.GetWithLock(ctx, []wallet.Types{wallet.TypeAvailable}) _, err := w.GetBalancesForUpdate(ctx, []wallet.Types{wallet.TypeAvailable})
if err != nil { if err != nil {
return err return err
} }
@ -65,7 +65,7 @@ func (use *WalletUseCase) withLockAvailable() walletActionOption {
// subAvailable 減少用戶可用餘額 // subAvailable 減少用戶可用餘額
func (use *WalletUseCase) withSubAvailable() walletActionOption { func (use *WalletUseCase) withSubAvailable() walletActionOption {
return func(_ context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error { return func(_ context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
if err := w.Sub(wallet.TypeAvailable, tx.ReferenceOrderID, tx.Amount); err != nil { if err := w.DecreaseBalance(wallet.TypeAvailable, tx.ReferenceOrderID, tx.Amount); err != nil {
if errors.Is(err, repository.ErrBalanceInsufficient) { if errors.Is(err, repository.ErrBalanceInsufficient) {
// todo 錯誤要看怎麼給(餘額不足) // todo 錯誤要看怎麼給(餘額不足)
return fmt.Errorf("balance insufficient") return fmt.Errorf("balance insufficient")

View File

@ -40,7 +40,7 @@ func (use *WalletUseCase) ProcessTransaction(
// flows 會按照順序做.順序是重要的 // flows 會按照順序做.順序是重要的
for _, flow := range flows { for _, flow := range flows {
// 1⃣ 建立針對該使用者+資產的 UserWalletService // 1⃣ 建立針對該使用者+資產的 UserWalletService
wSvc := repo.NewUserWallet(db, flow.UID, flow.Asset) wSvc := repo.NewWalletService(db, flow.UID, flow.Asset)
// 2⃣ 依序執行所有定義好的錢包操作 // 2⃣ 依序執行所有定義好的錢包操作
for _, action := range flow.Actions { for _, action := range flow.Actions {
@ -80,7 +80,7 @@ func (use *WalletUseCase) ProcessTransaction(
for _, w := range wallets { for _, w := range wallets {
walletTxs = append( walletTxs = append(
walletTxs, walletTxs,
w.Transactions( w.PrepareTransactions(
txRecord.ID, txRecord.ID,
txRecord.OrderID, txRecord.OrderID,
req.Brand, req.Brand,
@ -96,10 +96,10 @@ func (use *WalletUseCase) ProcessTransaction(
// 8⃣ 最後才真正把錢包的餘額更新到資料庫(同一事務) // 8⃣ 最後才真正把錢包的餘額更新到資料庫(同一事務)
for _, wSvc := range wallets { for _, wSvc := range wallets {
if err := wSvc.Commit(ctx); err != nil { if err := wSvc.PersistBalances(ctx); err != nil {
return err return err
} }
if err := wSvc.CommitOrder(ctx); err != nil { if err := wSvc.PersistOrderBalances(ctx); err != nil {
return err return err
} }
} }