feat: add transaction

This commit is contained in:
王性驊 2025-04-21 15:46:43 +08:00
parent 47fb3139c2
commit eacf85a532
11 changed files with 1244 additions and 21 deletions

View File

@ -12,11 +12,11 @@ CREATE TABLE `transaction` (
`balance` DECIMAL(30, 18) NOT NULL COMMENT '交易完成後的錢包餘額',
`before_balance` DECIMAL(30, 18) NOT NULL COMMENT '交易前的錢包餘額(方便審計與對帳)',
`status` TINYINT NOT NULL DEFAULT 1 COMMENT '狀態1: 有效、0: 無效/已取消)',
`create_time` BIGINT NOT NULL DEFAULT 0 COMMENT '建立時間Unix 秒數)',
`create_at` BIGINT NOT NULL DEFAULT 0 COMMENT '建立時間Unix 秒數)',
`due_time` BIGINT NOT NULL DEFAULT 0 COMMENT '到期時間(適用於凍結或延後入帳等場景)',
PRIMARY KEY (`id`),
UNIQUE KEY `uq_transaction_id` (`transaction_id`),
KEY `idx_uid` (`uid`),
KEY `idx_order_id` (`order_id`),
KEY `idx_create_time` (`create_time`)
KEY `idx_create_at` (`create_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='交易紀錄表';

View File

@ -8,21 +8,21 @@ import (
// Transaction 代表一筆錢包交易紀錄(例如充值、扣款、轉帳等)
// 此表記錄所有交易的詳細資訊,包括金額、對象、餘額狀態與交易型態等
type Transaction struct {
ID int64 `gorm:"column:id"` // 交易主鍵 ID自動遞增
OrderID string `gorm:"column:order_id"` // 關聯的訂單 ID可為空若不是由訂單觸發
TransactionID string `gorm:"column:transaction_id"` // 此筆交易的唯一識別碼(系統內部使用,可為 UUID
Brand string `gorm:"column:brand"` // 所屬品牌(支援多品牌場景)
UID string `gorm:"column:uid"` // 交易發起者的 UID
ToUID string `gorm:"column:to_uid"` // 交易對象的 UID如為轉帳場景
TxType wallet.TxType `gorm:"column:type"` // 交易類型(如轉帳、入金、出金等,自定義列舉)
BusinessType int8 `gorm:"column:business_type"` // 業務類型(如合約、模擬、一般用途等,數字代碼)
Asset string `gorm:"column:asset"` // 幣種(如 BTC、ETH、USD、TWD 等)
Amount decimal.Decimal `gorm:"column:amount"` // 本次變動金額(正數為增加,負數為扣減)
PostTransferBalance decimal.Decimal `gorm:"column:post_transfer_balance"` // 交易完成後的錢包餘額
BeforeBalance decimal.Decimal `gorm:"column:before_balance"` // 交易前的錢包餘額(方便審計與對帳)
Status wallet.Enable `gorm:"column:status"` // 狀態1: 有效、0: 無效/已取消)
CreateAt int64 `gorm:"column:create_time;autoCreateTime"` // 建立時間Unix 秒數)
DueTime int64 `gorm:"column:due_time"` // 到期時間(適用於凍結或延後入帳等場景)
ID int64 `gorm:"column:id"` // 交易主鍵 ID自動遞增
OrderID string `gorm:"column:order_id"` // 關聯的訂單 ID可為空若不是由訂單觸發
TransactionID string `gorm:"column:transaction_id"` // 此筆交易的唯一識別碼(系統內部使用,可為 UUID
Brand string `gorm:"column:brand"` // 所屬品牌(支援多品牌場景)
UID string `gorm:"column:uid"` // 交易發起者的 UID
ToUID string `gorm:"column:to_uid"` // 交易對象的 UID如為轉帳場景
TxType wallet.TxType `gorm:"column:type"` // 交易類型(如轉帳、入金、出金等,自定義列舉)
BusinessType int8 `gorm:"column:business_type"` // 業務類型(如合約、模擬、一般用途等,數字代碼)
Asset string `gorm:"column:asset"` // 幣種(如 BTC、ETH、USD、TWD 等)
Amount decimal.Decimal `gorm:"column:amount"` // 本次變動金額(正數為增加,負數為扣減)
PostTransferBalance decimal.Decimal `gorm:"column:post_transfer_balance"` // 交易完成後的錢包餘額
BeforeBalance decimal.Decimal `gorm:"column:before_balance"` // 交易前的錢包餘額(方便審計與對帳)
Status wallet.Enable `gorm:"column:status"` // 狀態1: 有效、0: 無效/已取消)
CreateAt int64 `gorm:"column:create_at;autoCreateTime"` // 建立時間Unix 秒數)
DueTime int64 `gorm:"column:due_time"` // 到期時間(適用於凍結或延後入帳等場景)
}
// TableName 指定 GORM 對應的資料表名稱

View File

@ -86,4 +86,10 @@ type UserWalletService interface {
PersistOrderBalances(ctx context.Context) error
// HasAvailableBalance 確認此使用者此資產是否已有可用餘額錢包
HasAvailableBalance(ctx context.Context) (bool, error)
// GetOrderBalance 查詢某筆交易(訂單),詳情寫入本地暫存
GetOrderBalance(ctx context.Context, txID int64) (entity.Transaction, error)
// GetOrderBalanceForUpdate 查詢某筆交易(訂單),詳情寫入本地暫存 (FOR UPDATE)
GetOrderBalanceForUpdate(ctx context.Context, txID int64) (entity.Transaction, error)
// ClearCache 清空本地所有暫存
ClearCache()
}

View File

@ -5,3 +5,9 @@ type BusinessName string
func (b BusinessName) ToInt8() int8 {
return int8(0)
}
const (
WalletNonStatus = iota
// WalletUnconfirmedSettleStatus 執行過限制餘額結算
WalletUnconfirmedSettleStatus
)

View File

@ -0,0 +1,160 @@
package repository
import (
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/entity"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/repository"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
"context"
"errors"
"gorm.io/gorm"
"time"
)
type TransactionRepositoryParam struct {
DB *gorm.DB `name:"dbM"`
}
type TransactionRepository struct {
TransactionRepositoryParam
}
func MustTransactionRepository(param TransactionRepositoryParam) repository.TransactionRepository {
return &TransactionRepository{
param,
}
}
func (repo *TransactionRepository) FindByOrderID(ctx context.Context, orderID string) (entity.Transaction, error) {
var result entity.Transaction
err := repo.DB.WithContext(ctx).Where("order_id = ?", orderID).Take(&result).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return entity.Transaction{}, repository.ErrRecordNotFound
}
}
return result, nil
}
func (repo *TransactionRepository) Insert(ctx context.Context, tx *entity.Transaction) error {
return repo.DB.Create(tx).Error
}
func (repo *TransactionRepository) BatchInsert(ctx context.Context, txs []*entity.Transaction) error {
return repo.DB.Create(txs).Error
}
func (repo *TransactionRepository) List(ctx context.Context, query repository.TransactionQuery) ([]entity.Transaction, int64, error) {
sql := repo.DB.WithContext(ctx)
if len(query.BusinessType) != 0 {
sql = sql.Where("business_type IN ?", query.BusinessType)
}
if query.UID != nil {
sql = sql.Where("uid = ?", query.UID)
}
if query.OrderID != nil {
sql = sql.Where("order_id = ?", query.OrderID)
}
if query.Assets != nil {
sql = sql.Where("asset = ?", query.Assets)
}
if len(query.TxTypes) > 0 {
sql = sql.Where("type IN ?", query.TxTypes)
}
//sql = sql.Where("status = ?", 0)
if query.StartTime != nil {
if query.EndTime != nil {
sql = sql.Where("create_at BETWEEN ? AND ?", query.StartTime, query.EndTime)
}
}
var transactions []entity.Transaction
var count int64
if err := sql.Model(&entity.Transaction{}).Count(&count).Error; err != nil {
return []entity.Transaction{}, 0, err
}
if count == 0 {
return []entity.Transaction{}, 0, repository.ErrRecordNotFound
}
if query.PageIndex == 0 {
query.PageIndex = 1
}
if query.PageSize == 0 {
query.PageSize = 20
}
err := sql.Offset(int((query.PageIndex - 1) * query.PageSize)).
Limit(int(query.PageSize)).
Order("create_at desc").
Find(&transactions).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return []entity.Transaction{}, 0, repository.ErrRecordNotFound
}
return []entity.Transaction{}, 0, err
}
return transactions, count, nil
}
func (repo *TransactionRepository) FindByDueTimeRange(ctx context.Context, start time.Time, txType []wallet.TxType) ([]entity.Transaction, error) {
var data []entity.Transaction
err := repo.DB.WithContext(ctx).
Where("type IN ?", txType).
Where("status = ?", wallet.WalletNonStatus).
Where("due_time <= ?", start.UTC().Unix()).
Where("due_time != ?", 0).
Find(&data).Error
if err != nil {
return nil, err
}
return data, nil
}
func (repo *TransactionRepository) UpdateStatusByID(ctx context.Context, id int64, status int) error {
err := repo.DB.WithContext(ctx).
Model(&entity.Transaction{}).
Where("id = ?", id).
UpdateColumn("status", status).Error
if err != nil {
return err
}
return nil
}
func (repo *TransactionRepository) ListWalletTransactions(ctx context.Context, uid string, orderIDs []string, walletType wallet.Types) ([]entity.WalletTransaction, error) {
sql := repo.DB.WithContext(ctx)
if uid != "" {
sql = sql.Where("uid = ?", uid)
}
sql = sql.Where("order_id IN ?", orderIDs)
if walletType > 0 {
sql = sql.Where("wallet_type", walletType)
}
result := make([]entity.WalletTransaction, len(orderIDs))
if err := sql.Order("create_at desc").Find(&result).Error; err != nil {
return []entity.WalletTransaction{}, err
}
return result, nil
}

View File

@ -0,0 +1,639 @@
package repository
import (
"code.30cm.net/digimon/app-cloudep-wallet-service/internal/config"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/entity"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/repository"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/lib/sql_client"
"context"
"errors"
"fmt"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"
"gorm.io/gorm"
"testing"
"time"
)
func SetupTestTransactionRepository() (repository.TransactionRepository, *gorm.DB, func(), error) {
host, port, _, tearDown, err := startMySQLContainer()
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to start MySQL container: %w", err)
}
conf := config.Config{
MySQL: struct {
UserName string
Password string
Host string
Port string
Database string
MaxIdleConns int
MaxOpenConns int
ConnMaxLifetime time.Duration
LogLevel string
}{
UserName: MySQLUser,
Password: MySQLPassword,
Host: host,
Port: port,
Database: MySQLDatabase,
MaxIdleConns: 10,
MaxOpenConns: 100,
ConnMaxLifetime: 300,
LogLevel: "info",
},
}
db, err := sql_client.NewMySQLClient(conf)
if err != nil {
tearDown()
return nil, nil, nil, fmt.Errorf("failed to create db client: %w", err)
}
repo := MustTransactionRepository(TransactionRepositoryParam{DB: db})
return repo, db, tearDown, nil
}
func createTransactionTable(t *testing.T, db *gorm.DB) {
sql := `
CREATE TABLE IF NOT EXISTS transaction (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id VARCHAR(64) NOT NULL,
transaction_id VARCHAR(64),
brand VARCHAR(32),
uid VARCHAR(64),
to_uid VARCHAR(64),
type TINYINT,
business_type TINYINT,
asset VARCHAR(32),
amount DECIMAL(30,18),
before_balance DECIMAL(30,18),
post_transfer_balance DECIMAL(30,18),
status TINYINT,
create_at BIGINT,
due_time BIGINT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`
assert.NoError(t, db.Exec(sql).Error)
}
func createWalletTransactionTable(t *testing.T, db *gorm.DB) {
createTableSQL := `
CREATE TABLE IF NOT EXISTS wallet_transaction (
id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主鍵 ID自動遞增',
transaction_id BIGINT NOT NULL COMMENT '交易流水號可對應某次業務操作例如同一訂單的多筆變化',
order_id VARCHAR(64) NOT NULL COMMENT '訂單編號對應實際訂單或業務事件',
brand VARCHAR(50) NOT NULL COMMENT '品牌多租戶或多平台識別',
uid VARCHAR(64) NOT NULL COMMENT '使用者 UID',
wallet_type TINYINT NOT NULL COMMENT '錢包類型如主錢包獎勵錢包凍結錢包等',
business_type TINYINT NOT NULL COMMENT '業務類型如購物退款加值等',
asset VARCHAR(32) NOT NULL COMMENT '資產代號 BTCETHGEM_REDUSD ',
amount DECIMAL(30,18) NOT NULL COMMENT '變動金額正數為收入負數為支出',
balance DECIMAL(30,18) NOT NULL COMMENT '當前錢包餘額這筆交易後的餘額快照',
create_at BIGINT NOT NULL DEFAULT 0 COMMENT '建立時間UnixNano紀錄交易發生時間',
PRIMARY KEY (id),
KEY idx_uid (uid),
KEY idx_transaction_id (transaction_id),
KEY idx_order_id (order_id),
KEY idx_brand (brand),
KEY idx_wallet_type (wallet_type)
) ENGINE=InnoDB
DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_unicode_ci
COMMENT='錢包資金異動紀錄每一次交易行為的快照記錄';`
assert.NoError(t, db.Exec(createTableSQL).Error)
}
func TestTransactionRepository_InsertAndBatchInsert(t *testing.T) {
// start container and connect
repo, db, tearDown, err := SetupTestTransactionRepository()
assert.NoError(t, err)
defer tearDown()
// prepare table
createTransactionTable(t, db)
now := time.Now().Unix()
template := entity.Transaction{
OrderID: "o1",
TransactionID: "tx1",
Brand: "b1",
UID: "u1",
ToUID: "u2",
TxType: 1,
BusinessType: 2,
Asset: "BTC",
Amount: decimal.RequireFromString("100.5"),
BeforeBalance: decimal.RequireFromString("50.0"),
PostTransferBalance: decimal.RequireFromString("150.5"),
Status: 1,
CreateAt: now,
DueTime: now + 3600,
}
tests := []struct {
name string
setup func(t *testing.T)
action func() error
validate func(t *testing.T)
}{
{
name: "single insert",
setup: func(t *testing.T) {
// clean table
assert.NoError(t, db.Exec("DELETE FROM transaction").Error)
},
action: func() error {
tx := template
return repo.Insert(context.Background(), &tx)
},
validate: func(t *testing.T) {
var count int64
assert.NoError(t, db.Raw("SELECT COUNT(*) FROM transaction").Scan(&count).Error)
assert.Equal(t, int64(1), count)
var got entity.Transaction
err := db.Take(&got, "order_id = ?", template.OrderID).Error
assert.NoError(t, err)
},
},
{
name: "batch insert",
setup: func(t *testing.T) {
assert.NoError(t, db.Exec("DELETE FROM transaction").Error)
},
action: func() error {
// clone two entries with different order IDs
tx1 := template
tx2 := template
tx1.OrderID = "o2"
tx2.OrderID = "o3"
return repo.BatchInsert(context.Background(), []*entity.Transaction{&tx1, &tx2})
},
validate: func(t *testing.T) {
var count int64
assert.NoError(t, db.Raw("SELECT COUNT(*) FROM transaction").Scan(&count).Error)
assert.Equal(t, int64(2), count)
var orders []string
rows, _ := db.Raw("SELECT order_id FROM transaction ORDER BY order_id").Rows()
defer rows.Close()
for rows.Next() {
var oid string
rows.Scan(&oid)
orders = append(orders, oid)
}
assert.Equal(t, []string{"o2", "o3"}, orders)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.setup(t)
err := tt.action()
assert.NoError(t, err)
tt.validate(t)
})
}
}
func TestTransactionRepository_FindByOrderID(t *testing.T) {
// start container and connect
repo, db, tearDown, err := SetupTestTransactionRepository()
assert.NoError(t, err)
defer tearDown()
// prepare table
createTransactionTable(t, db)
// 4) seed one row
now := time.Now().Unix()
res := db.Exec(
`INSERT INTO transaction
(order_id, transaction_id, brand, uid, to_uid, type, business_type, asset,
amount, before_balance, post_transfer_balance, status, create_at, due_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"order-123", "tx-abc", "brandX", "user1", "user2",
1, 2, "BTC",
"10.0", "5.0", "15.0",
1, now, now+3600,
)
assert.NoError(t, res.Error)
assert.Equal(t, int64(1), res.RowsAffected)
type want struct {
tx entity.Transaction
wantErr bool
}
tests := []struct {
name string
orderID string
want want
}{
{
name: "found existing",
orderID: "order-123",
want: want{
tx: entity.Transaction{
ID: 1,
OrderID: "order-123",
TransactionID: "tx-abc",
Brand: "brandX",
UID: "user1",
ToUID: "user2",
TxType: wallet.TxType(1),
BusinessType: int8(2),
Asset: "BTC",
Amount: decimal.RequireFromString("10.0"),
BeforeBalance: decimal.RequireFromString("5.0"),
PostTransferBalance: decimal.RequireFromString("15.0"),
Status: wallet.Enable(1),
CreateAt: now,
DueTime: now + 3600,
},
wantErr: false,
},
},
{
name: "not found",
orderID: "missing",
want: want{wantErr: true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := repo.FindByOrderID(context.Background(), tt.orderID)
if tt.want.wantErr {
assert.Error(t, err)
assert.True(t, errors.Is(err, repository.ErrRecordNotFound))
return
}
assert.NoError(t, err)
assert.Equal(t, tt.want.tx.ID, got.ID)
assert.Equal(t, tt.want.tx.OrderID, got.OrderID)
assert.Equal(t, tt.want.tx.TransactionID, got.TransactionID)
assert.Equal(t, tt.want.tx.Brand, got.Brand)
assert.Equal(t, tt.want.tx.UID, got.UID)
assert.Equal(t, tt.want.tx.ToUID, got.ToUID)
assert.Equal(t, tt.want.tx.TxType, got.TxType)
assert.Equal(t, tt.want.tx.BusinessType, got.BusinessType)
assert.Equal(t, tt.want.tx.Asset, got.Asset)
assert.True(t, tt.want.tx.Amount.Equal(got.Amount))
assert.True(t, tt.want.tx.BeforeBalance.Equal(got.BeforeBalance))
assert.True(t, tt.want.tx.PostTransferBalance.Equal(got.PostTransferBalance))
assert.Equal(t, tt.want.tx.Status, got.Status)
assert.Equal(t, tt.want.tx.CreateAt, got.CreateAt)
assert.Equal(t, tt.want.tx.DueTime, got.DueTime)
})
}
}
func TestTransactionRepository_List(t *testing.T) {
repo, db, tearDown, err := SetupTestTransactionRepository()
assert.NoError(t, err)
defer tearDown()
createTransactionTable(t, db)
now := time.Now().Unix()
rows := []entity.Transaction{
{OrderID: "A", UID: "u1", TxType: wallet.Deposit, BusinessType: 1, Asset: "BTC", CreateAt: now - 30},
{OrderID: "B", UID: "u2", TxType: wallet.Withdraw, BusinessType: 2, Asset: "ETH", CreateAt: now - 20},
{OrderID: "C", UID: "u1", TxType: wallet.Deposit, BusinessType: 1, Asset: "BTC", CreateAt: now - 10},
}
assert.NoError(t, db.Create(&rows).Error)
tests := []struct {
name string
query repository.TransactionQuery
wantCount int64
wantIDs []int64
wantErr error
}{
{
name: "no filter returns all",
query: repository.TransactionQuery{
PageIndex: 1,
PageSize: 50,
},
wantCount: 3,
wantIDs: []int64{rows[2].ID, rows[1].ID, rows[0].ID},
wantErr: nil,
},
{
name: "filter by UID",
query: repository.TransactionQuery{
PageIndex: 1,
PageSize: 50,
UID: proto.String("u1"),
},
wantCount: 2,
wantIDs: []int64{rows[2].ID, rows[0].ID},
},
{
name: "filter by BusinessType",
query: repository.TransactionQuery{
PageIndex: 1,
PageSize: 50,
BusinessType: []int8{2},
},
wantCount: 1,
wantIDs: []int64{rows[1].ID},
},
{
name: "filter by Asset + TxType",
query: repository.TransactionQuery{
PageIndex: 1,
PageSize: 50,
Assets: proto.String("BTC"),
TxTypes: []wallet.TxType{wallet.Deposit},
},
wantCount: 2,
wantIDs: []int64{rows[2].ID, rows[0].ID},
},
{
name: "time range filter",
query: repository.TransactionQuery{
StartTime: proto.Int64(now - 25),
EndTime: proto.Int64(now - 5),
},
wantCount: 2,
wantIDs: []int64{rows[2].ID, rows[1].ID},
},
{
name: "paging page 1 size 1",
query: repository.TransactionQuery{
PageIndex: 1,
PageSize: 1,
},
wantCount: 3,
wantIDs: []int64{rows[2].ID},
},
{
name: "paging page 2 size 1",
query: repository.TransactionQuery{
PageIndex: 2,
PageSize: 1,
},
wantCount: 3,
wantIDs: []int64{rows[1].ID},
},
{
name: "no match returns ErrRecordNotFound",
query: repository.TransactionQuery{UID: proto.String("nonexist")},
wantErr: repository.ErrRecordNotFound,
},
}
ctx := context.Background()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, cnt, err := repo.List(ctx, repository.TransactionQuery{
BusinessType: tt.query.BusinessType,
UID: tt.query.UID,
OrderID: tt.query.OrderID,
Assets: tt.query.Assets,
TxTypes: tt.query.TxTypes,
StartTime: tt.query.StartTime,
EndTime: tt.query.EndTime,
PageIndex: tt.query.PageIndex,
PageSize: tt.query.PageSize,
})
if tt.wantErr != nil {
assert.ErrorIs(t, err, tt.wantErr)
return
}
assert.NoError(t, err)
assert.Equal(t, tt.wantCount, cnt)
var ids []int64
for _, tx := range got {
ids = append(ids, tx.ID)
}
assert.Equal(t, tt.wantIDs, ids)
})
}
}
func TestTransactionRepository_FindByDueTimeRange(t *testing.T) {
// start container and connect
repo, db, tearDown, err := SetupTestTransactionRepository()
assert.NoError(t, err)
defer tearDown()
// prepare table
createTransactionTable(t, db)
// seed rows
now := time.Now().Unix()
rows := []entity.Transaction{
{TxType: wallet.Deposit, BusinessType: 0, DueTime: now - 10, Status: wallet.WalletNonStatus},
{TxType: wallet.Deposit, BusinessType: 0, DueTime: now + 10, Status: wallet.WalletNonStatus},
{TxType: wallet.Deposit, BusinessType: 0, DueTime: 0, Status: wallet.WalletNonStatus},
{TxType: wallet.Deposit, BusinessType: 0, DueTime: now - 5, Status: wallet.Enable(1)}, // status != non
}
assert.NoError(t, db.Create(&rows).Error)
tests := []struct {
name string
cutoff time.Time
types []wallet.TxType
wantIDs []int64
}{
{
name: "due before now for type=1",
cutoff: time.Unix(now, 0),
types: []wallet.TxType{wallet.Deposit},
wantIDs: []int64{rows[0].ID},
},
{
name: "include multiple types",
cutoff: time.Unix(now+20, 0),
types: []wallet.TxType{wallet.Deposit},
wantIDs: []int64{rows[0].ID, rows[1].ID},
},
{
name: "zero due_time is skipped",
cutoff: time.Unix(now+100, 0),
types: []wallet.TxType{wallet.Deposit},
wantIDs: []int64{rows[0].ID, rows[1].ID},
},
{
name: "no matches",
cutoff: time.Unix(now-100, 0),
types: []wallet.TxType{wallet.Deposit},
wantIDs: nil,
},
}
ctx := context.Background()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := repo.FindByDueTimeRange(ctx, tt.cutoff, tt.types)
assert.NoError(t, err)
var ids []int64
for _, tx := range got {
ids = append(ids, tx.ID)
}
assert.Equal(t, tt.wantIDs, ids)
})
}
}
func TestTransactionRepository_UpdateStatusByID(t *testing.T) {
// start container and connect
repo, db, tearDown, err := SetupTestTransactionRepository()
assert.NoError(t, err)
defer tearDown()
createTransactionTable(t, db)
ctx := context.Background()
tx := &entity.Transaction{
OrderID: "ord-123",
TransactionID: "tx-abc",
Brand: "brand1",
UID: "user1",
ToUID: "user2",
TxType: wallet.TxType(1),
BusinessType: int8(2),
Asset: "BTC",
Amount: decimal.NewFromInt(100),
PostTransferBalance: decimal.NewFromInt(1000),
BeforeBalance: decimal.NewFromInt(900),
Status: wallet.Enable(0),
CreateAt: time.Now().Unix(),
DueTime: time.Now().Add(time.Hour).Unix(),
}
err = repo.Insert(ctx, tx)
assert.NoError(t, err)
existingID := tx.ID
tests := []struct {
name string
id int64
newStatus int
wantErr bool
wantRow bool
}{
{
name: "update existing row",
id: existingID,
newStatus: 1,
wantErr: false,
wantRow: true,
},
{
name: "non-existent id does not error",
id: existingID + 999,
newStatus: 5,
wantErr: false,
wantRow: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := repo.UpdateStatusByID(ctx, tt.id, tt.newStatus)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
var got entity.Transaction
res := db.First(&got, tt.id)
if tt.wantRow {
// existing row should reflect the new status
assert.NoError(t, res.Error)
assert.Equal(t, tt.newStatus, int(got.Status))
} else {
// non-existent id: no record found
assert.Error(t, res.Error)
assert.True(t, errors.Is(res.Error, gorm.ErrRecordNotFound))
}
})
}
}
func TestTransactionRepository_ListWalletTransactions(t *testing.T) {
// start container and connect
repo, db, tearDown, err := SetupTestTransactionRepository()
assert.NoError(t, err)
defer tearDown()
createWalletTransactionTable(t, db)
ctx := context.Background()
now := time.Now().UnixNano()
transactions := []entity.WalletTransaction{
{OrderID: "o1", UID: "u1", WalletType: wallet.TypeAvailable, Asset: "BTC", Amount: decimal.NewFromInt(10), Balance: decimal.NewFromInt(110), CreateAt: now - 3},
{OrderID: "o2", UID: "u1", WalletType: wallet.TypeFreeze, Asset: "ETH", Amount: decimal.NewFromInt(20), Balance: decimal.NewFromInt(220), CreateAt: now - 2},
{OrderID: "o1", UID: "u2", WalletType: wallet.TypeAvailable, Asset: "BTC", Amount: decimal.NewFromInt(30), Balance: decimal.NewFromInt(330), CreateAt: now - 1},
}
assert.NoError(t, db.Create(&transactions).Error)
tests := []struct {
name string
uid string
orderIDs []string
walletType wallet.Types
wantIDs []int64
wantErr bool
}{
{
name: "filter by uid=u1, both orders",
uid: "u1",
orderIDs: []string{"o1", "o2"},
walletType: 0,
wantIDs: []int64{transactions[0].ID, transactions[1].ID},
},
{
name: "filter by uid=u1, walletType=1",
uid: "u1",
orderIDs: []string{"o1", "o2"},
walletType: wallet.Types(1),
wantIDs: []int64{transactions[0].ID},
},
{
name: "filter by order=o1 only, any uid",
uid: "",
orderIDs: []string{"o1"},
walletType: wallet.Types(1),
wantIDs: []int64{transactions[0].ID, transactions[2].ID},
},
{
name: "no match yields empty",
uid: "nope",
orderIDs: []string{"o1"},
walletType: 0,
wantIDs: []int64{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := repo.ListWalletTransactions(ctx, tt.uid, tt.orderIDs, tt.walletType)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.NoError(t, err)
// 提取返回的 IDs 进行无序比较
gotIDs := make([]int64, len(got))
for i, w := range got {
gotIDs[i] = w.ID
}
assert.ElementsMatch(t, tt.wantIDs, gotIDs)
})
}
}

