refactor(task-3): add repository layer implementation

- Migrate AccountPool from internal/pool
- Update import paths to use pkg/repository
This commit is contained in:
王性驊 2026-04-03 17:33:51 +08:00
parent 294bd74a43
commit d4fcb8d3b8
2 changed files with 436 additions and 0 deletions

284
pkg/repository/account.go Normal file
View File

@ -0,0 +1,284 @@
package repository
import (
"sync"
"time"
)
type accountStatus struct {
configDir string
activeRequests int
lastUsed int64
rateLimitUntil int64
totalRequests int
totalSuccess int
totalErrors int
totalRateLimits int
totalLatencyMs int64
}
type AccountStat struct {
ConfigDir string
ActiveRequests int
TotalRequests int
TotalSuccess int
TotalErrors int
TotalRateLimits int
TotalLatencyMs int64
IsRateLimited bool
RateLimitUntil int64
}
type AccountPool struct {
mu sync.Mutex
accounts []*accountStatus
}
func NewAccountPool(configDirs []string) *AccountPool {
accounts := make([]*accountStatus, 0, len(configDirs))
for _, dir := range configDirs {
accounts = append(accounts, &accountStatus{configDir: dir})
}
return &AccountPool{accounts: accounts}
}
func (p *AccountPool) GetNextConfigDir() string {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.accounts) == 0 {
return ""
}
now := time.Now().UnixMilli()
available := make([]*accountStatus, 0, len(p.accounts))
for _, a := range p.accounts {
if a.rateLimitUntil < now {
available = append(available, a)
}
}
target := available
if len(target) == 0 {
target = make([]*accountStatus, len(p.accounts))
copy(target, p.accounts)
// sort by earliest recovery
for i := 1; i < len(target); i++ {
for j := i; j > 0 && target[j].rateLimitUntil < target[j-1].rateLimitUntil; j-- {
target[j], target[j-1] = target[j-1], target[j]
}
}
}
// pick least busy then least recently used
best := target[0]
for _, a := range target[1:] {
if a.activeRequests < best.activeRequests {
best = a
} else if a.activeRequests == best.activeRequests && a.lastUsed < best.lastUsed {
best = a
}
}
best.lastUsed = now
return best.configDir
}
func (p *AccountPool) find(configDir string) *accountStatus {
for _, a := range p.accounts {
if a.configDir == configDir {
return a
}
}
return nil
}
func (p *AccountPool) ReportRequestStart(configDir string) {
if configDir == "" {
return
}
p.mu.Lock()
defer p.mu.Unlock()
if a := p.find(configDir); a != nil {
a.activeRequests++
a.totalRequests++
}
}
func (p *AccountPool) ReportRequestEnd(configDir string) {
if configDir == "" {
return
}
p.mu.Lock()
defer p.mu.Unlock()
if a := p.find(configDir); a != nil && a.activeRequests > 0 {
a.activeRequests--
}
}
func (p *AccountPool) ReportRequestSuccess(configDir string, latencyMs int64) {
if configDir == "" {
return
}
p.mu.Lock()
defer p.mu.Unlock()
if a := p.find(configDir); a != nil {
a.totalSuccess++
a.totalLatencyMs += latencyMs
}
}
func (p *AccountPool) ReportRequestError(configDir string, latencyMs int64) {
if configDir == "" {
return
}
p.mu.Lock()
defer p.mu.Unlock()
if a := p.find(configDir); a != nil {
a.totalErrors++
a.totalLatencyMs += latencyMs
}
}
func (p *AccountPool) ReportRateLimit(configDir string, penaltyMs int64) {
if configDir == "" {
return
}
if penaltyMs <= 0 {
penaltyMs = 60000
}
p.mu.Lock()
defer p.mu.Unlock()
if a := p.find(configDir); a != nil {
a.rateLimitUntil = time.Now().UnixMilli() + penaltyMs
a.totalRateLimits++
}
}
func (p *AccountPool) GetStats() []AccountStat {
p.mu.Lock()
defer p.mu.Unlock()
now := time.Now().UnixMilli()
stats := make([]AccountStat, len(p.accounts))
for i, a := range p.accounts {
stats[i] = AccountStat{
ConfigDir: a.configDir,
ActiveRequests: a.activeRequests,
TotalRequests: a.totalRequests,
TotalSuccess: a.totalSuccess,
TotalErrors: a.totalErrors,
TotalRateLimits: a.totalRateLimits,
TotalLatencyMs: a.totalLatencyMs,
IsRateLimited: a.rateLimitUntil > now,
RateLimitUntil: a.rateLimitUntil,
}
}
return stats
}
func (p *AccountPool) Count() int {
return len(p.accounts)
}
// ─── PoolHandle interface ──────────────────────────────────────────────────
// PoolHandle 讓 handler 可以注入獨立的 pool 實例,避免多 port 模式共用全域 pool。
type PoolHandle interface {
GetNextConfigDir() string
ReportRequestStart(configDir string)
ReportRequestEnd(configDir string)
ReportRequestSuccess(configDir string, latencyMs int64)
ReportRequestError(configDir string, latencyMs int64)
ReportRateLimit(configDir string, penaltyMs int64)
GetStats() []AccountStat
}
// GlobalPoolHandle 包裝全域函式以實作 PoolHandle 介面(單 port 模式使用)
type GlobalPoolHandle struct{}
func (GlobalPoolHandle) GetNextConfigDir() string { return GetNextAccountConfigDir() }
func (GlobalPoolHandle) ReportRequestStart(d string) { ReportRequestStart(d) }
func (GlobalPoolHandle) ReportRequestEnd(d string) { ReportRequestEnd(d) }
func (GlobalPoolHandle) ReportRequestSuccess(d string, l int64) { ReportRequestSuccess(d, l) }
func (GlobalPoolHandle) ReportRequestError(d string, l int64) { ReportRequestError(d, l) }
func (GlobalPoolHandle) ReportRateLimit(d string, p int64) { ReportRateLimit(d, p) }
func (GlobalPoolHandle) GetStats() []AccountStat { return GetAccountStats() }
// ─── Global pool ───────────────────────────────────────────────────────────
var (
globalPool *AccountPool
globalMu sync.Mutex
)
func InitAccountPool(configDirs []string) {
globalMu.Lock()
defer globalMu.Unlock()
globalPool = NewAccountPool(configDirs)
}
func GetNextAccountConfigDir() string {
globalMu.Lock()
p := globalPool
globalMu.Unlock()
if p == nil {
return ""
}
return p.GetNextConfigDir()
}
func ReportRequestStart(configDir string) {
globalMu.Lock()
p := globalPool
globalMu.Unlock()
if p != nil {
p.ReportRequestStart(configDir)
}
}
func ReportRequestEnd(configDir string) {
globalMu.Lock()
p := globalPool
globalMu.Unlock()
if p != nil {
p.ReportRequestEnd(configDir)
}
}
func ReportRequestSuccess(configDir string, latencyMs int64) {
globalMu.Lock()
p := globalPool
globalMu.Unlock()
if p != nil {
p.ReportRequestSuccess(configDir, latencyMs)
}
}
func ReportRequestError(configDir string, latencyMs int64) {
globalMu.Lock()
p := globalPool
globalMu.Unlock()
if p != nil {
p.ReportRequestError(configDir, latencyMs)
}
}
func ReportRateLimit(configDir string, penaltyMs int64) {
globalMu.Lock()
p := globalPool
globalMu.Unlock()
if p != nil {
p.ReportRateLimit(configDir, penaltyMs)
}
}
func GetAccountStats() []AccountStat {
globalMu.Lock()
p := globalPool
globalMu.Unlock()
if p == nil {
return nil
}
return p.GetStats()
}

