// File: backend/pkg/library/centrifugo/client.go
// (Repository viewer header: 519 lines, 14 KiB, Go; last change 2026-01-06 07:15:18 +00:00.)

package centrifugo
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"time"
)
// Client is an HTTP client for the Centrifugo server API.
type Client struct {
	// apiURL is the base address that "/api/<method>" is appended to.
	apiURL string
	// apiKey, when non-empty, is sent in the Authorization header of every call.
	apiKey string
	// client is the HTTP client used to perform the requests.
	client *http.Client
}
// ClientConfig holds the settings used to build a Client and its HTTP
// transport.
//
// Tuning fields left at their zero value are replaced with the documented
// defaults by NewClientWithConfig.
type ClientConfig struct {
	// APIURL is the Centrifugo API address (required).
	APIURL string
	// APIKey is the API key (required).
	APIKey string
	// Timeout is the overall per-request timeout (default 10 seconds).
	Timeout time.Duration
	// MaxIdleConns is the maximum number of idle connections (default 100).
	MaxIdleConns int
	// MaxIdleConnsPerHost is the maximum number of idle connections per host
	// (default 20, suited to high concurrency).
	MaxIdleConnsPerHost int
	// IdleConnTimeout is how long an idle connection is kept (default 90 seconds).
	IdleConnTimeout time.Duration
	// DialTimeout is the timeout for establishing a connection (default 5 seconds).
	DialTimeout time.Duration
	// TLSHandshakeTimeout is the timeout for the TLS handshake (default 5 seconds).
	TLSHandshakeTimeout time.Duration
	// ResponseHeaderTimeout is the timeout for waiting on the response headers
	// (default 10 seconds).
	ResponseHeaderTimeout time.Duration
}

// DefaultConfig returns the default configuration for the given API address
// and API key.
func DefaultConfig(apiURL, apiKey string) ClientConfig {
	return ClientConfig{
		APIURL:                apiURL,
		APIKey:                apiKey,
		Timeout:               10 * time.Second,
		MaxIdleConns:          100,
		MaxIdleConnsPerHost:   20, // raised to support high concurrency
		IdleConnTimeout:       90 * time.Second,
		DialTimeout:           5 * time.Second,
		TLSHandshakeTimeout:   5 * time.Second,
		ResponseHeaderTimeout: 10 * time.Second,
	}
}