View File

@ -136,6 +136,7 @@ func (s *WalletService) IncreaseBalance(kind wallet.Types, orderID string, amoun
Balance: w.Balance,
})
s.localBalances[kind] = w
return nil
}
@ -210,6 +211,38 @@ func (s *WalletService) HasAvailableBalance(ctx context.Context) (bool, error) {
return exists, nil
}
func (s *WalletService) GetOrderBalance(ctx context.Context, txID int64) (entity.Transaction, error) {
var t entity.Transaction
err := s.db.WithContext(ctx).
Where("id = ?", txID).
Take(&t).Error
if err != nil {
return entity.Transaction{}, translateNotFound(err)
}
s.localOrderBalances[t.ID] = t.PostTransferBalance
return t, nil
}
func (s *WalletService) GetOrderBalanceForUpdate(ctx context.Context, txID int64) (entity.Transaction, error) {
var t entity.Transaction
err := s.db.WithContext(ctx).
Where("id = ?", txID).
Clauses(clause.Locking{Strength: "UPDATE"}).
Take(&t).Error
if err != nil {
return entity.Transaction{}, translateNotFound(err)
}
s.localOrderBalances[t.ID] = t.PostTransferBalance
return t, nil
}
func (s *WalletService) ClearCache() {
s.localBalances = make(map[wallet.Types]entity.Wallet, len(wallet.AllTypes))
s.localOrderBalances = make(map[int64]decimal.Decimal, len(wallet.AllTypes))
s.transactions = nil
}
// translateNotFound 將 GORM 的 RecordNotFound 轉為自訂錯誤
func translateNotFound(err error) error {
if errors.Is(err, gorm.ErrRecordNotFound) {

View File

@ -5,6 +5,7 @@ import (
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/repository"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
"context"
"errors"
"fmt"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
@ -1023,3 +1024,239 @@ func TestWalletService_GetBalancesForUpdate(t *testing.T) {
})
}
}
func TestWalletService_GetOrderBalance(t *testing.T) {
_, db, tearDown, err := SetupTestWalletRepository()
assert.NoError(t, err)
defer tearDown()
// create transaction table
create := `
CREATE TABLE IF NOT EXISTS transaction (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id VARCHAR(64),
transaction_id VARCHAR(64),
brand VARCHAR(32),
uid VARCHAR(64),
to_uid VARCHAR(64),
type TINYINT,
business_type TINYINT,
asset VARCHAR(32),
amount DECIMAL(30,18),
before_balance DECIMAL(30,18),
post_transfer_balance DECIMAL(30,18),
status TINYINT,
create_at BIGINT,
due_time BIGINT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`
assert.NoError(t, db.Exec(create).Error)
// seed a row
now := time.Now().Unix()
seed := `
INSERT INTO transaction
(order_id, transaction_id, brand, uid, to_uid,
type, business_type, asset, amount,
before_balance, post_transfer_balance,
status, create_at, due_time)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
_, err = db.Exec(seed,
"order1", "tx-uuid-1", "brandA", "user1", "user2",
1, 2, "BTC", "42.5",
"10.0", "52.5",
1, now, now+3600,
).RowsAffected, db.Error
assert.NoError(t, err)
svc := NewWalletService(db, "user1", "BTC")
type want struct {
tx entity.Transaction
errIsNF bool
}
tests := []struct {
name string
txID int64
want want
}{
{
name: "found existing transaction",
txID: 1,
want: want{
tx: entity.Transaction{
ID: 1,
OrderID: "order1",
TransactionID: "tx-uuid-1",
Brand: "brandA",
UID: "user1",
ToUID: "user2",
TxType: wallet.TxType(1),
BusinessType: int8(2),
Asset: "BTC",
Amount: decimal.RequireFromString("42.5"),
BeforeBalance: decimal.RequireFromString("10.0"),
PostTransferBalance: decimal.RequireFromString("52.5"),
Status: wallet.Enable(1),
CreateAt: now,
DueTime: now + 3600,
},
errIsNF: false,
},
},
{
name: "not found returns ErrRecordNotFound",
txID: 999,
want: want{errIsNF: true},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := svc.GetOrderBalance(context.Background(), tt.txID)
if tt.want.errIsNF {
assert.Error(t, err)
assert.True(t, errors.Is(err, repository.ErrRecordNotFound))
} else {
assert.NoError(t, err)
// compare all fields except those autoset by GORM (e.g. pointers)
assert.Equal(t, tt.want.tx.ID, got.ID)
assert.Equal(t, tt.want.tx.OrderID, got.OrderID)
assert.Equal(t, tt.want.tx.TransactionID, got.TransactionID)
assert.Equal(t, tt.want.tx.Brand, got.Brand)
assert.Equal(t, tt.want.tx.UID, got.UID)
assert.Equal(t, tt.want.tx.ToUID, got.ToUID)
assert.Equal(t, tt.want.tx.TxType, got.TxType)
assert.Equal(t, tt.want.tx.BusinessType, got.BusinessType)
assert.Equal(t, tt.want.tx.Asset, got.Asset)
assert.True(t, tt.want.tx.Amount.Equal(got.Amount))
assert.True(t, tt.want.tx.BeforeBalance.Equal(got.BeforeBalance))
assert.True(t, tt.want.tx.PostTransferBalance.Equal(got.PostTransferBalance))
assert.Equal(t, tt.want.tx.Status, got.Status)
assert.Equal(t, tt.want.tx.CreateAt, got.CreateAt)
assert.Equal(t, tt.want.tx.DueTime, got.DueTime)
}
})
}
}
func TestWalletService_GetOrderBalanceForUpdate(t *testing.T) {
// setup container + DB + repo
_, db, tearDown, err := SetupTestWalletRepository()
assert.NoError(t, err)
defer tearDown()
// create transaction table
create := `
CREATE TABLE IF NOT EXISTS transaction (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
order_id VARCHAR(64),
transaction_id VARCHAR(64),
brand VARCHAR(32),
uid VARCHAR(64),
to_uid VARCHAR(64),
type TINYINT,
business_type TINYINT,
asset VARCHAR(32),
amount DECIMAL(30,18),
before_balance DECIMAL(30,18),
post_transfer_balance DECIMAL(30,18),
status TINYINT,
create_at BIGINT,
due_time BIGINT
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;`
assert.NoError(t, db.Exec(create).Error)
// seed one row
now := time.Now().Unix()
_, err = db.Exec(
`INSERT INTO transaction
(order_id, transaction_id, brand, uid, to_uid, type, business_type, asset,
amount, before_balance, post_transfer_balance, status, create_at, due_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
"ordX", "tx-XYZ", "brandZ", "u1", "u2",
1, 2, "ETH",
"100.0", "50.0", "150.0",
1, now, now+600,
).RowsAffected, db.Error
assert.NoError(t, err)
// service under test
svc := NewWalletService(db, "u1", "ETH")
type want struct {
tx entity.Transaction
errIsNF bool
}
cases := []struct {
name string
txID int64
want want
}{
{
name: "found and locks",
txID: 1,
want: want{
tx: entity.Transaction{
ID: 1,
OrderID: "ordX",
TransactionID: "tx-XYZ",
Brand: "brandZ",
UID: "u1",
ToUID: "u2",
TxType: wallet.TxType(1),
BusinessType: int8(2),
Asset: "ETH",
Amount: decimal.RequireFromString("100.0"),
BeforeBalance: decimal.RequireFromString("50.0"),
PostTransferBalance: decimal.RequireFromString("150.0"),
Status: wallet.Enable(1),
CreateAt: now,
DueTime: now + 600,
},
errIsNF: false,
},
},
{
name: "missing row returns ErrRecordNotFound",
txID: 999,
want: want{errIsNF: true},
},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
got, err := svc.GetOrderBalanceForUpdate(context.Background(), tt.txID)
if tt.want.errIsNF {
assert.Error(t, err)
return
}
assert.NoError(t, err)
// verify all fields
expected := tt.want.tx
assert.Equal(t, expected.ID, got.ID)
assert.Equal(t, expected.OrderID, got.OrderID)
assert.Equal(t, expected.TransactionID, got.TransactionID)
assert.Equal(t, expected.Brand, got.Brand)
assert.Equal(t, expected.UID, got.UID)
assert.Equal(t, expected.ToUID, got.ToUID)
assert.Equal(t, expected.TxType, got.TxType)
assert.Equal(t, expected.BusinessType, got.BusinessType)
assert.Equal(t, expected.Asset, got.Asset)
assert.True(t, expected.Amount.Equal(got.Amount))
assert.True(t, expected.BeforeBalance.Equal(got.BeforeBalance))
assert.True(t, expected.PostTransferBalance.Equal(got.PostTransferBalance))
assert.Equal(t, expected.Status, got.Status)
assert.Equal(t, expected.CreateAt, got.CreateAt)
assert.Equal(t, expected.DueTime, got.DueTime)
// verify local cache was set
cached, ok := svc.(*WalletService).localOrderBalances[got.ID]
assert.True(t, ok, "expected localOrderBalances to contain key")
assert.True(t, expected.PostTransferBalance.Equal(cached))
})
}
}

