package repository

import (
	"context"
	"encoding/json"
	"testing"

	"code.30cm.net/digimon/app-cloudep-permission-server/pkg/domain"
	"code.30cm.net/digimon/app-cloudep-permission-server/pkg/domain/rbac"
	"github.com/casbin/casbin/v2/model"
	"github.com/stretchr/testify/assert"
)

// rbacAdapterTestModel is the Casbin model text shared by the tests that need
// both a policy ("p") and a role ("g") definition. Casbin model text is
// line-oriented, so every section header and assignment sits on its own line.
const rbacAdapterTestModel = `
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[role_definition]
g = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
`

// Test_rbacAdapter_AddPolicy adds one "p" rule through the adapter and checks
// that exactly one JSON-encoded rbac.Rule with the same fields can be read
// back from the Casbin rule list in Redis.
func Test_rbacAdapter_AddPolicy(t *testing.T) {
	mr, rdb := setupMiniRedis()
	defer mr.Close()

	adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
	if !assert.NoError(t, err) {
		return // the adapter is unusable; calling it would only obscure the failure
	}

	// Add a policy rule.
	err = adapter.AddPolicy("", "p", []string{"user", "data2", "read"})
	assert.NoError(t, err)

	// Read back what the adapter wrote to Redis.
	rk := domain.GetCasbinRuleRedisKey()
	data, err := rdb.LrangeCtx(context.Background(), rk, 0, -1)
	assert.NoError(t, err)
	if !assert.Len(t, data, 1) {
		return // indexing data[0] below would panic instead of failing cleanly
	}

	// Decode the stored JSON and compare it field by field.
	var policy rbac.Rule
	err = json.Unmarshal([]byte(data[0]), &policy)
	assert.NoError(t, err)
	assert.Equal(t, "p", policy.PolicyType)
	assert.Equal(t, "user", policy.Field0)
	assert.Equal(t, "data2", policy.Field1)
	assert.Equal(t, "read", policy.Field2)
}

// TestRBACAdapter_RemovePolicy adds one rule, confirms it is present in
// Redis, removes it through the adapter and confirms the list is empty again.
func TestRBACAdapter_RemovePolicy(t *testing.T) {
	mr, rdb := setupMiniRedis()
	defer mr.Close()

	adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
	if !assert.NoError(t, err) {
		return
	}

	rule := []string{"user", "data2", "read"}

	// Add a policy rule.
	err = adapter.AddPolicy("", "p", rule)
	assert.NoError(t, err)

	// Make sure the rule exists before removing it.
	rk := domain.GetCasbinRuleRedisKey()
	data, err := rdb.LrangeCtx(context.Background(), rk, 0, -1)
	assert.NoError(t, err)
	assert.Len(t, data, 1)

	// Remove the rule.
	err = adapter.RemovePolicy("", "p", rule)
	assert.NoError(t, err)

	// Make sure Redis no longer holds the rule.
	data, err = rdb.LrangeCtx(context.Background(), rk, 0, -1)
	assert.NoError(t, err)
	assert.Len(t, data, 0)
}

