package repository import ( "code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/entity" "code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/repository" "code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet" "context" "database/sql" "fmt" "github.com/shopspring/decimal" "gorm.io/gorm" "gorm.io/gorm/clause" "time" ) // 用戶某個幣種餘額 type userWallet struct { db *gorm.DB uid string asset string // local wallet相關計算的餘額存在這裡 localWalletBalance map[wallet.Types]entity.Wallet // local order wallet相關計算的餘額存在這裡 localOrderBalance map[int64]decimal.Decimal // local wallet內所有餘額變化紀錄 transactions []entity.WalletTransaction } func NewUserWallet(db *gorm.DB, uid, asset string) repository.UserWalletService { return &userWallet{ db: db, uid: uid, asset: asset, localWalletBalance: make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes)), localOrderBalance: make(map[int64]decimal.Decimal, len(wallet.AllTypes)), } } func (repo *userWallet) Create(ctx context.Context, uid, asset, brand string) ([]entity.Wallet, error) { wallets := make([]entity.Wallet, 0, len(wallet.AllTypes)) for _, t := range wallet.AllTypes { var balance decimal.Decimal // 合約模擬初始資金 if t == wallet.TypeSimulationAvailable { balance = wallet.InitContractSimulationAvailable } wallets = append(wallets, entity.Wallet{ Brand: brand, UID: uid, Asset: asset, Balance: balance, Type: t, }) } if err := repo.db.WithContext(ctx).Create(&wallets).Error; err != nil { return nil, err } for _, v := range wallets { repo.localWalletBalance[v.Type] = v } return wallets, nil } func (repo *userWallet) AllBalances(ctx context.Context) ([]entity.Wallet, error) { var result []entity.Wallet err := repo.walletUserWhere(repo.uid, repo.asset). WithContext(ctx). Select("id, crypto, balance, type"). Find(&result).Error if err != nil { return []entity.Wallet{}, err } for _, v := range result { repo.localWalletBalance[v.Type] = v } return result, nil } func (repo *userWallet) Balance(ctx context.Context, kind wallet.Types) (entity.Wallet, error) { var result entity.Wallet err := repo.walletUserWhere(repo.uid, repo.asset). WithContext(ctx). Model(&result). Select("id, crypto, balance, type"). Where("type = ?", kind). Take(&result).Error if err != nil { return entity.Wallet{}, repository.WrapNotFoundError(err) } repo.localWalletBalance[result.Type] = result return result, nil } func (repo *userWallet) Balances(ctx context.Context, kind []wallet.Types) ([]entity.Wallet, error) { var wallets []entity.Wallet err := repo.walletUserWhere(repo.uid, repo.asset). WithContext(ctx). Model(&entity.Wallet{}). Select("id, crypto, balance, type"). Where("type IN ?", kind). Find(&wallets).Error if err != nil { return []entity.Wallet{}, repository.WrapNotFoundError(err) } for _, w := range wallets { repo.localWalletBalance[w.Type] = w } return wallets, nil } func (repo *userWallet) LocalBalance(kind wallet.Types) decimal.Decimal { w, ok := repo.localWalletBalance[kind] if !ok { return decimal.Zero } return w.Balance } func (repo *userWallet) BalanceXLock(ctx context.Context, kind wallet.Types) (entity.Wallet, error) { var result entity.Wallet err := repo.walletUserWhere(repo.uid, repo.asset). WithContext(ctx). Model(&result). Select("id, crypto, balance, type"). Where("type = ?", kind). Clauses(clause.Locking{Strength: "UPDATE"}). Take(&result).Error if err != nil { return entity.Wallet{}, repository.WrapNotFoundError(err) } repo.localWalletBalance[result.Type] = result return result, nil } func (repo *userWallet) BalancesXLock(ctx context.Context, kind []wallet.Types) ([]entity.Wallet, error) { var wallets []entity.Wallet err := repo.walletUserWhere(repo.uid, repo.asset). WithContext(ctx). Model(&entity.Wallet{}). Select("id, crypto, balance, type"). Where("type IN ?", kind). Clauses(clause.Locking{Strength: "UPDATE"}). Find(&wallets).Error if err != nil { return []entity.Wallet{}, repository.WrapNotFoundError(err) } for _, w := range wallets { repo.localWalletBalance[w.Type] = w } return wallets, nil } func (repo *userWallet) XLock(ctx context.Context, id int64) (entity.Wallet, error) { var result entity.Wallet err := repo.db.WithContext(ctx). Model(&result). Select("id"). Where("id = ?", id). Clauses(clause.Locking{Strength: "UPDATE"}). Take(&result).Error if err != nil { return entity.Wallet{}, repository.WrapNotFoundError(err) } repo.localWalletBalance[result.Type] = result return result, nil } func (repo *userWallet) XLocks(ctx context.Context, ids []int64) ([]entity.Wallet, error) { var wallets []entity.Wallet err := repo.db.WithContext(ctx). Model(&entity.Wallet{}). Select("id, crypto, balance, type"). Where("id IN ?", ids). Clauses(clause.Locking{Strength: "UPDATE"}). Find(&wallets).Error if err != nil { return []entity.Wallet{}, repository.WrapNotFoundError(err) } for _, w := range wallets { repo.localWalletBalance[w.Type] = w } return wallets, nil } func (repo *userWallet) GetAvailableBalanceXLock(ctx context.Context, _ wallet.BusinessName) (entity.Wallet, error) { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName: // return repo.BalanceXLock(ctx, domain.WalletContractAvailableType) //case domain.ContractSimulationBusinessTypeBusinessName: // return repo.BalanceXLock(ctx, domain.WalletSimulationAvailableType) //} return repo.BalanceXLock(ctx, wallet.TypeAvailable) } func (repo *userWallet) GetFreezeBalanceXLock(ctx context.Context, _ wallet.BusinessName) (entity.Wallet, error) { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName, domain.SystemTransferCommissionBusinessTypeBusinessName: // return w.Balance(ctx, domain.WalletContractFreezeType) //case domain.ContractSimulationBusinessTypeBusinessName: // return w.Balance(ctx, domain.WalletSimulationFreezeType) //} return repo.Balance(ctx, wallet.TypeFreeze) } func (repo *userWallet) GetUnconfirmedBalanceXLock(ctx context.Context) (entity.Wallet, error) { return repo.BalanceXLock(ctx, wallet.TypeUnconfirmed) } func (repo *userWallet) GetOrderBalanceXLock(ctx context.Context, id int64) (entity.Transaction, error) { var result entity.Transaction err := repo.db.WithContext(ctx). Model(&result). Select("id, transaction_id, order_id, uid, crypto, balance, type, business_type"). Where("id = ?", id). Clauses(clause.Locking{Strength: "UPDATE"}). Take(&result).Error if err != nil { return entity.Transaction{}, repository.WrapNotFoundError(err) } repo.localOrderBalance[result.ID] = result.Balance return result, nil } func (repo *userWallet) AddBalance(kind wallet.Types, amount decimal.Decimal) error { return repo.AddBalanceSetOrderID("", kind, amount) } func (repo *userWallet) AddBalanceSetOrderID(orderID string, kind wallet.Types, amount decimal.Decimal) error { w, ok := repo.localWalletBalance[kind] if !ok { return repository.ErrRecordNotFound } w.Balance = w.Balance.Add(amount) if w.Balance.LessThan(decimal.Zero) { return repository.ErrBalanceInsufficient } repo.transactions = append(repo.transactions, entity.WalletTransaction{ OrderID: orderID, UID: repo.uid, WalletType: kind, Asset: repo.asset, Amount: amount, Balance: w.Balance, }) repo.localWalletBalance[kind] = w return nil } func (repo *userWallet) AddAvailable(_ wallet.BusinessName, amount decimal.Decimal) error { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName: // return w.AddBalance(domain.WalletContractAvailableType, amount) //case domain.ContractSimulationBusinessTypeBusinessName: // return w.AddBalance(domain.WalletSimulationAvailableType, amount) //case domain.DistributionBusinessTypeBusinessName: // return w.AddBalance(domain.WalletAvailableType, amount) //} return repo.AddBalance(wallet.TypeAvailable, amount) } func (repo *userWallet) AddFreeze(_ wallet.BusinessName, amount decimal.Decimal) error { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName: // return w.AddBalance(domain.WalletContractFreezeType, amount) //case domain.ContractSimulationBusinessTypeBusinessName: // return w.AddBalance(domain.WalletSimulationFreezeType, amount) //} return repo.AddBalance(wallet.TypeFreeze, amount) } func (repo *userWallet) AddUnconfirmed(amount decimal.Decimal) error { return repo.AddBalance(wallet.TypeUnconfirmed, amount) } func (repo *userWallet) AddOrder(id int64, amount decimal.Decimal) error { return repo.addOrderBalance(id, amount) } func (repo *userWallet) SubAvailable(_ wallet.BusinessName, amount decimal.Decimal) error { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName: // return w.AddBalance(domain.WalletContractAvailableType, decimal.Zero.Sub(amount)) //case domain.ContractSimulationBusinessTypeBusinessName: // return w.AddBalance(domain.WalletSimulationAvailableType, decimal.Zero.Sub(amount)) //} return repo.AddBalance(wallet.TypeAvailable, decimal.Zero.Sub(amount)) } func (repo *userWallet) SubFreeze(_ wallet.BusinessName, amount decimal.Decimal) error { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName, domain.SystemTransferCommissionBusinessTypeBusinessName: // return w.AddBalance(domain.WalletContractFreezeType, decimal.Zero.Sub(amount)) //case domain.ContractSimulationBusinessTypeBusinessName: // return w.AddBalance(domain.WalletSimulationFreezeType, decimal.Zero.Sub(amount)) //} return repo.AddBalance(wallet.TypeFreeze, decimal.Zero.Sub(amount)) } func (repo *userWallet) SubUnconfirmed(amount decimal.Decimal) error { return repo.AddBalance(wallet.TypeUnconfirmed, decimal.Zero.Sub(amount)) } func (repo *userWallet) GetAvailableBalance(ctx context.Context, _ wallet.BusinessName) (entity.Wallet, error) { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName: // return w.Balance(ctx, domain.WalletContractAvailableType) //case domain.ContractSimulationBusinessTypeBusinessName: // return w.Balance(ctx, domain.WalletSimulationAvailableType) //case domain.DistributionBusinessTypeBusinessName: // return w.Balance(ctx, domain.WalletAvailableType) //} return repo.Balance(ctx, wallet.TypeAvailable) } func (repo *userWallet) GetFreezeBalance(ctx context.Context, _ wallet.BusinessName) (entity.Wallet, error) { //switch b { //case domain.ContractBusinessTypeBusinessName, domain.SystemTransferBusinessTypeBusinessName, domain.SystemTransferCommissionBusinessTypeBusinessName: // return w.Balance(ctx, domain.WalletContractFreezeType) //case domain.ContractSimulationBusinessTypeBusinessName: // return w.Balance(ctx, domain.WalletSimulationFreezeType) //} return repo.Balance(ctx, wallet.TypeFreeze) } func (repo *userWallet) GetUnconfirmedBalance(ctx context.Context) (entity.Wallet, error) { return repo.Balance(ctx, wallet.TypeUnconfirmed) } func (repo *userWallet) GetOrderBalance(ctx context.Context, orderID string) (entity.Transaction, error) { var result entity.Transaction err := repo.db.WithContext(ctx). Model(&result). Select("id, transaction_id, order_id, uid, crypto, balance, type, business_type"). Where("order_id = ?", orderID). Take(&result).Error if err != nil { return entity.Transaction{}, repository.WrapNotFoundError(err) } // 要確實set這兩個Balance,不然在做計算會有問題 repo.localOrderBalance[result.ID] = result.Balance return result, nil } func (repo *userWallet) CheckWallet(ctx context.Context) (bool, error) { var exists bool err := repo.walletUserWhere(repo.uid, repo.asset).WithContext(ctx). Model(&entity.Wallet{}). Select("1"). Where("type = ?", wallet.TypeAvailable). Limit(1). Scan(&exists).Error if err != nil { return false, err } return exists, nil } func (repo *userWallet) Execute(ctx context.Context) error { // 處理wallet table for _, walletType := range wallet.AllTypes { w, ok := repo.localWalletBalance[walletType] if !ok { continue } rc := &sql.TxOptions{ Isolation: sql.LevelReadCommitted, ReadOnly: false, } err := repo.db.Transaction(func(tx *gorm.DB) error { return tx.WithContext(ctx). Model(&entity.Wallet{}). Where("id = ?", w.ID). UpdateColumns(map[string]interface{}{ "balance": w.Balance, "update_time": time.Now().UTC().Unix(), }).Error }, rc) if err != nil { err = fmt.Errorf("update uid: %s crypto: %s type: %d wallet error: %w", repo.uid, repo.asset, walletType, err) return err } } return nil } func (repo *userWallet) ExecuteOrder(ctx context.Context) error { for id, localBalance := range repo.localOrderBalance { rc := &sql.TxOptions{ Isolation: sql.LevelReadCommitted, ReadOnly: false, } err := repo.db.Transaction(func(tx *gorm.DB) error { return tx.WithContext(ctx). Where("id = ?", id). Model(&entity.Transaction{}). Update("balance", localBalance).Error }, rc) if err != nil { return err } } return nil } func (repo *userWallet) Transactions(txID int64, orderID, brand string, businessType wallet.BusinessName) []entity.WalletTransaction { for i := range repo.transactions { repo.transactions[i].TransactionID = txID repo.transactions[i].OrderID = orderID repo.transactions[i].Brand = brand repo.transactions[i].BusinessType = businessType.ToINT8() } return repo.transactions } func (repo *userWallet) GetTransactions() []entity.WalletTransaction { return repo.transactions } // ============================================================================ func (repo *userWallet) walletUserWhere(uid, asset string) *gorm.DB { return repo.db.Where("uid = ?", uid). Where("asset = ?", asset) } // addOrderBalance 新增訂單餘額 // 使用前local Balance必須有資料,所以必須執行過GetOrderBalanceXLock才會有資料 func (repo *userWallet) addOrderBalance(id int64, amount decimal.Decimal) error { balance, ok := repo.localOrderBalance[id] if !ok { return repository.ErrRecordNotFound } balance = balance.Add(amount) if balance.LessThan(decimal.Zero) { return repository.ErrBalanceInsufficient } repo.localOrderBalance[id] = balance return nil }