add wallet repo

This commit is contained in:
daniel.w 2024-10-30 19:13:56 +08:00
parent fc748fb66f
commit 3c1c611f96
12 changed files with 395 additions and 60 deletions

View File

@ -31,8 +31,8 @@ Mongo:
DB:
Host: 127.0.0.1
Port: 3306
User: username
Password: password
User: root
Password: yytt
name: permission
MaxIdleConns: 10
MaxOpenConns: 200

5
go.mod
View File

@ -13,6 +13,8 @@ require (
go.uber.org/mock v0.5.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
gorm.io/driver/sqlite v1.5.6
gorm.io/gorm v1.25.12
)
require (
@ -44,6 +46,8 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
@ -51,6 +55,7 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect

View File

@ -27,6 +27,8 @@ const (
const (
_ ErrorCode = 20 + iota
CreateWalletErrorCode
WalletTxErrorCode
WalletBalanceNotFound
)
const (

View File

@ -1,4 +1,21 @@
package repository
import (
"app-cloudep-trade-service/internal/domain"
"app-cloudep-trade-service/internal/model"
"context"
"github.com/shopspring/decimal"
)
// 如果有需要Select for update 的話,請在 Option 當中加上鎖並且傳入 sqlx.conn)
// UserWalletOperator 針對使用者的錢包基本操作接口
type UserWalletOperator interface{}
type UserWalletOperator interface {
// Balances 取得多種類別餘額
Balances(ctx context.Context, kind []domain.WalletType, opts ...WalletOperatorOption) ([]model.Wallet, error)
// LocalBalance 取得本地錢包的數額
LocalBalance(kind domain.WalletType) decimal.Decimal
// GetBalancesByID 取得錢包的數額 ByID
GetBalancesByID(ctx context.Context, ids []int64, opts ...WalletOperatorOption) ([]model.Wallet, error)
}

View File

@ -10,14 +10,14 @@ import (
// WalletRepository 錢包基礎操作(可能有平台,使用者等多元的錢包)
type WalletRepository interface {
// Create 建立錢包組合(某人的可用,凍結,限制三種)
// Create 建立錢包(某 id 的可用,凍結,限制三種)
Create(ctx context.Context, uid, currency, brand string) ([]*model.Wallet, error)
// Balances 取得某個的餘額
// Balances 取得某個錢包的餘額
Balances(ctx context.Context, req BalanceReq) ([]model.Wallet, error)
// GetTxDatabaseConn 取得 sql 要做 tx 的連線
GetTxDatabaseConn() sqlx.SqlConn
// GetUserWalletOperator 取得使用者錢包操作看使否需要使用 transaction
GetUserWalletOperator(uid, currency string, opts ...Option) UserWalletOperator
// Transaction 把 tx 暴露出來
Transaction(ctx context.Context, fn func(tx sqlx.Session) error) error
}
// BalanceReq 取得全部的,因為一個人錢包種類的不會太多,故全撈

View File

@ -0,0 +1,67 @@
package repository
import (
"app-cloudep-trade-service/internal/domain"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"github.com/shopspring/decimal"
)
// WalletOperatorOption 選項模式
type WalletOperatorOption func(*WalletOptions)
type WalletOptions struct {
WithLock bool
OrderID string
Amount decimal.Decimal
Kind domain.WalletType
Business domain.BusinessName
Tx *sqlx.SqlConn
}
// ApplyOptions 將多個 WalletOperatorOption 應用到一個 walletOptions 中
func ApplyOptions(opts ...WalletOperatorOption) WalletOptions {
options := WalletOptions{}
for _, opt := range opts {
opt(&options)
}
return options
}
func WithLock() WalletOperatorOption {
return func(opts *WalletOptions) {
opts.WithLock = true
}
}
func WithOrderID(orderID string) WalletOperatorOption {
return func(opts *WalletOptions) {
opts.OrderID = orderID
}
}
func WithAmount(amount decimal.Decimal) WalletOperatorOption {
return func(opts *WalletOptions) {
opts.Amount = amount
}
}
func WithKind(kind domain.WalletType) WalletOperatorOption {
return func(opts *WalletOptions) {
opts.Kind = kind
}
}
func WithBusiness(business domain.BusinessName) WalletOperatorOption {
return func(opts *WalletOptions) {
opts.Business = business
}
}
func WithSession(session *sqlx.SqlConn) WalletOperatorOption {
return func(opts *WalletOptions) {
opts.Tx = session
}
}

View File

@ -0,0 +1,37 @@
package model
import (
"fmt"
"strings"
)
// queryBindINBuilder 用來動態生成 WHERE 條件和參數
func queryBindINBuilder(baseQuery string, conditions map[string][]any) (string, []any) {
args := make([]any, 0)
whereClause := make([]string, 0)
for column, values := range conditions {
if len(values) > 0 {
placeholders := strings.Repeat("?,", len(values))
placeholders = placeholders[:len(placeholders)-1]
whereClause = append(whereClause, fmt.Sprintf("%s IN (%s)", column, placeholders))
args = append(args, values...)
}
}
// 將 WHERE 條件添加到查詢中
if len(whereClause) > 0 {
baseQuery += " WHERE " + strings.Join(whereClause, " AND ")
}
return baseQuery, args
}
func convertSliceToInterface[T any](slice []T) []any {
result := make([]any, 0, len(slice))
for _, v := range slice {
result = append(result, v)
}
return result
}

View File

@ -1,6 +1,12 @@
package model
import "github.com/zeromicro/go-zero/core/stores/sqlx"
import (
"context"
"database/sql"
"fmt"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
var _ WalletJournalModel = (*customWalletJournalModel)(nil)
@ -9,6 +15,7 @@ type (
// and implement the added methods in customWalletJournalModel.
WalletJournalModel interface {
walletJournalModel
InsertWithSession(ctx context.Context, tx sqlx.Session, data *WalletJournal) (sql.Result, error)
}
customWalletJournalModel struct {
@ -22,3 +29,9 @@ func NewWalletJournalModel(conn sqlx.SqlConn) WalletJournalModel {
defaultWalletJournalModel: newWalletJournalModel(conn),
}
}
func (m *customWalletJournalModel) InsertWithSession(ctx context.Context, tx sqlx.Session, data *WalletJournal) (sql.Result, error) {
query := fmt.Sprintf("insert into %s (%s) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", m.table, walletJournalRowsExpectAutoSet)
ret, err := tx.ExecCtx(ctx, query, data.TransactionId, data.OrderId, data.Brand, data.Uid, data.WalletType, data.Currency, data.TransactionAmount, data.PostTransactionBalance, data.BusinessType, data.Status, data.DueTime, data.CreatedAt)
return ret, err
}

View File

@ -2,7 +2,6 @@ package model
import (
"app-cloudep-trade-service/internal/domain"
"context"
"database/sql"
"errors"
@ -22,7 +21,10 @@ type (
WalletModel interface {
walletModel
InsertMany(ctx context.Context, wallets []*Wallet) (sql.Result, error)
Balances(ctx context.Context, req BalanceReq) ([]Wallet, error)
// Balances 給他 select for update 的鎖先加上去
Balances(ctx context.Context, req BalanceReq, withLock bool, tx sqlx.Session) ([]Wallet, error)
// BalancesByIDs 給他 select for update 的鎖先加上去
BalancesByIDs(ctx context.Context, ids []int64, withLock bool, tx sqlx.Session) ([]Wallet, error)
}
customWalletModel struct {
@ -64,53 +66,60 @@ func (m *customWalletModel) InsertMany(ctx context.Context, wallets []*Wallet) (
return m.conn.ExecCtx(ctx, query, valueArgs...)
}
func (m *customWalletModel) Balances(ctx context.Context, req BalanceReq) ([]Wallet, error) {
var data []Wallet
query := fmt.Sprintf("select 'id', 'uid', 'currency', 'balance', 'type', 'update_time' from %s", m.table)
var conditions []string
var args []any
func (m *customWalletModel) Balances(ctx context.Context, req BalanceReq, isLock bool, tx sqlx.Session) ([]Wallet, error) {
baseQuery := fmt.Sprintf("SELECT `id`, `currency`, `balance`, `wallet_type` FROM %s", m.table)
// 根據條件動態拼接 WHERE 子句
if len(req.UID) > 0 {
placeholders := strings.Repeat("?,", len(req.UID))
placeholders = placeholders[:len(placeholders)-1] // 移除最後一個逗號
conditions = append(conditions, fmt.Sprintf("uid IN (%s)", placeholders))
args = append(args, convertSliceToInterface(req.UID)...)
}
if len(req.Currency) > 0 {
placeholders := strings.Repeat("?,", len(req.Currency))
placeholders = placeholders[:len(placeholders)-1]
conditions = append(conditions, fmt.Sprintf("currency IN (%s)", placeholders))
args = append(args, convertSliceToInterface(req.Currency)...)
}
if len(req.Kind) > 0 {
placeholders := strings.Repeat("?,", len(req.Kind))
placeholders = placeholders[:len(placeholders)-1]
conditions = append(conditions, fmt.Sprintf("type IN (%s)", placeholders))
args = append(args, convertSliceToInterface(req.Kind)...)
// 構建條件字典
conditions := map[string][]any{
"`uid`": convertSliceToInterface(req.UID),
"`currency`": convertSliceToInterface(req.Currency),
"`wallet_type`": convertSliceToInterface(req.Kind),
}
// 使用 queryBuilder 構建完整查詢
query, args := queryBindINBuilder(baseQuery, conditions)
// 如果有條件,則拼接 WHERE 子句
if len(conditions) > 0 {
query += " WHERE " + strings.Join(conditions, " AND ")
if isLock {
// 加上排他鎖
query += " FOR UPDATE"
}
// 執行查詢
err := m.conn.QueryRowCtx(ctx, &data, query, args...)
var wallets []Wallet
err := tx.QueryRowsPartialCtx(ctx, &wallets, query, args...)
switch {
case err == nil:
return data, nil
case errors.Is(err, sqlc.ErrNotFound):
return wallets, nil
case errors.As(sqlc.ErrNotFound, &err):
return nil, ErrNotFound
default:
return nil, err
}
}
func convertSliceToInterface[T any](slice []T) []any {
interfaces := make([]any, 0, len(slice))
for i, v := range slice {
interfaces[i] = v
func (m *customWalletModel) BalancesByIDs(ctx context.Context, ids []int64, withLock bool, tx sqlx.Session) ([]Wallet, error) {
baseQuery := fmt.Sprintf("SELECT `id`, `currency`, `balance`, `wallet_type` FROM %s", m.table)
// 構建條件字典
conditions := map[string][]any{
"`id`": convertSliceToInterface(ids),
}
// 使用 queryBuilder 構建完整查詢
query, args := queryBindINBuilder(baseQuery, conditions)
if withLock {
// 加上排他鎖
query += " FOR UPDATE"
}
// 執行查詢
var wallets []Wallet
err := tx.QueryRowsPartialCtx(ctx, &wallets, query, args...)
switch {
case err == nil:
return wallets, nil
case errors.As(sqlc.ErrNotFound, &err):
return nil, ErrNotFound
default:
return nil, err
}
return interfaces
}

View File

@ -4,10 +4,10 @@ import (
"app-cloudep-trade-service/internal/domain"
"app-cloudep-trade-service/internal/domain/repository"
"app-cloudep-trade-service/internal/model"
"github.com/zeromicro/go-zero/core/stores/sqlx"
"context"
"github.com/shopspring/decimal"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
// 用戶某個幣種餘額
@ -15,8 +15,8 @@ type userLocalWallet struct {
wm model.WalletModel
txConn sqlx.SqlConn
uid string
crypto string
uid string
currency string
// local wallet 相關計算的餘額存在這裡
walletBalance map[domain.WalletType]model.Wallet
@ -28,12 +28,64 @@ type userLocalWallet struct {
transactions []model.WalletJournal
}
func NewUserWalletOperator(uid, crypto string, wm model.WalletModel, txConn sqlx.SqlConn) repository.UserWalletOperator {
func (use *userLocalWallet) GetBalancesByID(ctx context.Context, ids []int64, opts ...repository.WalletOperatorOption) ([]model.Wallet, error) {
o := repository.ApplyOptions(opts...)
tx := use.txConn
if o.Tx != nil {
tx = *o.Tx
}
wallets, err := use.wm.BalancesByIDs(ctx, ids, o.WithLock, tx)
if err != nil {
return nil, err
}
for _, wallet := range wallets {
use.walletBalance[wallet.WalletType] = wallet
}
return wallets, nil
}
// LocalBalance 內存餘額
func (use *userLocalWallet) LocalBalance(kind domain.WalletType) decimal.Decimal {
wallet, ok := use.walletBalance[kind]
if !ok {
return decimal.Zero
}
return wallet.Balance
}
func (use *userLocalWallet) Balances(ctx context.Context, kind []domain.WalletType, opts ...repository.WalletOperatorOption) ([]model.Wallet, error) {
o := repository.ApplyOptions(opts...)
tx := use.txConn
if o.Tx != nil {
tx = *o.Tx
}
wallets, err := use.wm.Balances(ctx, model.BalanceReq{
UID: []string{use.uid},
Currency: []string{use.currency},
Kind: kind,
}, o.WithLock, tx)
if err != nil {
return nil, err
}
for _, wallet := range wallets {
use.walletBalance[wallet.WalletType] = wallet
}
return wallets, nil
}
func NewUserWalletOperator(uid, currency string, wm model.WalletModel, txConn sqlx.SqlConn) repository.UserWalletOperator {
return &userLocalWallet{
wm: wm,
txConn: txConn,
uid: uid,
crypto: crypto,
wm: wm,
txConn: txConn,
uid: uid,
currency: currency,
walletBalance: make(map[domain.WalletType]model.Wallet, len(domain.AllWalletType)),
localOrderBalance: make(map[int64]decimal.Decimal, len(domain.AllWalletType)),

View File

@ -5,9 +5,13 @@ import (
"app-cloudep-trade-service/internal/domain/repository"
"app-cloudep-trade-service/internal/model"
"context"
"database/sql"
"errors"
"fmt"
"time"
"github.com/zeromicro/go-zero/core/stores/sqlc"
"github.com/shopspring/decimal"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/stores/sqlx"
@ -22,6 +26,54 @@ type WalletRepository struct {
WalletRepositoryParam
}
func (repo *WalletRepository) Transaction(ctx context.Context, fn func(tx sqlx.Session) error) error {
// 取得原生的 *sql.DB
db, err := repo.TxConn.RawDB()
if err != nil {
return err
}
// 定義事務選項
txOptions := &sql.TxOptions{
Isolation: sql.LevelReadCommitted,
ReadOnly: false,
}
// 開啟事務
tx, err := db.BeginTx(ctx, txOptions)
if err != nil {
return err
}
// 使用 sqlx.Session 包裹事務
session := sqlx.NewSessionFromTx(tx)
// 執行傳入的操作
if err := fn(session); err != nil {
// 若有錯誤則回滾
err := tx.Rollback()
if err != nil {
// 錯誤代碼 06-021-22
e := domain.CommentErrorL(
domain.WalletTxErrorCode,
logx.WithContext(ctx),
[]logx.LogField{
{Key: "func", Value: "WalletModel.Transaction.Rollback"},
{Key: "err", Value: err},
},
"failed to find balance into mongo:").Wrap(err)
return e
}
// 裡面那一層會紀錄
return err
}
// 提交事務
return tx.Commit()
}
// GetUserWalletOperator 取得本地操作使用者錢包的操作運算元
func (repo *WalletRepository) GetUserWalletOperator(uid, currency string, opts ...repository.Option) repository.UserWalletOperator {
db := repo.TxConn
@ -75,9 +127,15 @@ func (repo *WalletRepository) Balances(ctx context.Context, req repository.Balan
data, err := repo.WalletModel.Balances(ctx, model.BalanceReq{
UID: req.UID,
Currency: req.Currency,
})
Kind: req.Kind,
}, false, repo.TxConn)
if err != nil {
if errors.Is(sqlc.ErrNotFound, err) {
// 錯誤代碼 06-031-23
return nil, domain.NotFoundError(domain.WalletBalanceNotFound, "balance not found")
}
// 錯誤代碼 06-021-20
e := domain.CommentErrorL(
domain.CreateWalletErrorCode,
@ -87,18 +145,14 @@ func (repo *WalletRepository) Balances(ctx context.Context, req repository.Balan
{Key: "func", Value: "WalletModel.Balances"},
{Key: "err", Value: err},
},
"failed to find balance into mongo:").Wrap(err)
"failed to get balance")
return []model.Wallet{}, e
return nil, e
}
return data, nil
}
func (repo *WalletRepository) GetTxDatabaseConn() sqlx.SqlConn {
return repo.TxConn
}
func NewWalletRepository(param WalletRepositoryParam) repository.WalletRepository {
return &WalletRepository{
WalletRepositoryParam: param,

View File

@ -0,0 +1,79 @@
package repository
import (
"app-cloudep-trade-service/internal/domain"
"app-cloudep-trade-service/internal/model"
"context"
"fmt"
"testing"
"github.com/zeromicro/go-zero/core/stores/sqlx"
)
func TestByMe(t *testing.T) {
sqlConn := sqlx.NewSqlConn("mysql",
"root:yytt@tcp(127.0.0.1:3306)/digimon_wallet?parseTime=true&interpolateParams=false")
WalletRepo := NewWalletRepository(WalletRepositoryParam{
model.NewWalletModel(sqlConn),
sqlConn,
})
ctx := context.Background()
wo := WalletRepo.GetUserWalletOperator("OOOOOOOK", "USD")
balances, err := wo.Balances(ctx, domain.AllWalletType)
if err != nil {
return
}
fmt.Println(balances)
// create, err := WalletRepo.Create(ctx, "OOOOOOOK", "USD", "Digimon")
// if err != nil {
// return
// }
// fmt.Println(create)
// balances, err := WalletRepo.Balances(ctx, repository.BalanceReq{
// UID: []string{"OOOOOOOK"},
// Currency: []string{"USD"},
// // Kind: make([]domain.WalletType, 0),
// })
// if err != nil {
// fmt.Println(err)
// return
// }
// err := WalletRepo.Transaction(ctx, func(tx sqlx.Session) error {
// wm := model.NewWalletJournalModel(sqlConn)
// _, err := wm.InsertWithSession(ctx, tx, &model.WalletJournal{
// Id: 1,
// TransactionId: 10001,
// OrderId: "ORD123456789",
// Brand: "BrandX",
// Uid: "user123",
// WalletType: 1, // 1=可用
// Currency: "USD",
// TransactionAmount: 500.00,
// PostTransactionBalance: 1500.00,
// BusinessType: 2, // 假設 2 表示特定業務類型
// Status: 1, // 假設 1 表示成功狀態
// DueTime: 1698289200000, // T+N 執行時間
// CreatedAt: 1698192800000, // 創建時間 (Unix 時間戳,毫秒)
// })
//
// if err != nil {
// return err
// }
//
// return nil
// })
//
// if err != nil {
// return
// }
// fmt.Println(balances)
}