View File

@ -0,0 +1,152 @@
package repository
import (
"testing"
"time"
)
func TestEmptyPool(t *testing.T) {
p := NewAccountPool(nil)
if got := p.GetNextConfigDir(); got != "" {
t.Fatalf("expected empty string for empty pool, got %q", got)
}
if p.Count() != 0 {
t.Fatalf("expected count 0, got %d", p.Count())
}
}
func TestSingleDir(t *testing.T) {
p := NewAccountPool([]string{"/dir1"})
if got := p.GetNextConfigDir(); got != "/dir1" {
t.Fatalf("expected /dir1, got %q", got)
}
if got := p.GetNextConfigDir(); got != "/dir1" {
t.Fatalf("expected /dir1 again, got %q", got)
}
}
func TestRoundRobin(t *testing.T) {
p := NewAccountPool([]string{"/a", "/b", "/c"})
got := []string{
p.GetNextConfigDir(),
p.GetNextConfigDir(),
p.GetNextConfigDir(),
p.GetNextConfigDir(),
}
want := []string{"/a", "/b", "/c", "/a"}
for i, w := range want {
if got[i] != w {
t.Fatalf("call %d: expected %q, got %q", i, w, got[i])
}
}
}
func TestLeastBusy(t *testing.T) {
p := NewAccountPool([]string{"/dir1", "/dir2", "/dir3"})
p.ReportRequestStart("/dir1")
p.ReportRequestStart("/dir2")
if got := p.GetNextConfigDir(); got != "/dir3" {
t.Fatalf("expected /dir3 (least busy), got %q", got)
}
p.ReportRequestStart("/dir3")
p.ReportRequestEnd("/dir1")
if got := p.GetNextConfigDir(); got != "/dir1" {
t.Fatalf("expected /dir1 after end, got %q", got)
}
}
func TestSkipsRateLimited(t *testing.T) {
p := NewAccountPool([]string{"/dir1", "/dir2"})
p.ReportRateLimit("/dir1", 60000)
if got := p.GetNextConfigDir(); got != "/dir2" {
t.Fatalf("expected /dir2, got %q", got)
}
if got := p.GetNextConfigDir(); got != "/dir2" {
t.Fatalf("expected /dir2 again, got %q", got)
}
}
func TestFallbackToSoonestRecovery(t *testing.T) {
p := NewAccountPool([]string{"/dir1", "/dir2"})
p.ReportRateLimit("/dir1", 60000)
p.ReportRateLimit("/dir2", 30000)
// dir2 recovers sooner — should be selected
if got := p.GetNextConfigDir(); got != "/dir2" {
t.Fatalf("expected /dir2 (sooner recovery), got %q", got)
}
}
func TestActiveRequestsDoesNotGoNegative(t *testing.T) {
p := NewAccountPool([]string{"/dir1"})
p.ReportRequestEnd("/dir1")
p.ReportRequestEnd("/dir1")
if got := p.GetNextConfigDir(); got != "/dir1" {
t.Fatalf("pool should still work, got %q", got)
}
}
func TestIgnoreUnknownConfigDir(t *testing.T) {
p := NewAccountPool([]string{"/dir1"})
p.ReportRequestStart("/nonexistent")
p.ReportRequestEnd("/nonexistent")
p.ReportRateLimit("/nonexistent", 60000)
if got := p.GetNextConfigDir(); got != "/dir1" {
t.Fatalf("expected /dir1, got %q", got)
}
}
func TestRateLimitExpires(t *testing.T) {
p := NewAccountPool([]string{"/dir1", "/dir2"})
p.ReportRateLimit("/dir1", 50)
if got := p.GetNextConfigDir(); got != "/dir2" {
t.Fatalf("immediately expected /dir2, got %q", got)
}
time.Sleep(100 * time.Millisecond)
if got := p.GetNextConfigDir(); got != "/dir1" {
t.Fatalf("after expiry expected /dir1, got %q", got)
}
}
func TestGlobalPool(t *testing.T) {
InitAccountPool([]string{"/g1", "/g2"})
if got := GetNextAccountConfigDir(); got != "/g1" {
t.Fatalf("expected /g1, got %q", got)
}
if got := GetNextAccountConfigDir(); got != "/g2" {
t.Fatalf("expected /g2, got %q", got)
}
if got := GetNextAccountConfigDir(); got != "/g1" {
t.Fatalf("expected /g1 again, got %q", got)
}
}
func TestGlobalPoolEmpty(t *testing.T) {
InitAccountPool(nil)
if got := GetNextAccountConfigDir(); got != "" {
t.Fatalf("expected empty string for empty global pool, got %q", got)
}
}
func TestGlobalPoolReinit(t *testing.T) {
InitAccountPool([]string{"/old1", "/old2"})
GetNextAccountConfigDir()
InitAccountPool([]string{"/new1"})
if got := GetNextAccountConfigDir(); got != "/new1" {
t.Fatalf("expected /new1 after reinit, got %q", got)
}
}
func TestGlobalPoolFunctionsNoopBeforeInit(t *testing.T) {
InitAccountPool(nil)
ReportRequestStart("/dir1")
ReportRequestEnd("/dir1")
ReportRateLimit("/dir1", 1000)
}