View File

@ -0,0 +1,68 @@
package repository
import (
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/entity"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/repository"
"code.30cm.net/digimon/app-cloudep-wallet-service/pkg/domain/wallet"
"context"
"gorm.io/gorm"
)
type WalletTransactionRepositoryParam struct {
DB *gorm.DB `name:"dbM"`
}
type WalletTransactionRepository struct {
WalletTransactionRepositoryParam
}
func MustWalletTransactionRepository(param WalletTransactionRepositoryParam) repository.WalletTransactionRepo {
return &WalletTransactionRepository{
param,
}
}
func (repo *WalletTransactionRepository) Create(ctx context.Context, db *gorm.DB, tx []entity.WalletTransaction) error {
err := db.WithContext(ctx).Create(tx).Error
if err != nil {
return err
}
return nil
}
func (repo *WalletTransactionRepository) HistoryBalance(ctx context.Context, req repository.HistoryReq) ([]entity.WalletTransaction, error) {
var data []entity.WalletTransaction
err := repo.DB.WithContext(ctx).Raw(
`SELECT
MAX(t.id) as id
FROM (
SELECT * FROM wallet_transaction
WHERE uid = ? AND wallet_type IN ? AND create_at <= ?
) As t
GROUP BY t.crypto, t.wallet_type`,
req.UID, wallet.AllTypes, req.StartTime,
).Find(&data).Error
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, repository.ErrRecordNotFound
}
ids := make([]int64, 0, len(wallet.AllTypes))
for _, v := range data {
ids = append(ids, v.ID)
}
err = repo.DB.WithContext(ctx).Where(ids).Find(&data).Error
if err != nil {
return nil, err
}
return data, nil
}