// HighPerformanceConfig returns a configuration tuned for high-concurrency
// use: shorter timeouts and larger idle-connection pools than DefaultConfig.
func HighPerformanceConfig(apiURL, apiKey string) ClientConfig {
	return ClientConfig{
		APIURL:                apiURL,
		APIKey:                apiKey,
		Timeout:               5 * time.Second, // shorter overall timeout
		MaxIdleConns:          200,             // more idle connections
		MaxIdleConnsPerHost:   50,              // more idle connections per host
		IdleConnTimeout:       120 * time.Second,
		DialTimeout:           3 * time.Second,
		TLSHandshakeTimeout:   3 * time.Second,
		ResponseHeaderTimeout: 5 * time.Second,
	}
}
// NewClient creates a Centrifugo client for apiURL and apiKey using the
// settings returned by DefaultConfig.
func NewClient(apiURL, apiKey string) *Client {
	cfg := DefaultConfig(apiURL, apiKey)
	return NewClientWithConfig(cfg)
}
// NewClientWithConfig creates a Centrifugo client from a caller-supplied
// configuration.
//
// Every tuning field left at zero is replaced with its default (10s overall
// timeout, 100 idle connections, 20 idle connections per host, 90s idle
// timeout, 5s dial timeout, 5s TLS handshake timeout, 10s response-header
// timeout). APIURL and APIKey are used exactly as given.
func NewClientWithConfig(config ClientConfig) *Client {
	// durationOr and countOr substitute the fallback for an unset (zero) value.
	durationOr := func(value, fallback time.Duration) time.Duration {
		if value == 0 {
			return fallback
		}
		return value
	}
	countOr := func(value, fallback int) int {
		if value == 0 {
			return fallback
		}
		return value
	}

	config.Timeout = durationOr(config.Timeout, 10*time.Second)
	config.MaxIdleConns = countOr(config.MaxIdleConns, 100)
	config.MaxIdleConnsPerHost = countOr(config.MaxIdleConnsPerHost, 20)
	config.IdleConnTimeout = durationOr(config.IdleConnTimeout, 90*time.Second)
	config.DialTimeout = durationOr(config.DialTimeout, 5*time.Second)
	config.TLSHandshakeTimeout = durationOr(config.TLSHandshakeTimeout, 5*time.Second)
	config.ResponseHeaderTimeout = durationOr(config.ResponseHeaderTimeout, 10*time.Second)

	dialer := &net.Dialer{
		Timeout:   config.DialTimeout,
		KeepAlive: 30 * time.Second, // TCP keep-alive
	}
	transport := &http.Transport{
		DialContext:           dialer.DialContext,
		MaxIdleConns:          config.MaxIdleConns,
		MaxIdleConnsPerHost:   config.MaxIdleConnsPerHost,
		IdleConnTimeout:       config.IdleConnTimeout,
		TLSHandshakeTimeout:   config.TLSHandshakeTimeout,
		ResponseHeaderTimeout: config.ResponseHeaderTimeout,
		ForceAttemptHTTP2:     true, // try HTTP/2 even with a custom dialer
	}
	httpClient := &http.Client{
		Timeout:   config.Timeout,
		Transport: transport,
	}

	return &Client{
		apiURL: config.APIURL,
		apiKey: config.APIKey,
		client: httpClient,
	}
}
// NewClientWithHTTP creates a Centrifugo client that performs its requests
// through the supplied HTTP client.
//
// If httpClient is nil, a plain http.Client with a 10-second overall timeout
// is used instead, so the returned Client never panics on its first call.
func NewClientWithHTTP(apiURL, apiKey string, httpClient *http.Client) *Client {
	if httpClient == nil {
		// Same overall timeout as the package's default configuration.
		httpClient = &http.Client{Timeout: 10 * time.Second}
	}
	return &Client{
		apiURL: apiURL,
		apiKey: apiKey,
		client: httpClient,
	}
}
// ==================== Request/Response types ====================
// PublishRequest is the request payload for the "publish" API method.
type PublishRequest struct {
	// Channel is the channel to publish to.
	Channel string `json:"channel"`
	// Data is the message payload, encoded by encoding/json as given.
	Data interface{} `json:"data"`
}
// BroadcastRequest is the request payload for the "broadcast" API method,
// which publishes one message to several channels.
type BroadcastRequest struct {
	// Channels lists the channels to publish to.
	Channels []string `json:"channels"`
	// Data is the message payload, encoded by encoding/json as given.
	Data interface{} `json:"data"`
}
// SubscribeRequest is the request payload for the "subscribe" API method.
type SubscribeRequest struct {
	// User is the ID of the user to subscribe.
	User string `json:"user"`
	// Channel is the channel the user is subscribed to.
	Channel string `json:"channel"`
}
// UnsubscribeRequest is the request payload for the "unsubscribe" API method.
type UnsubscribeRequest struct {
	// User is the ID of the user to unsubscribe.
	User string `json:"user"`
	// Channel is the channel the user is removed from.
	Channel string `json:"channel"`
}
// DisconnectRequest is the request payload for the "disconnect" API method
// when no disconnect code is supplied.
type DisconnectRequest struct {
	// User is the ID of the user to disconnect.
	User string `json:"user"`
}
// DisconnectWithCodeRequest is the request payload for the "disconnect" API
// method when a disconnect code and reason are supplied.
type DisconnectWithCodeRequest struct {
	// User is the ID of the user to disconnect.
	User string `json:"user"`
	// Disconnect carries the code and reason.
	// NOTE(review): encoding/json never treats a struct value as empty, so
	// the omitempty option has no effect here and the object is always sent.
	Disconnect DisconnectInfo `json:"disconnect,omitempty"`
}