// TestRBACAdapter_SavePolicy checks that SavePolicy writes every rule of a
// Casbin model to Redis: two "p" rules followed by one "g" rule.
func TestRBACAdapter_SavePolicy(t *testing.T) {
	mr, rdb := setupMiniRedis()
	defer mr.Close()

	adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
	if !assert.NoError(t, err) {
		return
	}

	// Prepare the Casbin model.
	m, err := model.NewModelFromString(rbacAdapterTestModel)
	if !assert.NoError(t, err) {
		return
	}

	// Seed the model with test rules. The grouping rule must be added under
	// section "g": the "p" section has no "g" assertion, so adding it there
	// never stores the rule in the model.
	m.AddPolicy("p", "p", []string{"admin", "data1", "read"})
	m.AddPolicy("p", "p", []string{"admin", "data1", "write"})
	m.AddPolicy("g", "g", []string{"alice", "admin"})

	// Redis must start out empty.
	rk := domain.GetCasbinRuleRedisKey()
	rulesBefore, err := rdb.LrangeCtx(context.Background(), rk, 0, -1)
	assert.NoError(t, err)
	assert.Len(t, rulesBefore, 0)

	// Run SavePolicy.
	err = adapter.SavePolicy(m)
	assert.NoError(t, err)

	// The rules in Redis must match the model.
	rulesAfter, err := rdb.LrangeCtx(context.Background(), rk, 0, -1)
	assert.NoError(t, err)

	expectedPolicies := [][]string{
		{"p", "admin", "data1", "read"},
		{"p", "admin", "data1", "write"},
		{"g", "alice", "admin"},
	}

	// Without this check the loop below passes vacuously when fewer rules were
	// stored and panics with an index out of range when more were stored.
	if !assert.Len(t, rulesAfter, len(expectedPolicies)) {
		return
	}

	// Decode each stored rule and compare it with the expected one.
	for i, ruleStr := range rulesAfter {
		want := expectedPolicies[i]

		var got rbac.Rule
		decodeErr := json.Unmarshal([]byte(ruleStr), &got)
		assert.NoError(t, decodeErr)

		assert.Equal(t, want[0], got.PolicyType)
		assert.Equal(t, want[1], got.Field0)
		assert.Equal(t, want[2], got.Field1)
		if len(want) > 3 {
			assert.Equal(t, want[3], got.Field2)
		}
	}
}

// TestRBACAdapter_SavePolicy_RedisError expects SavePolicy to return an error.
//
// NOTE(review): nothing in this test visibly makes Redis fail; the server is
// still running when SavePolicy is called. The model text contains only a
// policy_definition section and the error from NewModelFromString is
// discarded, so the expected error presumably comes from the incomplete model
// rather than from Redis. Confirm against the adapter, and consider stopping
// the Redis server before SavePolicy if a write failure is what should be
// exercised.
func TestRBACAdapter_SavePolicy_RedisError(t *testing.T) {
	mr, rdb := setupMiniRedis()
	defer mr.Close()

	adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
	if !assert.NoError(t, err) {
		return
	}

	// The load error is deliberately ignored to keep the existing behaviour of
	// this test; see the review note above.
	m, _ := model.NewModelFromString(`
[policy_definition]
p = sub, obj, act
`)
	m.AddPolicy("p", "p", []string{"admin", "data1", "read"})

	err = adapter.SavePolicy(m)
	assert.Error(t, err) // an error is expected here
}

// TestRBACAdapter_LoadPolicy seeds Redis with two "p" and two "g" rules and
// checks that LoadPolicy succeeds and that the model then holds them.
func TestRBACAdapter_LoadPolicy(t *testing.T) {
	mr, rdb := setupMiniRedis()
	defer mr.Close()

	adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
	if !assert.NoError(t, err) {
		return
	}

	// Seed Redis with test rules.
	rk := domain.GetCasbinRuleRedisKey()
	testRules := []rbac.Rule{
		{PolicyType: "p", Field0: "admin", Field1: "data1", Field2: "read"},
		{PolicyType: "p", Field0: "admin", Field1: "data1", Field2: "write"},
		{PolicyType: "g", Field0: "alice", Field1: "admin"},
		{PolicyType: "g", Field0: "bob", Field1: "user"},
	}
	for _, rule := range testRules {
		encoded, marshalErr := json.Marshal(rule)
		assert.NoError(t, marshalErr)

		_, pushErr := rdb.Rpush(rk, string(encoded))
		assert.NoError(t, pushErr)
	}

	// Create the Casbin model.
	m, err := model.NewModelFromString(rbacAdapterTestModel)
	if !assert.NoError(t, err) {
		return
	}

	// Exercise LoadPolicy.
	err = adapter.LoadPolicy(m)
	assert.NoError(t, err)

	// A nil error alone does not show that anything was loaded, so also check
	// that both rule kinds reached the model.
	assert.Len(t, m["p"]["p"].Policy, 2)
	assert.Len(t, m["g"]["g"].Policy, 2)
}