From 3c1c611f961815a87d39897104905baa4ed4c8f7 Mon Sep 17 00:00:00 2001 From: "daniel.w" Date: Wed, 30 Oct 2024 19:13:56 +0800 Subject: [PATCH] add wallet repo --- etc/trade.yaml | 4 +- go.mod | 5 ++ internal/domain/errors.go | 2 + internal/domain/repository/user_wallet.go | 19 ++++- internal/domain/repository/wallet.go | 8 +- internal/domain/repository/wallet_option.go | 67 +++++++++++++++++ internal/model/mysql_query_builder.go | 37 ++++++++++ internal/model/wallet_journal_model.go | 15 +++- internal/model/wallet_model.go | 81 ++++++++++++--------- internal/repository/user_wallet.go | 70 +++++++++++++++--- internal/repository/wallet.go | 68 +++++++++++++++-- internal/repository/wallet_test.go | 79 ++++++++++++++++++++ 12 files changed, 395 insertions(+), 60 deletions(-) create mode 100644 internal/domain/repository/wallet_option.go create mode 100644 internal/model/mysql_query_builder.go create mode 100644 internal/repository/wallet_test.go diff --git a/etc/trade.yaml b/etc/trade.yaml index ccba184..0391826 100644 --- a/etc/trade.yaml +++ b/etc/trade.yaml @@ -31,8 +31,8 @@ Mongo: DB: Host: 127.0.0.1 Port: 3306 - User: username - Password: password + User: root + Password: yytt name: permission MaxIdleConns: 10 MaxOpenConns: 200 diff --git a/go.mod b/go.mod index 7bf2ba6..5a9ac61 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,8 @@ require ( go.uber.org/mock v0.5.0 google.golang.org/grpc v1.67.1 google.golang.org/protobuf v1.35.1 + gorm.io/driver/sqlite v1.5.6 + gorm.io/gorm v1.25.12 ) require ( @@ -44,6 +46,8 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect @@ -51,6 +55,7 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-sqlite3 v1.14.22 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/montanaflynn/stats v0.7.1 // indirect diff --git a/internal/domain/errors.go b/internal/domain/errors.go index 5d7ba75..da7577b 100644 --- a/internal/domain/errors.go +++ b/internal/domain/errors.go @@ -27,6 +27,8 @@ const ( const ( _ ErrorCode = 20 + iota CreateWalletErrorCode + WalletTxErrorCode + WalletBalanceNotFound ) const ( diff --git a/internal/domain/repository/user_wallet.go b/internal/domain/repository/user_wallet.go index 3032ee1..d1b818a 100644 --- a/internal/domain/repository/user_wallet.go +++ b/internal/domain/repository/user_wallet.go @@ -1,4 +1,21 @@ package repository +import ( + "app-cloudep-trade-service/internal/domain" + "app-cloudep-trade-service/internal/model" + "context" + + "github.com/shopspring/decimal" +) + +// 如果有需要Select for update 的話,請在 Option 當中加上鎖並且傳入 sqlx.conn) + // UserWalletOperator 針對使用者的錢包基本操作接口 -type UserWalletOperator interface{} +type UserWalletOperator interface { + // Balances 取得多種類別餘額 + Balances(ctx context.Context, kind []domain.WalletType, opts ...WalletOperatorOption) ([]model.Wallet, error) + // LocalBalance 取得本地錢包的數額 + LocalBalance(kind domain.WalletType) decimal.Decimal + // GetBalancesByID 取得錢包的數額 ByID + GetBalancesByID(ctx context.Context, ids []int64, opts ...WalletOperatorOption) ([]model.Wallet, error) +} diff --git a/internal/domain/repository/wallet.go b/internal/domain/repository/wallet.go index 3153d53..730fedc 100644 --- a/internal/domain/repository/wallet.go +++ b/internal/domain/repository/wallet.go @@ -10,14 +10,14 @@ import ( // WalletRepository 錢包基礎操作(可能有平台,使用者等多元的錢包) type WalletRepository interface { - // Create 建立錢包組合(某人的可用,凍結,限制三種) + // Create 建立錢包(某 id 的可用,凍結,限制三種) Create(ctx context.Context, uid, currency, brand string) ([]*model.Wallet, error) - // Balances 取得某個人的餘額 + // Balances 取得某個錢包的餘額 Balances(ctx context.Context, req BalanceReq) ([]model.Wallet, error) - // GetTxDatabaseConn 取得 sql 要做 tx 的連線 - GetTxDatabaseConn() sqlx.SqlConn // GetUserWalletOperator 取得使用者錢包操作看使否需要使用 transaction GetUserWalletOperator(uid, currency string, opts ...Option) UserWalletOperator + // Transaction 把 tx 暴露出來 + Transaction(ctx context.Context, fn func(tx sqlx.Session) error) error } // BalanceReq 取得全部的,因為一個人錢包種類的不會太多,故全撈 diff --git a/internal/domain/repository/wallet_option.go b/internal/domain/repository/wallet_option.go new file mode 100644 index 0000000..39ac50c --- /dev/null +++ b/internal/domain/repository/wallet_option.go @@ -0,0 +1,67 @@ +package repository + +import ( + "app-cloudep-trade-service/internal/domain" + + "github.com/zeromicro/go-zero/core/stores/sqlx" + + "github.com/shopspring/decimal" +) + +// WalletOperatorOption 選項模式 +type WalletOperatorOption func(*WalletOptions) + +type WalletOptions struct { + WithLock bool + OrderID string + Amount decimal.Decimal + Kind domain.WalletType + Business domain.BusinessName + Tx *sqlx.SqlConn +} + +// ApplyOptions 將多個 WalletOperatorOption 應用到一個 walletOptions 中 +func ApplyOptions(opts ...WalletOperatorOption) WalletOptions { + options := WalletOptions{} + for _, opt := range opts { + opt(&options) + } + + return options +} + +func WithLock() WalletOperatorOption { + return func(opts *WalletOptions) { + opts.WithLock = true + } +} + +func WithOrderID(orderID string) WalletOperatorOption { + return func(opts *WalletOptions) { + opts.OrderID = orderID + } +} + +func WithAmount(amount decimal.Decimal) WalletOperatorOption { + return func(opts *WalletOptions) { + opts.Amount = amount + } +} + +func WithKind(kind domain.WalletType) WalletOperatorOption { + return func(opts *WalletOptions) { + opts.Kind = kind + } +} + +func WithBusiness(business domain.BusinessName) WalletOperatorOption { + return func(opts *WalletOptions) { + opts.Business = business + } +} + +func WithSession(session *sqlx.SqlConn) WalletOperatorOption { + return func(opts *WalletOptions) { + opts.Tx = session + } +} diff --git a/internal/model/mysql_query_builder.go b/internal/model/mysql_query_builder.go new file mode 100644 index 0000000..133a875 --- /dev/null +++ b/internal/model/mysql_query_builder.go @@ -0,0 +1,37 @@ +package model + +import ( + "fmt" + "strings" +) + +// queryBindINBuilder 用來動態生成 WHERE 條件和參數 +func queryBindINBuilder(baseQuery string, conditions map[string][]any) (string, []any) { + args := make([]any, 0) + whereClause := make([]string, 0) + + for column, values := range conditions { + if len(values) > 0 { + placeholders := strings.Repeat("?,", len(values)) + placeholders = placeholders[:len(placeholders)-1] + whereClause = append(whereClause, fmt.Sprintf("%s IN (%s)", column, placeholders)) + args = append(args, values...) + } + } + + // 將 WHERE 條件添加到查詢中 + if len(whereClause) > 0 { + baseQuery += " WHERE " + strings.Join(whereClause, " AND ") + } + return baseQuery, args +} + +func convertSliceToInterface[T any](slice []T) []any { + result := make([]any, 0, len(slice)) + for _, v := range slice { + + result = append(result, v) + } + + return result +} diff --git a/internal/model/wallet_journal_model.go b/internal/model/wallet_journal_model.go index 7064fa0..c019215 100755 --- a/internal/model/wallet_journal_model.go +++ b/internal/model/wallet_journal_model.go @@ -1,6 +1,12 @@ package model -import "github.com/zeromicro/go-zero/core/stores/sqlx" +import ( + "context" + "database/sql" + "fmt" + + "github.com/zeromicro/go-zero/core/stores/sqlx" +) var _ WalletJournalModel = (*customWalletJournalModel)(nil) @@ -9,6 +15,7 @@ type ( // and implement the added methods in customWalletJournalModel. WalletJournalModel interface { walletJournalModel + InsertWithSession(ctx context.Context, tx sqlx.Session, data *WalletJournal) (sql.Result, error) } customWalletJournalModel struct { @@ -22,3 +29,9 @@ func NewWalletJournalModel(conn sqlx.SqlConn) WalletJournalModel { defaultWalletJournalModel: newWalletJournalModel(conn), } } + +func (m *customWalletJournalModel) InsertWithSession(ctx context.Context, tx sqlx.Session, data *WalletJournal) (sql.Result, error) { + query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, walletJournalRowsExpectAutoSet) + ret, err := tx.ExecCtx(ctx, query, data.TransactionId, data.OrderId, data.Brand, data.Uid, data.WalletType, data.Currency, data.TransactionAmount, data.PostTransactionBalance, data.BusinessType, data.Status, data.DueTime, data.CreatedAt) + return ret, err +} diff --git a/internal/model/wallet_model.go b/internal/model/wallet_model.go index 16d462a..441c8d1 100755 --- a/internal/model/wallet_model.go +++ b/internal/model/wallet_model.go @@ -2,7 +2,6 @@ package model import ( "app-cloudep-trade-service/internal/domain" - "context" "database/sql" "errors" @@ -22,7 +21,10 @@ type ( WalletModel interface { walletModel InsertMany(ctx context.Context, wallets []*Wallet) (sql.Result, error) - Balances(ctx context.Context, req BalanceReq) ([]Wallet, error) + // Balances 給他 select for update 的鎖先加上去 + Balances(ctx context.Context, req BalanceReq, withLock bool, tx sqlx.Session) ([]Wallet, error) + // BalancesByIDs 給他 select for update 的鎖先加上去 + BalancesByIDs(ctx context.Context, ids []int64, withLock bool, tx sqlx.Session) ([]Wallet, error) } customWalletModel struct { @@ -64,53 +66,60 @@ func (m *customWalletModel) InsertMany(ctx context.Context, wallets []*Wallet) ( return m.conn.ExecCtx(ctx, query, valueArgs...) } -func (m *customWalletModel) Balances(ctx context.Context, req BalanceReq) ([]Wallet, error) { - var data []Wallet - query := fmt.Sprintf("select 'id', 'uid', 'currency', 'balance', 'type', 'update_time' from %s", m.table) - var conditions []string - var args []any +func (m *customWalletModel) Balances(ctx context.Context, req BalanceReq, isLock bool, tx sqlx.Session) ([]Wallet, error) { + baseQuery := fmt.Sprintf("SELECT `id`, `currency`, `balance`, `wallet_type` FROM %s", m.table) - // 根據條件動態拼接 WHERE 子句 - if len(req.UID) > 0 { - placeholders := strings.Repeat("?,", len(req.UID)) - placeholders = placeholders[:len(placeholders)-1] // 移除最後一個逗號 - conditions = append(conditions, fmt.Sprintf("uid IN (%s)", placeholders)) - args = append(args, convertSliceToInterface(req.UID)...) - } - if len(req.Currency) > 0 { - placeholders := strings.Repeat("?,", len(req.Currency)) - placeholders = placeholders[:len(placeholders)-1] - conditions = append(conditions, fmt.Sprintf("currency IN (%s)", placeholders)) - args = append(args, convertSliceToInterface(req.Currency)...) - } - if len(req.Kind) > 0 { - placeholders := strings.Repeat("?,", len(req.Kind)) - placeholders = placeholders[:len(placeholders)-1] - conditions = append(conditions, fmt.Sprintf("type IN (%s)", placeholders)) - args = append(args, convertSliceToInterface(req.Kind)...) + // 構建條件字典 + conditions := map[string][]any{ + "`uid`": convertSliceToInterface(req.UID), + "`currency`": convertSliceToInterface(req.Currency), + "`wallet_type`": convertSliceToInterface(req.Kind), } + // 使用 queryBuilder 構建完整查詢 + query, args := queryBindINBuilder(baseQuery, conditions) - // 如果有條件,則拼接 WHERE 子句 - if len(conditions) > 0 { - query += " WHERE " + strings.Join(conditions, " AND ") + if isLock { + // 加上排他鎖 + query += " FOR UPDATE" } // 執行查詢 - err := m.conn.QueryRowCtx(ctx, &data, query, args...) + var wallets []Wallet + err := tx.QueryRowsPartialCtx(ctx, &wallets, query, args...) switch { case err == nil: - return data, nil - case errors.Is(err, sqlc.ErrNotFound): + return wallets, nil + case errors.As(sqlc.ErrNotFound, &err): return nil, ErrNotFound default: return nil, err } } -func convertSliceToInterface[T any](slice []T) []any { - interfaces := make([]any, 0, len(slice)) - for i, v := range slice { - interfaces[i] = v +func (m *customWalletModel) BalancesByIDs(ctx context.Context, ids []int64, withLock bool, tx sqlx.Session) ([]Wallet, error) { + baseQuery := fmt.Sprintf("SELECT `id`, `currency`, `balance`, `wallet_type` FROM %s", m.table) + + // 構建條件字典 + conditions := map[string][]any{ + "`id`": convertSliceToInterface(ids), + } + // 使用 queryBuilder 構建完整查詢 + query, args := queryBindINBuilder(baseQuery, conditions) + + if withLock { + // 加上排他鎖 + query += " FOR UPDATE" + } + + // 執行查詢 + var wallets []Wallet + err := tx.QueryRowsPartialCtx(ctx, &wallets, query, args...) + switch { + case err == nil: + return wallets, nil + case errors.As(sqlc.ErrNotFound, &err): + return nil, ErrNotFound + default: + return nil, err } - return interfaces } diff --git a/internal/repository/user_wallet.go b/internal/repository/user_wallet.go index 9fc118a..bf3f27a 100644 --- a/internal/repository/user_wallet.go +++ b/internal/repository/user_wallet.go @@ -4,10 +4,10 @@ import ( "app-cloudep-trade-service/internal/domain" "app-cloudep-trade-service/internal/domain/repository" "app-cloudep-trade-service/internal/model" - - "github.com/zeromicro/go-zero/core/stores/sqlx" + "context" "github.com/shopspring/decimal" + "github.com/zeromicro/go-zero/core/stores/sqlx" ) // 用戶某個幣種餘額 @@ -15,8 +15,8 @@ type userLocalWallet struct { wm model.WalletModel txConn sqlx.SqlConn - uid string - crypto string + uid string + currency string // local wallet 相關計算的餘額存在這裡 walletBalance map[domain.WalletType]model.Wallet @@ -28,12 +28,64 @@ type userLocalWallet struct { transactions []model.WalletJournal } -func NewUserWalletOperator(uid, crypto string, wm model.WalletModel, txConn sqlx.SqlConn) repository.UserWalletOperator { +func (use *userLocalWallet) GetBalancesByID(ctx context.Context, ids []int64, opts ...repository.WalletOperatorOption) ([]model.Wallet, error) { + o := repository.ApplyOptions(opts...) + tx := use.txConn + if o.Tx != nil { + tx = *o.Tx + } + + wallets, err := use.wm.BalancesByIDs(ctx, ids, o.WithLock, tx) + if err != nil { + return nil, err + } + + for _, wallet := range wallets { + use.walletBalance[wallet.WalletType] = wallet + } + + return wallets, nil +} + +// LocalBalance 內存餘額 +func (use *userLocalWallet) LocalBalance(kind domain.WalletType) decimal.Decimal { + wallet, ok := use.walletBalance[kind] + if !ok { + return decimal.Zero + } + + return wallet.Balance +} + +func (use *userLocalWallet) Balances(ctx context.Context, kind []domain.WalletType, opts ...repository.WalletOperatorOption) ([]model.Wallet, error) { + o := repository.ApplyOptions(opts...) + tx := use.txConn + if o.Tx != nil { + tx = *o.Tx + } + + wallets, err := use.wm.Balances(ctx, model.BalanceReq{ + UID: []string{use.uid}, + Currency: []string{use.currency}, + Kind: kind, + }, o.WithLock, tx) + if err != nil { + return nil, err + } + + for _, wallet := range wallets { + use.walletBalance[wallet.WalletType] = wallet + } + + return wallets, nil +} + +func NewUserWalletOperator(uid, currency string, wm model.WalletModel, txConn sqlx.SqlConn) repository.UserWalletOperator { return &userLocalWallet{ - wm: wm, - txConn: txConn, - uid: uid, - crypto: crypto, + wm: wm, + txConn: txConn, + uid: uid, + currency: currency, walletBalance: make(map[domain.WalletType]model.Wallet, len(domain.AllWalletType)), localOrderBalance: make(map[int64]decimal.Decimal, len(domain.AllWalletType)), diff --git a/internal/repository/wallet.go b/internal/repository/wallet.go index 7f06ece..9fb1b9e 100644 --- a/internal/repository/wallet.go +++ b/internal/repository/wallet.go @@ -5,9 +5,13 @@ import ( "app-cloudep-trade-service/internal/domain/repository" "app-cloudep-trade-service/internal/model" "context" + "database/sql" + "errors" "fmt" "time" + "github.com/zeromicro/go-zero/core/stores/sqlc" + "github.com/shopspring/decimal" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stores/sqlx" @@ -22,6 +26,54 @@ type WalletRepository struct { WalletRepositoryParam } +func (repo *WalletRepository) Transaction(ctx context.Context, fn func(tx sqlx.Session) error) error { + // 取得原生的 *sql.DB + db, err := repo.TxConn.RawDB() + if err != nil { + return err + } + + // 定義事務選項 + txOptions := &sql.TxOptions{ + Isolation: sql.LevelReadCommitted, + ReadOnly: false, + } + + // 開啟事務 + tx, err := db.BeginTx(ctx, txOptions) + if err != nil { + return err + } + + // 使用 sqlx.Session 包裹事務 + session := sqlx.NewSessionFromTx(tx) + + // 執行傳入的操作 + if err := fn(session); err != nil { + // 若有錯誤則回滾 + err := tx.Rollback() + if err != nil { + // 錯誤代碼 06-021-22 + e := domain.CommentErrorL( + domain.WalletTxErrorCode, + logx.WithContext(ctx), + []logx.LogField{ + {Key: "func", Value: "WalletModel.Transaction.Rollback"}, + {Key: "err", Value: err}, + }, + "failed to find balance into mongo:").Wrap(err) + + return e + } + + // 裡面那一層會紀錄 + return err + } + + // 提交事務 + return tx.Commit() +} + // GetUserWalletOperator 取得本地操作使用者錢包的操作運算元 func (repo *WalletRepository) GetUserWalletOperator(uid, currency string, opts ...repository.Option) repository.UserWalletOperator { db := repo.TxConn @@ -75,9 +127,15 @@ func (repo *WalletRepository) Balances(ctx context.Context, req repository.Balan data, err := repo.WalletModel.Balances(ctx, model.BalanceReq{ UID: req.UID, Currency: req.Currency, - }) + Kind: req.Kind, + }, false, repo.TxConn) if err != nil { + if errors.Is(sqlc.ErrNotFound, err) { + // 錯誤代碼 06-031-23 + return nil, domain.NotFoundError(domain.WalletBalanceNotFound, "balance not found") + } + // 錯誤代碼 06-021-20 e := domain.CommentErrorL( domain.CreateWalletErrorCode, @@ -87,18 +145,14 @@ func (repo *WalletRepository) Balances(ctx context.Context, req repository.Balan {Key: "func", Value: "WalletModel.Balances"}, {Key: "err", Value: err}, }, - "failed to find balance into mongo:").Wrap(err) + "failed to get balance") - return []model.Wallet{}, e + return nil, e } return data, nil } -func (repo *WalletRepository) GetTxDatabaseConn() sqlx.SqlConn { - return repo.TxConn -} - func NewWalletRepository(param WalletRepositoryParam) repository.WalletRepository { return &WalletRepository{ WalletRepositoryParam: param, diff --git a/internal/repository/wallet_test.go b/internal/repository/wallet_test.go new file mode 100644 index 0000000..e94cb2a --- /dev/null +++ b/internal/repository/wallet_test.go @@ -0,0 +1,79 @@ +package repository + +import ( + "app-cloudep-trade-service/internal/domain" + "app-cloudep-trade-service/internal/model" + "context" + "fmt" + "testing" + + "github.com/zeromicro/go-zero/core/stores/sqlx" +) + +func TestByMe(t *testing.T) { + sqlConn := sqlx.NewSqlConn("mysql", + "root:yytt@tcp(127.0.0.1:3306)/digimon_wallet?parseTime=true&interpolateParams=false") + + WalletRepo := NewWalletRepository(WalletRepositoryParam{ + model.NewWalletModel(sqlConn), + sqlConn, + }) + + ctx := context.Background() + + wo := WalletRepo.GetUserWalletOperator("OOOOOOOK", "USD") + balances, err := wo.Balances(ctx, domain.AllWalletType) + if err != nil { + return + } + + fmt.Println(balances) + + // create, err := WalletRepo.Create(ctx, "OOOOOOOK", "USD", "Digimon") + // if err != nil { + // return + // } + + // fmt.Println(create) + + // balances, err := WalletRepo.Balances(ctx, repository.BalanceReq{ + // UID: []string{"OOOOOOOK"}, + // Currency: []string{"USD"}, + // // Kind: make([]domain.WalletType, 0), + // }) + // if err != nil { + // fmt.Println(err) + // return + // } + + // err := WalletRepo.Transaction(ctx, func(tx sqlx.Session) error { + // wm := model.NewWalletJournalModel(sqlConn) + // _, err := wm.InsertWithSession(ctx, tx, &model.WalletJournal{ + // Id: 1, + // TransactionId: 10001, + // OrderId: "ORD123456789", + // Brand: "BrandX", + // Uid: "user123", + // WalletType: 1, // 1=可用 + // Currency: "USD", + // TransactionAmount: 500.00, + // PostTransactionBalance: 1500.00, + // BusinessType: 2, // 假設 2 表示特定業務類型 + // Status: 1, // 假設 1 表示成功狀態 + // DueTime: 1698289200000, // T+N 執行時間 + // CreatedAt: 1698192800000, // 創建時間 (Unix 時間戳,毫秒) + // }) + // + // if err != nil { + // return err + // } + // + // return nil + // }) + // + // if err != nil { + // return + // } + + // fmt.Println(balances) +}