// DisconnectInfo describes why a connection is being closed.
type DisconnectInfo struct {
	// Code is the disconnect code.
	Code uint32 `json:"code"`
	// Reason is the accompanying disconnect reason text.
	Reason string `json:"reason"`
}
// PresenceRequest is the request payload for the "presence" API method.
type PresenceRequest struct {
	// Channel is the channel whose presence information is requested.
	Channel string `json:"channel"`
}
// PresenceStatsRequest is the request payload for the "presence_stats" API
// method.
type PresenceStatsRequest struct {
	// Channel is the channel whose presence statistics are requested.
	Channel string `json:"channel"`
}
// HistoryRequest is the request payload for the "history" API method.
type HistoryRequest struct {
	// Channel is the channel whose history is requested.
	Channel string `json:"channel"`
	// Limit is the requested number of messages; omitted from the JSON when zero.
	Limit int `json:"limit,omitempty"`
	// Reverse requests reverse ordering; omitted from the JSON when false.
	Reverse bool `json:"reverse,omitempty"`
}
// ChannelsRequest is the request payload for the "channels" API method.
type ChannelsRequest struct {
	// Pattern optionally filters the channels; omitted from the JSON when empty.
	Pattern string `json:"pattern,omitempty"`
}
// InfoRequest is the (empty) request payload for the "info" API method.
type InfoRequest struct{}
// APIResponse is the generic envelope of a Centrifugo API response.
type APIResponse struct {
	// Error is set when the response carries an "error" member.
	Error *APIError `json:"error,omitempty"`
	// Result receives the "result" member. When it already holds a non-nil
	// pointer before decoding, encoding/json decodes into the pointed-to
	// value instead of replacing it.
	Result interface{} `json:"result,omitempty"`
}
// APIError is an error reported by Centrifugo inside the response envelope.
type APIError struct {
	// Code is the numeric error code.
	Code int `json:"code"`
	// Message is the error description.
	Message string `json:"message"`
}

// Error implements the error interface, rendering the code and the message.
func (e *APIError) Error() string {
	text := fmt.Sprintf("centrifugo error %d: %s", e.Code, e.Message)
	return text
}
// PublishResult is the result of the "publish" API method.
type PublishResult struct {
	// Offset is the "offset" member of the result, if present.
	Offset uint64 `json:"offset,omitempty"`
	// Epoch is the "epoch" member of the result, if present.
	Epoch string `json:"epoch,omitempty"`
}
// PresenceResult is the result of the "presence" API method.
type PresenceResult struct {
	// Presence holds one ClientInfo per key of the "presence" object.
	Presence map[string]ClientInfo `json:"presence"`
}

// ClientInfo describes one client connection.
type ClientInfo struct {
	// User is the user ID.
	User string `json:"user"`
	// Client is the client (connection) ID.
	Client string `json:"client"`
	// ConnInfo is the raw, undecoded "conn_info" JSON, if any.
	ConnInfo json.RawMessage `json:"conn_info,omitempty"`
	// ChanInfo is the raw, undecoded "chan_info" JSON, if any.
	ChanInfo json.RawMessage `json:"chan_info,omitempty"`
}
// PresenceStatsResult is the result of the "presence_stats" API method.
type PresenceStatsResult struct {
	// NumClients is the reported number of clients.
	NumClients int `json:"num_clients"`
	// NumUsers is the reported number of users.
	NumUsers int `json:"num_users"`
}
// HistoryResult is the result of the "history" API method.
type HistoryResult struct {
	// Publications lists the returned messages.
	Publications []Publication `json:"publications"`
	// Offset is the "offset" member of the result, if present.
	Offset uint64 `json:"offset,omitempty"`
	// Epoch is the "epoch" member of the result, if present.
	Epoch string `json:"epoch,omitempty"`
}

// Publication is a single published message.
type Publication struct {
	// Offset is the "offset" member of the publication, if present.
	Offset uint64 `json:"offset,omitempty"`
	// Data is the raw, undecoded message payload.
	Data json.RawMessage `json:"data"`
	// Info is the "info" member of the publication; nil when absent.
	Info *ClientInfo `json:"info,omitempty"`
}
// ChannelsResult is the result of the "channels" API method.
type ChannelsResult struct {
	// Channels holds one ChannelInfo per key of the "channels" object.
	Channels map[string]ChannelInfo `json:"channels"`
}

// ChannelInfo describes a single channel.
type ChannelInfo struct {
	// NumClients is the reported number of clients in the channel.
	NumClients int `json:"num_clients"`
}
// InfoResult is the result of the "info" API method.
type InfoResult struct {
	// Nodes lists the reported server nodes.
	Nodes []NodeInfo `json:"nodes"`
}

