475 lines
13 KiB
Markdown
475 lines
13 KiB
Markdown
# Centrifugo Client Library
|
||
|
||
Go 語言的 Centrifugo 即時訊息服務客戶端庫,提供完整的 Server-side API 支援。
|
||
|
||
## 功能特色
|
||
|
||
- ✅ **HTTP API 客戶端** - 發布訊息、訂閱管理、在線狀態、歷史訊息
|
||
- ✅ **JWT Token 生成** - 連線認證和私有頻道訂閱
|
||
- ✅ **Token 黑名單** - 撤銷單一 Token 或用戶所有 Token
|
||
- ✅ **在線狀態追蹤** - Redis 存儲 + Centrifugo Presence API
|
||
- ✅ **統一服務入口** - 簡單易用的 `Service` 整合介面
|
||
|
||
## 安裝依賴
|
||
|
||
```bash
|
||
go get github.com/golang-jwt/jwt/v5
|
||
go get github.com/zeromicro/go-zero/core/stores/redis
|
||
```
|
||
|
||
## 快速開始
|
||
|
||
### 1. 創建服務實例(推薦方式)
|
||
|
||
```go
|
||
import (
|
||
"backend/pkg/library/centrifugo"
|
||
"github.com/zeromicro/go-zero/core/stores/redis"
|
||
)
|
||
|
||
// 創建 Redis 客戶端
|
||
rds, _ := redis.NewRedis(redis.RedisConf{
|
||
Host: "localhost:6379",
|
||
Type: "node",
|
||
})
|
||
|
||
// 創建 Centrifugo 服務
|
||
svc := centrifugo.NewService(centrifugo.ServiceConfig{
|
||
APIURL: "http://localhost:8000",
|
||
APIKey: "your-api-key",
|
||
TokenSecret: "your-jwt-secret",
|
||
Redis: rds, // 可選,用於黑名單和在線狀態
|
||
})
|
||
```
|
||
|
||
### 2. 發布訊息
|
||
|
||
```go
|
||
ctx := context.Background()
|
||
|
||
// 方法 1: 使用 Service 便捷方法
|
||
result, err := svc.PublishJSON(ctx, "chat:room-123", map[string]interface{}{
|
||
"message": "Hello, World!",
|
||
"user": "daniel",
|
||
})
|
||
|
||
// 方法 2: 使用 Client 直接調用
|
||
result, err := svc.Client().Publish(ctx, "chat:room-123", []byte(`{"message": "Hello!"}`))
|
||
|
||
// 批量發布到多個頻道
|
||
channels := []string{"user:1", "user:2", "user:3"}
|
||
err := svc.BroadcastJSON(ctx, channels, map[string]string{
|
||
"type": "notification",
|
||
"message": "System maintenance",
|
||
})
|
||
```
|
||
|
||
### 3. Token 生成
|
||
|
||
```go
|
||
// 快速生成連線 Token
|
||
token, err := svc.GenerateToken("user-123")
|
||
|
||
// 生成帶用戶資訊的 Token
|
||
token, err := svc.GenerateTokenWithInfo("user-123", map[string]interface{}{
|
||
"name": "Daniel",
|
||
"avatar": "https://example.com/avatar.jpg",
|
||
})
|
||
|
||
// 完整選項
|
||
token, err := svc.Token().GenerateConnectionToken(centrifugo.ConnectionTokenOptions{
|
||
UserID: "user-123",
|
||
Info: map[string]interface{}{"role": "admin"},
|
||
Channels: []string{"chat:room-1", "chat:room-2"}, // 自動訂閱
|
||
})
|
||
|
||
// 訂閱 Token(用於私有頻道)
|
||
token, err := svc.Token().QuickSubscriptionToken("user-123", "private:room-456")
|
||
```
|
||
|
||
### 4. 撤銷 Token(踢人)
|
||
|
||
```go
|
||
// 最常用:撤銷用戶所有 Token 並斷開連線
|
||
// 適用於:用戶被封禁、密碼變更、用戶登出全部設備
|
||
err := svc.InvalidateUser(ctx, "user-123")
|
||
|
||
// 只斷開連線(不撤銷 Token)
|
||
err := svc.Disconnect(ctx, "user-123")
|
||
|
||
// 撤銷特定 Token(需要 JTI)
|
||
err := svc.Blacklist().RevokeToken(ctx, jti, time.Hour)
|
||
|
||
// 撤銷用戶所有 Token(不斷開連線)
|
||
err := svc.Blacklist().RevokeUserTokens(ctx, "user-123")
|
||
```
|
||
|
||
### 5. 在線狀態追蹤
|
||
|
||
```go
|
||
// 檢查單一用戶是否在線
|
||
online, err := svc.IsUserOnline(ctx, "user-123")
|
||
|
||
// 批量獲取在線狀態
|
||
status, err := svc.GetUsersOnlineStatus(ctx, []string{"user-1", "user-2", "user-3"})
|
||
// status = map[string]bool{"user-1": true, "user-2": false, "user-3": true}
|
||
|
||
// 處理 Centrifugo Connect/Disconnect Proxy 事件
|
||
svc.Online().HandleConnect(ctx, "user-123")
|
||
svc.Online().HandleDisconnect(ctx, "user-123")
|
||
|
||
// 使用 Centrifugo Presence API(頻道級別)
|
||
users, err := svc.Online().GetChannelOnlineUsers(ctx, "chat:room-123")
|
||
stats, err := svc.Online().GetChannelStats(ctx, "chat:room-123")
|
||
```
|
||
|
||
---
|
||
|
||
## 獨立使用各元件
|
||
|
||
如果不需要完整的 Service,可以獨立使用各元件:
|
||
|
||
### HTTP API Client
|
||
|
||
```go
|
||
// 創建客戶端
|
||
client := centrifugo.NewClient("http://localhost:8000", "your-api-key")
|
||
|
||
// 使用自定義配置
|
||
client := centrifugo.NewClientWithConfig(centrifugo.ClientConfig{
|
||
APIURL: "http://localhost:8000",
|
||
APIKey: "your-api-key",
|
||
Timeout: 5 * time.Second,
|
||
MaxIdleConns: 200,
|
||
MaxIdleConnsPerHost: 50,
|
||
})
|
||
|
||
// API 調用
|
||
client.Publish(ctx, channel, data)
|
||
client.PublishJSON(ctx, channel, data)
|
||
client.Broadcast(ctx, channels, data)
|
||
client.Subscribe(ctx, user, channel)
|
||
client.Unsubscribe(ctx, user, channel)
|
||
client.Disconnect(ctx, user)
|
||
client.DisconnectWithCode(ctx, user, code, reason)
|
||
client.Presence(ctx, channel)
|
||
client.PresenceStats(ctx, channel)
|
||
client.History(ctx, channel, limit)
|
||
client.HistoryReverse(ctx, channel, limit)
|
||
client.Channels(ctx)
|
||
client.ChannelsWithPattern(ctx, pattern)
|
||
client.Info(ctx)
|
||
client.Ping(ctx)
|
||
```
|
||
|
||
### Token Generator
|
||
|
||
```go
|
||
// 創建生成器
|
||
tokenGen := centrifugo.NewTokenGenerator("your-jwt-secret")
|
||
|
||
// 使用自定義配置
|
||
tokenGen := centrifugo.NewTokenGeneratorWithConfig(centrifugo.TokenConfig{
|
||
Secret: "your-jwt-secret",
|
||
ExpireIn: 24 * time.Hour,
|
||
})
|
||
|
||
// 生成 Token
|
||
tokenGen.QuickConnectionToken(userID)
|
||
tokenGen.QuickSubscriptionToken(userID, channel)
|
||
tokenGen.GenerateConnectionToken(opts)
|
||
tokenGen.GenerateSubscriptionToken(opts)
|
||
tokenGen.GenerateAnonymousToken()
|
||
```
|
||
|
||
### Token Blacklist
|
||
|
||
```go
|
||
// 創建黑名單管理器
|
||
blacklist := centrifugo.NewTokenBlacklist(redisClient)
|
||
|
||
// 撤銷操作
|
||
blacklist.RevokeToken(ctx, jti, ttl) // 撤銷單一 Token
|
||
blacklist.RevokeUserTokens(ctx, userID) // 撤銷用戶所有 Token
|
||
|
||
// 驗證操作
|
||
blacklist.IsTokenRevoked(ctx, jti) // 檢查 Token 是否被撤銷
|
||
blacklist.GetUserTokenVersion(ctx, userID) // 獲取用戶 Token 版本
|
||
blacklist.IsTokenVersionValid(ctx, userID, v) // 檢查版本是否有效
|
||
```
|
||
|
||
### Online Manager
|
||
|
||
```go
|
||
// 創建管理器
|
||
store := centrifugo.NewRedisOnlineStore(redisClient)
|
||
onlineManager := centrifugo.NewOnlineManagerWithTTL(client, store, 5*time.Minute)
|
||
|
||
// Redis 存儲操作
|
||
onlineManager.HandleConnect(ctx, userID)
|
||
onlineManager.HandleDisconnect(ctx, userID)
|
||
onlineManager.IsUserOnline(ctx, userID)
|
||
onlineManager.GetUsersOnlineStatus(ctx, userIDs)
|
||
onlineManager.RefreshOnline(ctx, userID)
|
||
|
||
// Centrifugo Presence API
|
||
onlineManager.IsUserInChannel(ctx, userID, channel)
|
||
onlineManager.GetChannelOnlineUsers(ctx, channel)
|
||
onlineManager.GetChannelStats(ctx, channel)
|
||
```
|
||
|
||
---
|
||
|
||
## 前端整合範例
|
||
|
||
### JavaScript (使用 centrifuge-js)
|
||
|
||
```javascript
|
||
import { Centrifuge } from 'centrifuge';
|
||
|
||
// 從後端 API 獲取 Token
|
||
const getToken = async () => {
|
||
const response = await fetch('/api/centrifugo/token');
|
||
const data = await response.json();
|
||
return data.token;
|
||
};
|
||
|
||
// 創建連線
|
||
const centrifuge = new Centrifuge('ws://localhost:8000/connection/websocket', {
|
||
getToken: getToken,
|
||
});
|
||
|
||
// 訂閱頻道
|
||
const sub = centrifuge.newSubscription('chat:room-123');
|
||
sub.on('publication', (ctx) => {
|
||
console.log('Received:', ctx.data);
|
||
});
|
||
sub.subscribe();
|
||
|
||
// 連線
|
||
centrifuge.connect();
|
||
```
|
||
|
||
### 後端 Token API
|
||
|
||
```go
|
||
// handlers/centrifugo.go
|
||
func (h *Handler) GetConnectionToken(c *gin.Context) {
|
||
userID := c.GetString("user_id") // 從 JWT 或 session 獲取
|
||
|
||
token, err := h.svc.GenerateToken(userID)
|
||
if err != nil {
|
||
c.JSON(500, gin.H{"error": "failed to generate token"})
|
||
return
|
||
}
|
||
|
||
c.JSON(200, gin.H{"token": token})
|
||
}
|
||
|
||
func (h *Handler) GetSubscriptionToken(c *gin.Context) {
|
||
userID := c.GetString("user_id")
|
||
channel := c.Query("channel")
|
||
|
||
// 驗證用戶是否有權限訂閱此頻道
|
||
if !h.canSubscribe(userID, channel) {
|
||
c.JSON(403, gin.H{"error": "forbidden"})
|
||
return
|
||
}
|
||
|
||
token, err := h.svc.Token().QuickSubscriptionToken(userID, channel)
|
||
if err != nil {
|
||
c.JSON(500, gin.H{"error": "failed to generate token"})
|
||
return
|
||
}
|
||
|
||
c.JSON(200, gin.H{"token": token})
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## Centrifugo Proxy 整合
|
||
|
||
### Connect Proxy
|
||
|
||
```go
|
||
// POST /centrifugo/connect
|
||
func (h *Handler) CentrifugoConnect(c *gin.Context) {
|
||
var req struct {
|
||
Client string `json:"client"`
|
||
Transport string `json:"transport"`
|
||
Protocol string `json:"protocol"`
|
||
Data []byte `json:"data"`
|
||
}
|
||
c.BindJSON(&req)
|
||
|
||
// 從 Token 驗證用戶(Centrifugo 會傳遞)
|
||
userID := extractUserID(req.Data)
|
||
|
||
// 記錄連線
|
||
h.svc.Online().HandleConnect(c, userID)
|
||
|
||
c.JSON(200, gin.H{
|
||
"result": map[string]interface{}{
|
||
"user": userID,
|
||
},
|
||
})
|
||
}
|
||
```
|
||
|
||
### Disconnect Proxy
|
||
|
||
```go
|
||
// POST /centrifugo/disconnect
|
||
func (h *Handler) CentrifugoDisconnect(c *gin.Context) {
|
||
var req struct {
|
||
Client string `json:"client"`
|
||
User string `json:"user"`
|
||
}
|
||
c.BindJSON(&req)
|
||
|
||
// 記錄離線
|
||
h.svc.Online().HandleDisconnect(c, req.User)
|
||
|
||
c.JSON(200, gin.H{"result": map[string]interface{}{}})
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## Centrifugo 配置參考
|
||
|
||
```json
|
||
{
|
||
"token_hmac_secret_key": "your-jwt-secret",
|
||
"api_key": "your-api-key",
|
||
"admin": true,
|
||
"allowed_origins": ["http://localhost:3000"],
|
||
"proxy_connect_endpoint": "http://localhost:8080/centrifugo/connect",
|
||
"proxy_disconnect_endpoint": "http://localhost:8080/centrifugo/disconnect",
|
||
"namespaces": [
|
||
{
|
||
"name": "chat",
|
||
"presence": true,
|
||
"history_size": 100,
|
||
"history_ttl": "300s"
|
||
},
|
||
{
|
||
"name": "private",
|
||
"presence": true,
|
||
"protected": true
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## API 參考
|
||
|
||
### Service 方法
|
||
|
||
| 方法 | 說明 |
|
||
|------|------|
|
||
| `Client()` | 返回 HTTP API 客戶端 |
|
||
| `Token()` | 返回 Token 生成器 |
|
||
| `Blacklist()` | 返回黑名單管理器(可能為 nil) |
|
||
| `Online()` | 返回在線狀態管理器(可能為 nil) |
|
||
| `PublishJSON(ctx, channel, data)` | 發布 JSON 訊息 |
|
||
| `BroadcastJSON(ctx, channels, data)` | 批量發布 JSON 訊息 |
|
||
| `Disconnect(ctx, userID)` | 斷開用戶連線 |
|
||
| `GenerateToken(userID)` | 快速生成連線 Token |
|
||
| `GenerateTokenWithInfo(userID, info)` | 生成帶資訊的連線 Token |
|
||
| `InvalidateUser(ctx, userID)` | 撤銷所有 Token 並斷開連線 |
|
||
| `IsUserOnline(ctx, userID)` | 檢查用戶是否在線 |
|
||
| `GetUsersOnlineStatus(ctx, userIDs)` | 批量獲取在線狀態 |
|
||
|
||
### Client 方法
|
||
|
||
| 方法 | 說明 | 返回值 |
|
||
|------|------|--------|
|
||
| `Publish(ctx, channel, data)` | 發布訊息 | `*PublishResult, error` |
|
||
| `PublishJSON(ctx, channel, data)` | 發布 JSON | `*PublishResult, error` |
|
||
| `Broadcast(ctx, channels, data)` | 批量發布 | `error` |
|
||
| `BroadcastJSON(ctx, channels, data)` | 批量發布 JSON | `error` |
|
||
| `Subscribe(ctx, user, channel)` | 訂閱用戶 | `error` |
|
||
| `Unsubscribe(ctx, user, channel)` | 取消訂閱 | `error` |
|
||
| `Disconnect(ctx, user)` | 斷開連線 | `error` |
|
||
| `DisconnectWithCode(ctx, user, code, reason)` | 帶代碼斷開連線 | `error` |
|
||
| `Presence(ctx, channel)` | 在線用戶 | `*PresenceResult, error` |
|
||
| `PresenceStats(ctx, channel)` | 在線統計 | `*PresenceStatsResult, error` |
|
||
| `History(ctx, channel, limit)` | 歷史訊息 | `*HistoryResult, error` |
|
||
| `HistoryReverse(ctx, channel, limit)` | 歷史訊息(倒序) | `*HistoryResult, error` |
|
||
| `Channels(ctx)` | 活躍頻道 | `*ChannelsResult, error` |
|
||
| `ChannelsWithPattern(ctx, pattern)` | 匹配頻道 | `*ChannelsResult, error` |
|
||
| `Info(ctx)` | 伺服器資訊 | `*InfoResult, error` |
|
||
| `Ping(ctx)` | 健康檢查 | `error` |
|
||
|
||
### TokenGenerator 方法
|
||
|
||
| 方法 | 說明 |
|
||
|------|------|
|
||
| `GenerateConnectionToken(opts)` | 生成連線 Token(完整選項) |
|
||
| `GenerateSubscriptionToken(opts)` | 生成訂閱 Token(完整選項) |
|
||
| `GenerateAnonymousToken()` | 生成匿名 Token |
|
||
| `QuickConnectionToken(userID)` | 快速生成連線 Token |
|
||
| `QuickSubscriptionToken(userID, channel)` | 快速生成訂閱 Token |
|
||
|
||
### TokenBlacklist 方法
|
||
|
||
| 方法 | 說明 |
|
||
|------|------|
|
||
| `RevokeToken(ctx, jti, ttl)` | 撤銷特定 Token |
|
||
| `RevokeUserTokens(ctx, userID)` | 撤銷用戶所有 Token |
|
||
| `IsTokenRevoked(ctx, jti)` | 檢查 Token 是否被撤銷 |
|
||
| `GetUserTokenVersion(ctx, userID)` | 獲取用戶 Token 版本 |
|
||
| `IsTokenVersionValid(ctx, userID, version)` | 檢查版本是否有效 |
|
||
|
||
---
|
||
|
||
## 錯誤處理
|
||
|
||
```go
|
||
result, err := svc.Client().Publish(ctx, channel, data)
|
||
if err != nil {
|
||
// 檢查是否為 Centrifugo API 錯誤
|
||
if apiErr, ok := err.(*centrifugo.APIError); ok {
|
||
fmt.Printf("Centrifugo error code: %d, message: %s\n",
|
||
apiErr.Code, apiErr.Message)
|
||
} else {
|
||
// 網路錯誤或其他錯誤
|
||
fmt.Printf("Error: %v\n", err)
|
||
}
|
||
}
|
||
|
||
// 檢查特定錯誤
|
||
if errors.Is(err, centrifugo.ErrBlacklistNotConfigured) {
|
||
// 黑名單未配置
|
||
}
|
||
if errors.Is(err, centrifugo.ErrOnlineStoreNotConfigured) {
|
||
// 在線狀態存儲未配置
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## 檔案結構
|
||
|
||
```
|
||
pkg/library/centrifugo/
|
||
├── centrifugo.go # 主入口,Service 整合介面
|
||
├── client.go # HTTP API 客戶端
|
||
├── token.go # JWT Token 生成器
|
||
├── blacklist.go # Token 黑名單管理
|
||
├── online.go # 在線狀態管理介面
|
||
├── online_redis.go # Redis 在線狀態實作
|
||
├── README.md # 文檔
|
||
└── *_test.go # 測試文件
|
||
```
|
||
|
||
---
|
||
|
||
## License
|
||
|
||
MIT
|