diff --git a/pkg/repository/account.go b/pkg/repository/account.go new file mode 100644 index 0000000..8f12aef --- /dev/null +++ b/pkg/repository/account.go @@ -0,0 +1,284 @@ +package repository + +import ( + "sync" + "time" +) + +type accountStatus struct { + configDir string + activeRequests int + lastUsed int64 + rateLimitUntil int64 + totalRequests int + totalSuccess int + totalErrors int + totalRateLimits int + totalLatencyMs int64 +} + +type AccountStat struct { + ConfigDir string + ActiveRequests int + TotalRequests int + TotalSuccess int + TotalErrors int + TotalRateLimits int + TotalLatencyMs int64 + IsRateLimited bool + RateLimitUntil int64 +} + +type AccountPool struct { + mu sync.Mutex + accounts []*accountStatus +} + +func NewAccountPool(configDirs []string) *AccountPool { + accounts := make([]*accountStatus, 0, len(configDirs)) + for _, dir := range configDirs { + accounts = append(accounts, &accountStatus{configDir: dir}) + } + return &AccountPool{accounts: accounts} +} + +func (p *AccountPool) GetNextConfigDir() string { + p.mu.Lock() + defer p.mu.Unlock() + + if len(p.accounts) == 0 { + return "" + } + + now := time.Now().UnixMilli() + + available := make([]*accountStatus, 0, len(p.accounts)) + for _, a := range p.accounts { + if a.rateLimitUntil < now { + available = append(available, a) + } + } + + target := available + if len(target) == 0 { + target = make([]*accountStatus, len(p.accounts)) + copy(target, p.accounts) + // sort by earliest recovery + for i := 1; i < len(target); i++ { + for j := i; j > 0 && target[j].rateLimitUntil < target[j-1].rateLimitUntil; j-- { + target[j], target[j-1] = target[j-1], target[j] + } + } + } + + // pick least busy then least recently used + best := target[0] + for _, a := range target[1:] { + if a.activeRequests < best.activeRequests { + best = a + } else if a.activeRequests == best.activeRequests && a.lastUsed < best.lastUsed { + best = a + } + } + best.lastUsed = now + return best.configDir +} + +func (p *AccountPool) find(configDir string) *accountStatus { + for _, a := range p.accounts { + if a.configDir == configDir { + return a + } + } + return nil +} + +func (p *AccountPool) ReportRequestStart(configDir string) { + if configDir == "" { + return + } + p.mu.Lock() + defer p.mu.Unlock() + if a := p.find(configDir); a != nil { + a.activeRequests++ + a.totalRequests++ + } +} + +func (p *AccountPool) ReportRequestEnd(configDir string) { + if configDir == "" { + return + } + p.mu.Lock() + defer p.mu.Unlock() + if a := p.find(configDir); a != nil && a.activeRequests > 0 { + a.activeRequests-- + } +} + +func (p *AccountPool) ReportRequestSuccess(configDir string, latencyMs int64) { + if configDir == "" { + return + } + p.mu.Lock() + defer p.mu.Unlock() + if a := p.find(configDir); a != nil { + a.totalSuccess++ + a.totalLatencyMs += latencyMs + } +} + +func (p *AccountPool) ReportRequestError(configDir string, latencyMs int64) { + if configDir == "" { + return + } + p.mu.Lock() + defer p.mu.Unlock() + if a := p.find(configDir); a != nil { + a.totalErrors++ + a.totalLatencyMs += latencyMs + } +} + +func (p *AccountPool) ReportRateLimit(configDir string, penaltyMs int64) { + if configDir == "" { + return + } + if penaltyMs <= 0 { + penaltyMs = 60000 + } + p.mu.Lock() + defer p.mu.Unlock() + if a := p.find(configDir); a != nil { + a.rateLimitUntil = time.Now().UnixMilli() + penaltyMs + a.totalRateLimits++ + } +} + +func (p *AccountPool) GetStats() []AccountStat { + p.mu.Lock() + defer p.mu.Unlock() + now := time.Now().UnixMilli() + stats := make([]AccountStat, len(p.accounts)) + for i, a := range p.accounts { + stats[i] = AccountStat{ + ConfigDir: a.configDir, + ActiveRequests: a.activeRequests, + TotalRequests: a.totalRequests, + TotalSuccess: a.totalSuccess, + TotalErrors: a.totalErrors, + TotalRateLimits: a.totalRateLimits, + TotalLatencyMs: a.totalLatencyMs, + IsRateLimited: a.rateLimitUntil > now, + RateLimitUntil: a.rateLimitUntil, + } + } + return stats +} + +func (p *AccountPool) Count() int { + return len(p.accounts) +} + + +// ─── PoolHandle interface ────────────────────────────────────────────────── +// PoolHandle 讓 handler 可以注入獨立的 pool 實例,避免多 port 模式共用全域 pool。 + +type PoolHandle interface { + GetNextConfigDir() string + ReportRequestStart(configDir string) + ReportRequestEnd(configDir string) + ReportRequestSuccess(configDir string, latencyMs int64) + ReportRequestError(configDir string, latencyMs int64) + ReportRateLimit(configDir string, penaltyMs int64) + GetStats() []AccountStat +} + +// GlobalPoolHandle 包裝全域函式以實作 PoolHandle 介面(單 port 模式使用) +type GlobalPoolHandle struct{} + +func (GlobalPoolHandle) GetNextConfigDir() string { return GetNextAccountConfigDir() } +func (GlobalPoolHandle) ReportRequestStart(d string) { ReportRequestStart(d) } +func (GlobalPoolHandle) ReportRequestEnd(d string) { ReportRequestEnd(d) } +func (GlobalPoolHandle) ReportRequestSuccess(d string, l int64) { ReportRequestSuccess(d, l) } +func (GlobalPoolHandle) ReportRequestError(d string, l int64) { ReportRequestError(d, l) } +func (GlobalPoolHandle) ReportRateLimit(d string, p int64) { ReportRateLimit(d, p) } +func (GlobalPoolHandle) GetStats() []AccountStat { return GetAccountStats() } + +// ─── Global pool ─────────────────────────────────────────────────────────── + +var ( + globalPool *AccountPool + globalMu sync.Mutex +) + +func InitAccountPool(configDirs []string) { + globalMu.Lock() + defer globalMu.Unlock() + globalPool = NewAccountPool(configDirs) +} + +func GetNextAccountConfigDir() string { + globalMu.Lock() + p := globalPool + globalMu.Unlock() + if p == nil { + return "" + } + return p.GetNextConfigDir() +} + +func ReportRequestStart(configDir string) { + globalMu.Lock() + p := globalPool + globalMu.Unlock() + if p != nil { + p.ReportRequestStart(configDir) + } +} + +func ReportRequestEnd(configDir string) { + globalMu.Lock() + p := globalPool + globalMu.Unlock() + if p != nil { + p.ReportRequestEnd(configDir) + } +} + +func ReportRequestSuccess(configDir string, latencyMs int64) { + globalMu.Lock() + p := globalPool + globalMu.Unlock() + if p != nil { + p.ReportRequestSuccess(configDir, latencyMs) + } +} + +func ReportRequestError(configDir string, latencyMs int64) { + globalMu.Lock() + p := globalPool + globalMu.Unlock() + if p != nil { + p.ReportRequestError(configDir, latencyMs) + } +} + +func ReportRateLimit(configDir string, penaltyMs int64) { + globalMu.Lock() + p := globalPool + globalMu.Unlock() + if p != nil { + p.ReportRateLimit(configDir, penaltyMs) + } +} + +func GetAccountStats() []AccountStat { + globalMu.Lock() + p := globalPool + globalMu.Unlock() + if p == nil { + return nil + } + return p.GetStats() +} diff --git a/pkg/repository/account_test.go b/pkg/repository/account_test.go new file mode 100644 index 0000000..bb0fd4d --- /dev/null +++ b/pkg/repository/account_test.go @@ -0,0 +1,152 @@ +package repository + +import ( + "testing" + "time" +) + +func TestEmptyPool(t *testing.T) { + p := NewAccountPool(nil) + if got := p.GetNextConfigDir(); got != "" { + t.Fatalf("expected empty string for empty pool, got %q", got) + } + if p.Count() != 0 { + t.Fatalf("expected count 0, got %d", p.Count()) + } +} + +func TestSingleDir(t *testing.T) { + p := NewAccountPool([]string{"/dir1"}) + if got := p.GetNextConfigDir(); got != "/dir1" { + t.Fatalf("expected /dir1, got %q", got) + } + if got := p.GetNextConfigDir(); got != "/dir1" { + t.Fatalf("expected /dir1 again, got %q", got) + } +} + +func TestRoundRobin(t *testing.T) { + p := NewAccountPool([]string{"/a", "/b", "/c"}) + got := []string{ + p.GetNextConfigDir(), + p.GetNextConfigDir(), + p.GetNextConfigDir(), + p.GetNextConfigDir(), + } + want := []string{"/a", "/b", "/c", "/a"} + for i, w := range want { + if got[i] != w { + t.Fatalf("call %d: expected %q, got %q", i, w, got[i]) + } + } +} + +func TestLeastBusy(t *testing.T) { + p := NewAccountPool([]string{"/dir1", "/dir2", "/dir3"}) + p.ReportRequestStart("/dir1") + p.ReportRequestStart("/dir2") + + if got := p.GetNextConfigDir(); got != "/dir3" { + t.Fatalf("expected /dir3 (least busy), got %q", got) + } + + p.ReportRequestStart("/dir3") + p.ReportRequestEnd("/dir1") + + if got := p.GetNextConfigDir(); got != "/dir1" { + t.Fatalf("expected /dir1 after end, got %q", got) + } +} + +func TestSkipsRateLimited(t *testing.T) { + p := NewAccountPool([]string{"/dir1", "/dir2"}) + p.ReportRateLimit("/dir1", 60000) + + if got := p.GetNextConfigDir(); got != "/dir2" { + t.Fatalf("expected /dir2, got %q", got) + } + if got := p.GetNextConfigDir(); got != "/dir2" { + t.Fatalf("expected /dir2 again, got %q", got) + } +} + +func TestFallbackToSoonestRecovery(t *testing.T) { + p := NewAccountPool([]string{"/dir1", "/dir2"}) + p.ReportRateLimit("/dir1", 60000) + p.ReportRateLimit("/dir2", 30000) + + // dir2 recovers sooner — should be selected + if got := p.GetNextConfigDir(); got != "/dir2" { + t.Fatalf("expected /dir2 (sooner recovery), got %q", got) + } +} + +func TestActiveRequestsDoesNotGoNegative(t *testing.T) { + p := NewAccountPool([]string{"/dir1"}) + p.ReportRequestEnd("/dir1") + p.ReportRequestEnd("/dir1") + if got := p.GetNextConfigDir(); got != "/dir1" { + t.Fatalf("pool should still work, got %q", got) + } +} + +func TestIgnoreUnknownConfigDir(t *testing.T) { + p := NewAccountPool([]string{"/dir1"}) + p.ReportRequestStart("/nonexistent") + p.ReportRequestEnd("/nonexistent") + p.ReportRateLimit("/nonexistent", 60000) + if got := p.GetNextConfigDir(); got != "/dir1" { + t.Fatalf("expected /dir1, got %q", got) + } +} + +func TestRateLimitExpires(t *testing.T) { + p := NewAccountPool([]string{"/dir1", "/dir2"}) + p.ReportRateLimit("/dir1", 50) + + if got := p.GetNextConfigDir(); got != "/dir2" { + t.Fatalf("immediately expected /dir2, got %q", got) + } + + time.Sleep(100 * time.Millisecond) + + if got := p.GetNextConfigDir(); got != "/dir1" { + t.Fatalf("after expiry expected /dir1, got %q", got) + } +} + +func TestGlobalPool(t *testing.T) { + InitAccountPool([]string{"/g1", "/g2"}) + if got := GetNextAccountConfigDir(); got != "/g1" { + t.Fatalf("expected /g1, got %q", got) + } + if got := GetNextAccountConfigDir(); got != "/g2" { + t.Fatalf("expected /g2, got %q", got) + } + if got := GetNextAccountConfigDir(); got != "/g1" { + t.Fatalf("expected /g1 again, got %q", got) + } +} + +func TestGlobalPoolEmpty(t *testing.T) { + InitAccountPool(nil) + if got := GetNextAccountConfigDir(); got != "" { + t.Fatalf("expected empty string for empty global pool, got %q", got) + } +} + +func TestGlobalPoolReinit(t *testing.T) { + InitAccountPool([]string{"/old1", "/old2"}) + GetNextAccountConfigDir() + InitAccountPool([]string{"/new1"}) + if got := GetNextAccountConfigDir(); got != "/new1" { + t.Fatalf("expected /new1 after reinit, got %q", got) + } +} + +func TestGlobalPoolFunctionsNoopBeforeInit(t *testing.T) { + InitAccountPool(nil) + ReportRequestStart("/dir1") + ReportRequestEnd("/dir1") + ReportRateLimit("/dir1", 1000) +}