View File

@ -41,14 +41,33 @@ func (use *WalletUseCase) Withdraw(ctx context.Context, tx usecase.WalletTransfe
return use.ProcessTransaction(ctx, tx, userWalletFlow{
UID: tx.FromUID,
Asset: tx.Asset,
Actions: []walletActionOption{use.withLockAvailable(), use.withSubAvailable()}, //use.lockAvailable(), use.subAvailable()
Actions: []walletActionOption{use.withLockAvailable(), use.withSubAvailable()},
})
}
// Deposit 充值
// 1. 新增一筆充值交易
// 2. 錢包增加可用餘額
// 3. 錢包變化新增一筆增加可用餘額資料
func (use *WalletUseCase) Deposit(ctx context.Context, tx usecase.WalletTransferRequest) error {
//TODO implement me
panic("implement me")
// 確認錢包新增或減少的餘額是否正確
if !tx.Amount.IsPositive() {
return errs.InvalidRange("failed to get correct amount")
}
tx.TxType = wallet.Deposit
if err := use.ProcessTransaction(
ctx, tx, userWalletFlow{
UID: tx.FromUID,
Asset: tx.Asset,
Actions: []walletActionOption{use.withLockAvailable(), use.withAddAvailable()},
}); err != nil {
return err
}
return nil
}
func (use *WalletUseCase) DepositUnconfirmed(ctx context.Context, tx usecase.WalletTransferRequest) error {

View File

@ -62,7 +62,7 @@ func (use *WalletUseCase) withLockAvailable() walletActionOption {
}
}
// subAvailable 減少用戶可用餘額
// withSubAvailable 減少用戶可用餘額
func (use *WalletUseCase) withSubAvailable() walletActionOption {
return func(_ context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
if err := w.DecreaseBalance(wallet.TypeAvailable, tx.ReferenceOrderID, tx.Amount); err != nil {
@ -77,3 +77,58 @@ func (use *WalletUseCase) withSubAvailable() walletActionOption {
return nil
}
}
// withAddAvailable 增加用戶可用餘額
func (use *WalletUseCase) withAddAvailable() walletActionOption {
return func(_ context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
if err := w.IncreaseBalance(wallet.TypeAvailable, tx.ReferenceOrderID, tx.Amount); err != nil {
return err
}
return nil
}
}
// withAddFreeze 增加用戶凍結餘額
func (use *WalletUseCase) withAddFreeze() walletActionOption {
return func(_ context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
if err := w.IncreaseBalance(wallet.TypeFreeze, tx.ReferenceOrderID, tx.Amount); err != nil {
return err
}
// 訂單可以做解凍解凍最大上限金額來自當初凍結金額所以在每一筆tx可以設定Balance
// 後續tx需要依據其他tx做交易時能有所依據
tx.PostTransferBalance = tx.Amount
return nil
}
}
//// WithAppendFreeze 追加用戶原凍結餘額
//func (use *WalletUseCase) withAppendFreeze() walletActionOption {
// return func(ctx context.Context, tx *usecase.WalletTransferRequest, w repository.UserWalletService) error {
// order, err := wallet.GetOrderBalance(ctx, tx.ReferenceOrderIDs)
// if err != nil {
// return use.translateError(err)
// }
//
// // 以id來做lock更可以確保只lock到該筆而不會因為index關係lock到多筆導致死鎖
// // 而且先不lock把資料先拉出來判斷餘額是否足夠在不足夠時可以直接return而不用lock減少開銷
// order, err = wallet.GetOrderBalanceXLock(ctx, order.ID)
// if err != nil {
// return use.translateError(err)
// }
//
// tx.Crypto = order.Crypto
// tx.UID = order.UID
//
// if err := wallet.AddOrder(order.ID, tx.Amount); err != nil {
// return use.translateError(err)
// }
//
// if err := wallet.AddFreeze(tx.BusinessType, tx.Amount); err != nil {
// return use.translateError(err)
// }
//
// return nil
// }
//}