// NodeInfo describes a single server node.
type NodeInfo struct {
	// UID is the node's unique ID.
	UID string `json:"uid"`
	// Name is the node name.
	Name string `json:"name"`
	// Version is the version string reported by the node.
	Version string `json:"version"`
	// NumClients is the reported number of clients.
	NumClients int `json:"num_clients"`
	// NumUsers is the reported number of users.
	NumUsers int `json:"num_users"`
	// NumChannels is the reported number of channels.
	NumChannels int `json:"num_channels"`
	// Uptime is the reported uptime (unit not shown in this file; presumably
	// seconds, to be confirmed against the server documentation).
	Uptime int `json:"uptime"`
}
// ==================== Publishing methods ====================
// Publish publishes a message to the given channel.
//
// data is embedded in the request as a json.RawMessage, so it is expected to
// already be valid JSON. It returns the publish result, or an error from the
// underlying API call.
func (c *Client) Publish(ctx context.Context, channel string, data []byte) (*PublishResult, error) {
	return c.publish(ctx, PublishRequest{
		Channel: channel,
		Data:    json.RawMessage(data),
	})
}
// PublishJSON publishes data to the given channel, letting encoding/json
// encode it as part of the request body.
//
// It returns the publish result, or an error from the underlying API call.
func (c *Client) PublishJSON(ctx context.Context, channel string, data interface{}) (*PublishResult, error) {
	return c.publish(ctx, PublishRequest{Channel: channel, Data: data})
}
// publish sends req to the "publish" API method and returns the decoded
// result, or nil and the error when the call fails.
func (c *Client) publish(ctx context.Context, req PublishRequest) (*PublishResult, error) {
	out := new(PublishResult)
	if err := c.callAPI(ctx, "publish", req, out); err != nil {
		return nil, err
	}
	return out, nil
}
// Broadcast publishes one message to several channels in a single call.
//
// data is embedded in the request as a json.RawMessage, so it is expected to
// already be valid JSON. The "result" member of the response is not
// inspected; only a failure of the call as a whole is reported.
func (c *Client) Broadcast(ctx context.Context, channels []string, data []byte) error {
	payload := BroadcastRequest{
		Channels: channels,
		Data:     json.RawMessage(data),
	}
	return c.callAPI(ctx, "broadcast", payload, nil)
}
// BroadcastJSON publishes data to several channels in a single call, letting
// encoding/json encode it as part of the request body.
//
// The "result" member of the response is not inspected; only a failure of the
// call as a whole is reported.
func (c *Client) BroadcastJSON(ctx context.Context, channels []string, data interface{}) error {
	payload := BroadcastRequest{Channels: channels, Data: data}
	return c.callAPI(ctx, "broadcast", payload, nil)
}
// ==================== Subscription management methods ====================
// Subscribe subscribes the given user to the given channel through the
// "subscribe" API method.
func (c *Client) Subscribe(ctx context.Context, user, channel string) error {
	return c.callAPI(ctx, "subscribe", SubscribeRequest{User: user, Channel: channel}, nil)
}
// Unsubscribe removes the given user's subscription to the given channel
// through the "unsubscribe" API method.
func (c *Client) Unsubscribe(ctx context.Context, user, channel string) error {
	return c.callAPI(ctx, "unsubscribe", UnsubscribeRequest{User: user, Channel: channel}, nil)
}
// Disconnect forcibly disconnects the given user through the "disconnect" API
// method, without supplying a disconnect code.
func (c *Client) Disconnect(ctx context.Context, user string) error {
	return c.callAPI(ctx, "disconnect", DisconnectRequest{User: user}, nil)
}
// DisconnectWithCode forcibly disconnects the given user through the
// "disconnect" API method, passing along a disconnect code and reason.
func (c *Client) DisconnectWithCode(ctx context.Context, user string, code uint32, reason string) error {
	details := DisconnectInfo{Code: code, Reason: reason}
	payload := DisconnectWithCodeRequest{User: user, Disconnect: details}
	return c.callAPI(ctx, "disconnect", payload, nil)
}
// ==================== Presence methods ====================
// Presence returns the presence information of the given channel, or nil and
// the error when the "presence" API call fails.
func (c *Client) Presence(ctx context.Context, channel string) (*PresenceResult, error) {
	out := new(PresenceResult)
	if err := c.callAPI(ctx, "presence", PresenceRequest{Channel: channel}, out); err != nil {
		return nil, err
	}
	return out, nil
}
// PresenceStats returns the presence statistics of the given channel, or nil
// and the error when the "presence_stats" API call fails.
func (c *Client) PresenceStats(ctx context.Context, channel string) (*PresenceStatsResult, error) {
	out := new(PresenceStatsResult)
	if err := c.callAPI(ctx, "presence_stats", PresenceStatsRequest{Channel: channel}, out); err != nil {
		return nil, err
	}
	return out, nil
}
// ==================== History methods ====================
// History returns the message history of the given channel.
//
// limit is sent as the "limit" request field; a zero limit is omitted from
// the request. It returns nil and the error when the "history" API call fails.
func (c *Client) History(ctx context.Context, channel string, limit int) (*HistoryResult, error) {
	out := new(HistoryResult)
	payload := HistoryRequest{Channel: channel, Limit: limit}
	if err := c.callAPI(ctx, "history", payload, out); err != nil {
		return nil, err
	}
	return out, nil
}
// HistoryReverse returns the message history of the given channel in reverse
// order.
//
// It behaves like History but sets the "reverse" request field. It returns
// nil and the error when the "history" API call fails.
func (c *Client) HistoryReverse(ctx context.Context, channel string, limit int) (*HistoryResult, error) {
	out := new(HistoryResult)
	payload := HistoryRequest{Channel: channel, Limit: limit, Reverse: true}
	if err := c.callAPI(ctx, "history", payload, out); err != nil {
		return nil, err
	}
	return out, nil
}
// ==================== Channel management methods ====================
// Channels returns all active channels by calling the "channels" API method
// without a pattern. It returns nil and the error when the call fails.
func (c *Client) Channels(ctx context.Context) (*ChannelsResult, error) {
	out := new(ChannelsResult)
	if err := c.callAPI(ctx, "channels", ChannelsRequest{}, out); err != nil {
		return nil, err
	}
	return out, nil
}
// ChannelsWithPattern returns the active channels matching pattern by calling
// the "channels" API method. It returns nil and the error when the call fails.
func (c *Client) ChannelsWithPattern(ctx context.Context, pattern string) (*ChannelsResult, error) {
	out := new(ChannelsResult)
	if err := c.callAPI(ctx, "channels", ChannelsRequest{Pattern: pattern}, out); err != nil {
		return nil, err
	}
	return out, nil
}
// ==================== Server status methods ====================
// Info returns the server status information from the "info" API method, or
// nil and the error when the call fails.
func (c *Client) Info(ctx context.Context) (*InfoResult, error) {
	out := new(InfoResult)
	if err := c.callAPI(ctx, "info", InfoRequest{}, out); err != nil {
		return nil, err
	}
	return out, nil
}
// Ping checks that the server is healthy by issuing an Info call and
// discarding its result; it returns that call's error, if any.
func (c *Client) Ping(ctx context.Context) error {
	if _, err := c.Info(ctx); err != nil {
		return err
	}
	return nil
}
// ==================== Internal methods ====================
// callAPI performs a single Centrifugo server API call.
//
// It JSON-encodes params, POSTs them to "<apiURL>/api/<method>" (adding an
// "Authorization: apikey <key>" header when an API key is configured) and
// decodes the response envelope.
//
// Parameters:
//   - ctx: governs cancellation and the deadline of the HTTP request.
//   - method: API method name, used as the last URL path segment.
//   - params: request payload; anything encoding/json can marshal.
//   - result: optional pointer that the envelope's "result" member is decoded
//     into; pass nil to ignore the result.
//
// It returns a wrapped error when encoding, transport or decoding fails, an
// error carrying the status code and body when the HTTP status is not 200,
// and the envelope's *APIError when the server reports one.
func (c *Client) callAPI(ctx context.Context, method string, params interface{}, result interface{}) error {
	body, err := json.Marshal(params)
	if err != nil {
		return fmt.Errorf("failed to marshal request: %w", err)
	}

	// Drop trailing slashes from the base URL so that a configured
	// "http://host/" does not produce "http://host//api/<method>".
	base := c.apiURL
	for len(base) > 0 && base[len(base)-1] == '/' {
		base = base[:len(base)-1]
	}
	endpoint := base + "/api/" + method

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("failed to create request: %w", err)
	}
	httpReq.Header.Set("Content-Type", "application/json")
	if c.apiKey != "" {
		httpReq.Header.Set("Authorization", "apikey "+c.apiKey)
	}

	// A Client built without an HTTP client (zero value, or a nil passed to
	// a constructor) falls back to the default client instead of panicking.
	httpClient := c.client
	if httpClient == nil {
		httpClient = http.DefaultClient
	}

	resp, err := httpClient.Do(httpReq)
	if err != nil {
		return fmt.Errorf("failed to send request: %w", err)
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("centrifugo returned status %d: %s", resp.StatusCode, string(respBody))
	}

	// Decode the envelope. Pre-loading Result with the caller's pointer makes
	// encoding/json decode the "result" member straight into that value.
	var apiResp APIResponse
	if result != nil {
		apiResp.Result = result
	}
	if err := json.Unmarshal(respBody, &apiResp); err != nil {
		return fmt.Errorf("failed to unmarshal response: %w", err)
	}
	if apiResp.Error != nil {
		return apiResp.Error
	}
	return nil
}