# Centrifugo Client Library Go 語言的 Centrifugo 即時訊息服務客戶端庫,提供完整的 Server-side API 支援。 ## 功能特色 - ✅ **HTTP API 客戶端** - 發布訊息、訂閱管理、在線狀態、歷史訊息 - ✅ **JWT Token 生成** - 連線認證和私有頻道訂閱 - ✅ **Token 黑名單** - 撤銷單一 Token 或用戶所有 Token - ✅ **在線狀態追蹤** - Redis 存儲 + Centrifugo Presence API - ✅ **統一服務入口** - 簡單易用的 `Service` 整合介面 ## 安裝依賴 ```bash go get github.com/golang-jwt/jwt/v5 go get github.com/zeromicro/go-zero/core/stores/redis ``` ## 快速開始 ### 1. 創建服務實例(推薦方式) ```go import ( "backend/pkg/library/centrifugo" "github.com/zeromicro/go-zero/core/stores/redis" ) // 創建 Redis 客戶端 rds, _ := redis.NewRedis(redis.RedisConf{ Host: "localhost:6379", Type: "node", }) // 創建 Centrifugo 服務 svc := centrifugo.NewService(centrifugo.ServiceConfig{ APIURL: "http://localhost:8000", APIKey: "your-api-key", TokenSecret: "your-jwt-secret", Redis: rds, // 可選,用於黑名單和在線狀態 }) ``` ### 2. 發布訊息 ```go ctx := context.Background() // 方法 1: 使用 Service 便捷方法 result, err := svc.PublishJSON(ctx, "chat:room-123", map[string]interface{}{ "message": "Hello, World!", "user": "daniel", }) // 方法 2: 使用 Client 直接調用 result, err := svc.Client().Publish(ctx, "chat:room-123", []byte(`{"message": "Hello!"}`)) // 批量發布到多個頻道 channels := []string{"user:1", "user:2", "user:3"} err := svc.BroadcastJSON(ctx, channels, map[string]string{ "type": "notification", "message": "System maintenance", }) ``` ### 3. Token 生成 ```go // 快速生成連線 Token token, err := svc.GenerateToken("user-123") // 生成帶用戶資訊的 Token token, err := svc.GenerateTokenWithInfo("user-123", map[string]interface{}{ "name": "Daniel", "avatar": "https://example.com/avatar.jpg", }) // 完整選項 token, err := svc.Token().GenerateConnectionToken(centrifugo.ConnectionTokenOptions{ UserID: "user-123", Info: map[string]interface{}{"role": "admin"}, Channels: []string{"chat:room-1", "chat:room-2"}, // 自動訂閱 }) // 訂閱 Token(用於私有頻道) token, err := svc.Token().QuickSubscriptionToken("user-123", "private:room-456") ``` ### 4. 撤銷 Token(踢人) ```go // 最常用:撤銷用戶所有 Token 並斷開連線 // 適用於:用戶被封禁、密碼變更、用戶登出全部設備 err := svc.InvalidateUser(ctx, "user-123") // 只斷開連線(不撤銷 Token) err := svc.Disconnect(ctx, "user-123") // 撤銷特定 Token(需要 JTI) err := svc.Blacklist().RevokeToken(ctx, jti, time.Hour) // 撤銷用戶所有 Token(不斷開連線) err := svc.Blacklist().RevokeUserTokens(ctx, "user-123") ``` ### 5. 在線狀態追蹤 ```go // 檢查單一用戶是否在線 online, err := svc.IsUserOnline(ctx, "user-123") // 批量獲取在線狀態 status, err := svc.GetUsersOnlineStatus(ctx, []string{"user-1", "user-2", "user-3"}) // status = map[string]bool{"user-1": true, "user-2": false, "user-3": true} // 處理 Centrifugo Connect/Disconnect Proxy 事件 svc.Online().HandleConnect(ctx, "user-123") svc.Online().HandleDisconnect(ctx, "user-123") // 使用 Centrifugo Presence API(頻道級別) users, err := svc.Online().GetChannelOnlineUsers(ctx, "chat:room-123") stats, err := svc.Online().GetChannelStats(ctx, "chat:room-123") ``` --- ## 獨立使用各元件 如果不需要完整的 Service,可以獨立使用各元件: ### HTTP API Client ```go // 創建客戶端 client := centrifugo.NewClient("http://localhost:8000", "your-api-key") // 使用自定義配置 client := centrifugo.NewClientWithConfig(centrifugo.ClientConfig{ APIURL: "http://localhost:8000", APIKey: "your-api-key", Timeout: 5 * time.Second, MaxIdleConns: 200, MaxIdleConnsPerHost: 50, }) // API 調用 client.Publish(ctx, channel, data) client.PublishJSON(ctx, channel, data) client.Broadcast(ctx, channels, data) client.Subscribe(ctx, user, channel) client.Unsubscribe(ctx, user, channel) client.Disconnect(ctx, user) client.DisconnectWithCode(ctx, user, code, reason) client.Presence(ctx, channel) client.PresenceStats(ctx, channel) client.History(ctx, channel, limit) client.HistoryReverse(ctx, channel, limit) client.Channels(ctx) client.ChannelsWithPattern(ctx, pattern) client.Info(ctx) client.Ping(ctx) ``` ### Token Generator ```go // 創建生成器 tokenGen := centrifugo.NewTokenGenerator("your-jwt-secret") // 使用自定義配置 tokenGen := centrifugo.NewTokenGeneratorWithConfig(centrifugo.TokenConfig{ Secret: "your-jwt-secret", ExpireIn: 24 * time.Hour, }) // 生成 Token tokenGen.QuickConnectionToken(userID) tokenGen.QuickSubscriptionToken(userID, channel) tokenGen.GenerateConnectionToken(opts) tokenGen.GenerateSubscriptionToken(opts) tokenGen.GenerateAnonymousToken() ``` ### Token Blacklist ```go // 創建黑名單管理器 blacklist := centrifugo.NewTokenBlacklist(redisClient) // 撤銷操作 blacklist.RevokeToken(ctx, jti, ttl) // 撤銷單一 Token blacklist.RevokeUserTokens(ctx, userID) // 撤銷用戶所有 Token // 驗證操作 blacklist.IsTokenRevoked(ctx, jti) // 檢查 Token 是否被撤銷 blacklist.GetUserTokenVersion(ctx, userID) // 獲取用戶 Token 版本 blacklist.IsTokenVersionValid(ctx, userID, v) // 檢查版本是否有效 ``` ### Online Manager ```go // 創建管理器 store := centrifugo.NewRedisOnlineStore(redisClient) onlineManager := centrifugo.NewOnlineManagerWithTTL(client, store, 5*time.Minute) // Redis 存儲操作 onlineManager.HandleConnect(ctx, userID) onlineManager.HandleDisconnect(ctx, userID) onlineManager.IsUserOnline(ctx, userID) onlineManager.GetUsersOnlineStatus(ctx, userIDs) onlineManager.RefreshOnline(ctx, userID) // Centrifugo Presence API onlineManager.IsUserInChannel(ctx, userID, channel) onlineManager.GetChannelOnlineUsers(ctx, channel) onlineManager.GetChannelStats(ctx, channel) ``` --- ## 前端整合範例 ### JavaScript (使用 centrifuge-js) ```javascript import { Centrifuge } from 'centrifuge'; // 從後端 API 獲取 Token const getToken = async () => { const response = await fetch('/api/centrifugo/token'); const data = await response.json(); return data.token; }; // 創建連線 const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket', { getToken: getToken, }); // 訂閱頻道 const sub = centrifuge.newSubscription('chat:room-123'); sub.on('publication', (ctx) => { console.log('Received:', ctx.data); }); sub.subscribe(); // 連線 centrifuge.connect(); ``` ### 後端 Token API ```go // handlers/centrifugo.go func (h *Handler) GetConnectionToken(c *gin.Context) { userID := c.GetString("user_id") // 從 JWT 或 session 獲取 token, err := h.svc.GenerateToken(userID) if err != nil { c.JSON(500, gin.H{"error": "failed to generate token"}) return } c.JSON(200, gin.H{"token": token}) } func (h *Handler) GetSubscriptionToken(c *gin.Context) { userID := c.GetString("user_id") channel := c.Query("channel") // 驗證用戶是否有權限訂閱此頻道 if !h.canSubscribe(userID, channel) { c.JSON(403, gin.H{"error": "forbidden"}) return } token, err := h.svc.Token().QuickSubscriptionToken(userID, channel) if err != nil { c.JSON(500, gin.H{"error": "failed to generate token"}) return } c.JSON(200, gin.H{"token": token}) } ``` --- ## Centrifugo Proxy 整合 ### Connect Proxy ```go // POST /centrifugo/connect func (h *Handler) CentrifugoConnect(c *gin.Context) { var req struct { Client string `json:"client"` Transport string `json:"transport"` Protocol string `json:"protocol"` Data []byte `json:"data"` } c.BindJSON(&req) // 從 Token 驗證用戶(Centrifugo 會傳遞) userID := extractUserID(req.Data) // 記錄連線 h.svc.Online().HandleConnect(c, userID) c.JSON(200, gin.H{ "result": map[string]interface{}{ "user": userID, }, }) } ``` ### Disconnect Proxy ```go // POST /centrifugo/disconnect func (h *Handler) CentrifugoDisconnect(c *gin.Context) { var req struct { Client string `json:"client"` User string `json:"user"` } c.BindJSON(&req) // 記錄離線 h.svc.Online().HandleDisconnect(c, req.User) c.JSON(200, gin.H{"result": map[string]interface{}{}}) } ``` --- ## Centrifugo 配置參考 ```json { "token_hmac_secret_key": "your-jwt-secret", "api_key": "your-api-key", "admin": true, "allowed_origins": ["http://localhost:3000"], "proxy_connect_endpoint": "http://localhost:8080/centrifugo/connect", "proxy_disconnect_endpoint": "http://localhost:8080/centrifugo/disconnect", "namespaces": [ { "name": "chat", "presence": true, "history_size": 100, "history_ttl": "300s" }, { "name": "private", "presence": true, "protected": true } ] } ``` --- ## API 參考 ### Service 方法 | 方法 | 說明 | |------|------| | `Client()` | 返回 HTTP API 客戶端 | | `Token()` | 返回 Token 生成器 | | `Blacklist()` | 返回黑名單管理器(可能為 nil) | | `Online()` | 返回在線狀態管理器(可能為 nil) | | `PublishJSON(ctx, channel, data)` | 發布 JSON 訊息 | | `BroadcastJSON(ctx, channels, data)` | 批量發布 JSON 訊息 | | `Disconnect(ctx, userID)` | 斷開用戶連線 | | `GenerateToken(userID)` | 快速生成連線 Token | | `GenerateTokenWithInfo(userID, info)` | 生成帶資訊的連線 Token | | `InvalidateUser(ctx, userID)` | 撤銷所有 Token 並斷開連線 | | `IsUserOnline(ctx, userID)` | 檢查用戶是否在線 | | `GetUsersOnlineStatus(ctx, userIDs)` | 批量獲取在線狀態 | ### Client 方法 | 方法 | 說明 | 返回值 | |------|------|--------| | `Publish(ctx, channel, data)` | 發布訊息 | `*PublishResult, error` | | `PublishJSON(ctx, channel, data)` | 發布 JSON | `*PublishResult, error` | | `Broadcast(ctx, channels, data)` | 批量發布 | `error` | | `BroadcastJSON(ctx, channels, data)` | 批量發布 JSON | `error` | | `Subscribe(ctx, user, channel)` | 訂閱用戶 | `error` | | `Unsubscribe(ctx, user, channel)` | 取消訂閱 | `error` | | `Disconnect(ctx, user)` | 斷開連線 | `error` | | `DisconnectWithCode(ctx, user, code, reason)` | 帶代碼斷開連線 | `error` | | `Presence(ctx, channel)` | 在線用戶 | `*PresenceResult, error` | | `PresenceStats(ctx, channel)` | 在線統計 | `*PresenceStatsResult, error` | | `History(ctx, channel, limit)` | 歷史訊息 | `*HistoryResult, error` | | `HistoryReverse(ctx, channel, limit)` | 歷史訊息(倒序) | `*HistoryResult, error` | | `Channels(ctx)` | 活躍頻道 | `*ChannelsResult, error` | | `ChannelsWithPattern(ctx, pattern)` | 匹配頻道 | `*ChannelsResult, error` | | `Info(ctx)` | 伺服器資訊 | `*InfoResult, error` | | `Ping(ctx)` | 健康檢查 | `error` | ### TokenGenerator 方法 | 方法 | 說明 | |------|------| | `GenerateConnectionToken(opts)` | 生成連線 Token(完整選項) | | `GenerateSubscriptionToken(opts)` | 生成訂閱 Token(完整選項) | | `GenerateAnonymousToken()` | 生成匿名 Token | | `QuickConnectionToken(userID)` | 快速生成連線 Token | | `QuickSubscriptionToken(userID, channel)` | 快速生成訂閱 Token | ### TokenBlacklist 方法 | 方法 | 說明 | |------|------| | `RevokeToken(ctx, jti, ttl)` | 撤銷特定 Token | | `RevokeUserTokens(ctx, userID)` | 撤銷用戶所有 Token | | `IsTokenRevoked(ctx, jti)` | 檢查 Token 是否被撤銷 | | `GetUserTokenVersion(ctx, userID)` | 獲取用戶 Token 版本 | | `IsTokenVersionValid(ctx, userID, version)` | 檢查版本是否有效 | --- ## 錯誤處理 ```go result, err := svc.Client().Publish(ctx, channel, data) if err != nil { // 檢查是否為 Centrifugo API 錯誤 if apiErr, ok := err.(*centrifugo.APIError); ok { fmt.Printf("Centrifugo error code: %d, message: %s\n", apiErr.Code, apiErr.Message) } else { // 網路錯誤或其他錯誤 fmt.Printf("Error: %v\n", err) } } // 檢查特定錯誤 if errors.Is(err, centrifugo.ErrBlacklistNotConfigured) { // 黑名單未配置 } if errors.Is(err, centrifugo.ErrOnlineStoreNotConfigured) { // 在線狀態存儲未配置 } ``` --- ## 檔案結構 ``` pkg/library/centrifugo/ ├── centrifugo.go # 主入口,Service 整合介面 ├── client.go # HTTP API 客戶端 ├── token.go # JWT Token 生成器 ├── blacklist.go # Token 黑名單管理 ├── online.go # 在線狀態管理介面 ├── online_redis.go # Redis 在線狀態實作 ├── README.md # 文檔 └── *_test.go # 測試文件 ``` --- ## License MIT