merge: refactor/repository
This commit is contained in:
commit
e5f19c243b
|
|
@ -0,0 +1,284 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type accountStatus struct {
|
||||
configDir string
|
||||
activeRequests int
|
||||
lastUsed int64
|
||||
rateLimitUntil int64
|
||||
totalRequests int
|
||||
totalSuccess int
|
||||
totalErrors int
|
||||
totalRateLimits int
|
||||
totalLatencyMs int64
|
||||
}
|
||||
|
||||
type AccountStat struct {
|
||||
ConfigDir string
|
||||
ActiveRequests int
|
||||
TotalRequests int
|
||||
TotalSuccess int
|
||||
TotalErrors int
|
||||
TotalRateLimits int
|
||||
TotalLatencyMs int64
|
||||
IsRateLimited bool
|
||||
RateLimitUntil int64
|
||||
}
|
||||
|
||||
type AccountPool struct {
|
||||
mu sync.Mutex
|
||||
accounts []*accountStatus
|
||||
}
|
||||
|
||||
func NewAccountPool(configDirs []string) *AccountPool {
|
||||
accounts := make([]*accountStatus, 0, len(configDirs))
|
||||
for _, dir := range configDirs {
|
||||
accounts = append(accounts, &accountStatus{configDir: dir})
|
||||
}
|
||||
return &AccountPool{accounts: accounts}
|
||||
}
|
||||
|
||||
func (p *AccountPool) GetNextConfigDir() string {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if len(p.accounts) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
now := time.Now().UnixMilli()
|
||||
|
||||
available := make([]*accountStatus, 0, len(p.accounts))
|
||||
for _, a := range p.accounts {
|
||||
if a.rateLimitUntil < now {
|
||||
available = append(available, a)
|
||||
}
|
||||
}
|
||||
|
||||
target := available
|
||||
if len(target) == 0 {
|
||||
target = make([]*accountStatus, len(p.accounts))
|
||||
copy(target, p.accounts)
|
||||
// sort by earliest recovery
|
||||
for i := 1; i < len(target); i++ {
|
||||
for j := i; j > 0 && target[j].rateLimitUntil < target[j-1].rateLimitUntil; j-- {
|
||||
target[j], target[j-1] = target[j-1], target[j]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pick least busy then least recently used
|
||||
best := target[0]
|
||||
for _, a := range target[1:] {
|
||||
if a.activeRequests < best.activeRequests {
|
||||
best = a
|
||||
} else if a.activeRequests == best.activeRequests && a.lastUsed < best.lastUsed {
|
||||
best = a
|
||||
}
|
||||
}
|
||||
best.lastUsed = now
|
||||
return best.configDir
|
||||
}
|
||||
|
||||
func (p *AccountPool) find(configDir string) *accountStatus {
|
||||
for _, a := range p.accounts {
|
||||
if a.configDir == configDir {
|
||||
return a
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *AccountPool) ReportRequestStart(configDir string) {
|
||||
if configDir == "" {
|
||||
return
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if a := p.find(configDir); a != nil {
|
||||
a.activeRequests++
|
||||
a.totalRequests++
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AccountPool) ReportRequestEnd(configDir string) {
|
||||
if configDir == "" {
|
||||
return
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if a := p.find(configDir); a != nil && a.activeRequests > 0 {
|
||||
a.activeRequests--
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AccountPool) ReportRequestSuccess(configDir string, latencyMs int64) {
|
||||
if configDir == "" {
|
||||
return
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if a := p.find(configDir); a != nil {
|
||||
a.totalSuccess++
|
||||
a.totalLatencyMs += latencyMs
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AccountPool) ReportRequestError(configDir string, latencyMs int64) {
|
||||
if configDir == "" {
|
||||
return
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if a := p.find(configDir); a != nil {
|
||||
a.totalErrors++
|
||||
a.totalLatencyMs += latencyMs
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AccountPool) ReportRateLimit(configDir string, penaltyMs int64) {
|
||||
if configDir == "" {
|
||||
return
|
||||
}
|
||||
if penaltyMs <= 0 {
|
||||
penaltyMs = 60000
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if a := p.find(configDir); a != nil {
|
||||
a.rateLimitUntil = time.Now().UnixMilli() + penaltyMs
|
||||
a.totalRateLimits++
|
||||
}
|
||||
}
|
||||
|
||||
func (p *AccountPool) GetStats() []AccountStat {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
now := time.Now().UnixMilli()
|
||||
stats := make([]AccountStat, len(p.accounts))
|
||||
for i, a := range p.accounts {
|
||||
stats[i] = AccountStat{
|
||||
ConfigDir: a.configDir,
|
||||
ActiveRequests: a.activeRequests,
|
||||
TotalRequests: a.totalRequests,
|
||||
TotalSuccess: a.totalSuccess,
|
||||
TotalErrors: a.totalErrors,
|
||||
TotalRateLimits: a.totalRateLimits,
|
||||
TotalLatencyMs: a.totalLatencyMs,
|
||||
IsRateLimited: a.rateLimitUntil > now,
|
||||
RateLimitUntil: a.rateLimitUntil,
|
||||
}
|
||||
}
|
||||
return stats
|
||||
}
|
||||
|
||||
func (p *AccountPool) Count() int {
|
||||
return len(p.accounts)
|
||||
}
|
||||
|
||||
|
||||
// ─── PoolHandle interface ──────────────────────────────────────────────────
|
||||
// PoolHandle 讓 handler 可以注入獨立的 pool 實例,避免多 port 模式共用全域 pool。
|
||||
|
||||
type PoolHandle interface {
|
||||
GetNextConfigDir() string
|
||||
ReportRequestStart(configDir string)
|
||||
ReportRequestEnd(configDir string)
|
||||
ReportRequestSuccess(configDir string, latencyMs int64)
|
||||
ReportRequestError(configDir string, latencyMs int64)
|
||||
ReportRateLimit(configDir string, penaltyMs int64)
|
||||
GetStats() []AccountStat
|
||||
}
|
||||
|
||||
// GlobalPoolHandle 包裝全域函式以實作 PoolHandle 介面(單 port 模式使用)
|
||||
type GlobalPoolHandle struct{}
|
||||
|
||||
func (GlobalPoolHandle) GetNextConfigDir() string { return GetNextAccountConfigDir() }
|
||||
func (GlobalPoolHandle) ReportRequestStart(d string) { ReportRequestStart(d) }
|
||||
func (GlobalPoolHandle) ReportRequestEnd(d string) { ReportRequestEnd(d) }
|
||||
func (GlobalPoolHandle) ReportRequestSuccess(d string, l int64) { ReportRequestSuccess(d, l) }
|
||||
func (GlobalPoolHandle) ReportRequestError(d string, l int64) { ReportRequestError(d, l) }
|
||||
func (GlobalPoolHandle) ReportRateLimit(d string, p int64) { ReportRateLimit(d, p) }
|
||||
func (GlobalPoolHandle) GetStats() []AccountStat { return GetAccountStats() }
|
||||
|
||||
// ─── Global pool ───────────────────────────────────────────────────────────
|
||||
|
||||
var (
|
||||
globalPool *AccountPool
|
||||
globalMu sync.Mutex
|
||||
)
|
||||
|
||||
func InitAccountPool(configDirs []string) {
|
||||
globalMu.Lock()
|
||||
defer globalMu.Unlock()
|
||||
globalPool = NewAccountPool(configDirs)
|
||||
}
|
||||
|
||||
func GetNextAccountConfigDir() string {
|
||||
globalMu.Lock()
|
||||
p := globalPool
|
||||
globalMu.Unlock()
|
||||
if p == nil {
|
||||
return ""
|
||||
}
|
||||
return p.GetNextConfigDir()
|
||||
}
|
||||
|
||||
func ReportRequestStart(configDir string) {
|
||||
globalMu.Lock()
|
||||
p := globalPool
|
||||
globalMu.Unlock()
|
||||
if p != nil {
|
||||
p.ReportRequestStart(configDir)
|
||||
}
|
||||
}
|
||||
|
||||
func ReportRequestEnd(configDir string) {
|
||||
globalMu.Lock()
|
||||
p := globalPool
|
||||
globalMu.Unlock()
|
||||
if p != nil {
|
||||
p.ReportRequestEnd(configDir)
|
||||
}
|
||||
}
|
||||
|
||||
func ReportRequestSuccess(configDir string, latencyMs int64) {
|
||||
globalMu.Lock()
|
||||
p := globalPool
|
||||
globalMu.Unlock()
|
||||
if p != nil {
|
||||
p.ReportRequestSuccess(configDir, latencyMs)
|
||||
}
|
||||
}
|
||||
|
||||
func ReportRequestError(configDir string, latencyMs int64) {
|
||||
globalMu.Lock()
|
||||
p := globalPool
|
||||
globalMu.Unlock()
|
||||
if p != nil {
|
||||
p.ReportRequestError(configDir, latencyMs)
|
||||
}
|
||||
}
|
||||
|
||||
func ReportRateLimit(configDir string, penaltyMs int64) {
|
||||
globalMu.Lock()
|
||||
p := globalPool
|
||||
globalMu.Unlock()
|
||||
if p != nil {
|
||||
p.ReportRateLimit(configDir, penaltyMs)
|
||||
}
|
||||
}
|
||||
|
||||
func GetAccountStats() []AccountStat {
|
||||
globalMu.Lock()
|
||||
p := globalPool
|
||||
globalMu.Unlock()
|
||||
if p == nil {
|
||||
return nil
|
||||
}
|
||||
return p.GetStats()
|
||||
}
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
package repository
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestEmptyPool(t *testing.T) {
|
||||
p := NewAccountPool(nil)
|
||||
if got := p.GetNextConfigDir(); got != "" {
|
||||
t.Fatalf("expected empty string for empty pool, got %q", got)
|
||||
}
|
||||
if p.Count() != 0 {
|
||||
t.Fatalf("expected count 0, got %d", p.Count())
|
||||
}
|
||||
}
|
||||
|
||||
func TestSingleDir(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/dir1"})
|
||||
if got := p.GetNextConfigDir(); got != "/dir1" {
|
||||
t.Fatalf("expected /dir1, got %q", got)
|
||||
}
|
||||
if got := p.GetNextConfigDir(); got != "/dir1" {
|
||||
t.Fatalf("expected /dir1 again, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoundRobin(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/a", "/b", "/c"})
|
||||
got := []string{
|
||||
p.GetNextConfigDir(),
|
||||
p.GetNextConfigDir(),
|
||||
p.GetNextConfigDir(),
|
||||
p.GetNextConfigDir(),
|
||||
}
|
||||
want := []string{"/a", "/b", "/c", "/a"}
|
||||
for i, w := range want {
|
||||
if got[i] != w {
|
||||
t.Fatalf("call %d: expected %q, got %q", i, w, got[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestLeastBusy(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/dir1", "/dir2", "/dir3"})
|
||||
p.ReportRequestStart("/dir1")
|
||||
p.ReportRequestStart("/dir2")
|
||||
|
||||
if got := p.GetNextConfigDir(); got != "/dir3" {
|
||||
t.Fatalf("expected /dir3 (least busy), got %q", got)
|
||||
}
|
||||
|
||||
p.ReportRequestStart("/dir3")
|
||||
p.ReportRequestEnd("/dir1")
|
||||
|
||||
if got := p.GetNextConfigDir(); got != "/dir1" {
|
||||
t.Fatalf("expected /dir1 after end, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSkipsRateLimited(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/dir1", "/dir2"})
|
||||
p.ReportRateLimit("/dir1", 60000)
|
||||
|
||||
if got := p.GetNextConfigDir(); got != "/dir2" {
|
||||
t.Fatalf("expected /dir2, got %q", got)
|
||||
}
|
||||
if got := p.GetNextConfigDir(); got != "/dir2" {
|
||||
t.Fatalf("expected /dir2 again, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFallbackToSoonestRecovery(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/dir1", "/dir2"})
|
||||
p.ReportRateLimit("/dir1", 60000)
|
||||
p.ReportRateLimit("/dir2", 30000)
|
||||
|
||||
// dir2 recovers sooner — should be selected
|
||||
if got := p.GetNextConfigDir(); got != "/dir2" {
|
||||
t.Fatalf("expected /dir2 (sooner recovery), got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestActiveRequestsDoesNotGoNegative(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/dir1"})
|
||||
p.ReportRequestEnd("/dir1")
|
||||
p.ReportRequestEnd("/dir1")
|
||||
if got := p.GetNextConfigDir(); got != "/dir1" {
|
||||
t.Fatalf("pool should still work, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIgnoreUnknownConfigDir(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/dir1"})
|
||||
p.ReportRequestStart("/nonexistent")
|
||||
p.ReportRequestEnd("/nonexistent")
|
||||
p.ReportRateLimit("/nonexistent", 60000)
|
||||
if got := p.GetNextConfigDir(); got != "/dir1" {
|
||||
t.Fatalf("expected /dir1, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRateLimitExpires(t *testing.T) {
|
||||
p := NewAccountPool([]string{"/dir1", "/dir2"})
|
||||
p.ReportRateLimit("/dir1", 50)
|
||||
|
||||
if got := p.GetNextConfigDir(); got != "/dir2" {
|
||||
t.Fatalf("immediately expected /dir2, got %q", got)
|
||||
}
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
if got := p.GetNextConfigDir(); got != "/dir1" {
|
||||
t.Fatalf("after expiry expected /dir1, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGlobalPool(t *testing.T) {
|
||||
InitAccountPool([]string{"/g1", "/g2"})
|
||||
if got := GetNextAccountConfigDir(); got != "/g1" {
|
||||
t.Fatalf("expected /g1, got %q", got)
|
||||
}
|
||||
if got := GetNextAccountConfigDir(); got != "/g2" {
|
||||
t.Fatalf("expected /g2, got %q", got)
|
||||
}
|
||||
if got := GetNextAccountConfigDir(); got != "/g1" {
|
||||
t.Fatalf("expected /g1 again, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGlobalPoolEmpty(t *testing.T) {
|
||||
InitAccountPool(nil)
|
||||
if got := GetNextAccountConfigDir(); got != "" {
|
||||
t.Fatalf("expected empty string for empty global pool, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGlobalPoolReinit(t *testing.T) {
|
||||
InitAccountPool([]string{"/old1", "/old2"})
|
||||
GetNextAccountConfigDir()
|
||||
InitAccountPool([]string{"/new1"})
|
||||
if got := GetNextAccountConfigDir(); got != "/new1" {
|
||||
t.Fatalf("expected /new1 after reinit, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGlobalPoolFunctionsNoopBeforeInit(t *testing.T) {
|
||||
InitAccountPool(nil)
|
||||
ReportRequestStart("/dir1")
|
||||
ReportRequestEnd("/dir1")
|
||||
ReportRateLimit("/dir1", 1000)
|
||||
}
|
||||
Loading…
Reference in New Issue