app-cloudep-permission-server/pkg/repository/casbin_redis_adapter_test.go

197 lines
5.0 KiB
Go
Raw Normal View History

package repository
import (
"context"
"encoding/json"
"testing"
"code.30cm.net/digimon/app-cloudep-permission-server/pkg/domain"
"code.30cm.net/digimon/app-cloudep-permission-server/pkg/domain/rbac"
"github.com/casbin/casbin/v2/model"
"github.com/stretchr/testify/assert"
)
func Test_rbacAdapter_AddPolicy(t *testing.T) {
mr, r := setupMiniRedis()
defer mr.Close()
adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: r})
assert.NoError(t, err)
// 新增策略
err = adapter.AddPolicy("", "p", []string{"user", "data2", "read"})
assert.NoError(t, err)
// 驗證 Redis 內的數據
rk := domain.GetCasbinRuleRedisKey()
data, err := r.LrangeCtx(context.Background(), rk, 0, -1)
assert.NoError(t, err)
assert.Len(t, data, 1)
// 解析 JSON 確認內容
var policy rbac.Rule
err = json.Unmarshal([]byte(data[0]), &policy)
assert.NoError(t, err)
assert.Equal(t, "p", policy.PolicyType)
assert.Equal(t, "user", policy.Field0)
assert.Equal(t, "data2", policy.Field1)
assert.Equal(t, "read", policy.Field2)
}
// 測試 `RemovePolicy`
func TestRBACAdapter_RemovePolicy(t *testing.T) {
mr, rdb := setupMiniRedis()
defer mr.Close()
adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
assert.NoError(t, err)
// 新增策略
err = adapter.AddPolicy("", "p", []string{"user", "data2", "read"})
assert.NoError(t, err)
// 確保策略存在
rk := domain.GetCasbinRuleRedisKey()
data, err := rdb.LrangeCtx(context.Background(), rk, 0, -1)
assert.NoError(t, err)
assert.Len(t, data, 1)
// 刪除策略
err = adapter.RemovePolicy("", "p", []string{"user", "data2", "read"})
assert.NoError(t, err)
// 確保 Redis 已經刪除該策略
data, err = rdb.LrangeCtx(context.Background(), rk, 0, -1)
assert.NoError(t, err)
assert.Len(t, data, 0)
}
// 測試 SavePolicy: 確保能夠存入 Redis
func TestRBACAdapter_SavePolicy(t *testing.T) {
mr, rdb := setupMiniRedis()
defer mr.Close()
adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
assert.NoError(t, err)
// 準備 Casbin Model
m, _ := model.NewModelFromString(`
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
`)
// 加入測試策略
m.AddPolicy("p", "p", []string{"admin", "data1", "read"})
m.AddPolicy("p", "p", []string{"admin", "data1", "write"})
m.AddPolicy("p", "g", []string{"alice", "admin"})
// 確保 Redis 內最開始是空的
rk := domain.GetCasbinRuleRedisKey()
rulesBefore, _ := rdb.LrangeCtx(context.Background(), rk, 0, -1)
assert.Len(t, rulesBefore, 0)
// 執行 SavePolicy
err = adapter.SavePolicy(m)
assert.NoError(t, err)
// 確保 Redis 內的數據與 `policy` 一致
rulesAfter, err := rdb.LrangeCtx(context.Background(), rk, 0, -1)
assert.NoError(t, err)
// 解析 JSON 並確認內容
expectedPolicies := [][]string{
{"p", "admin", "data1", "read"},
{"p", "admin", "data1", "write"},
{"g", "alice", "admin"},
}
for i, ruleStr := range rulesAfter {
var rule rbac.Rule
err := json.Unmarshal([]byte(ruleStr), &rule)
assert.NoError(t, err)
assert.Equal(t, expectedPolicies[i][0], rule.PolicyType)
assert.Equal(t, expectedPolicies[i][1], rule.Field0)
assert.Equal(t, expectedPolicies[i][2], rule.Field1)
if len(expectedPolicies[i]) > 3 {
assert.Equal(t, expectedPolicies[i][3], rule.Field2)
}
}
}
// 測試 SavePolicy: Redis 寫入失敗
func TestRBACAdapter_SavePolicy_RedisError(t *testing.T) {
mr, rdb := setupMiniRedis()
defer mr.Close()
adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
assert.NoError(t, err)
m, _ := model.NewModelFromString(`
[policy_definition]
p = sub, obj, act
`)
m.AddPolicy("p", "p", []string{"admin", "data1", "read"})
err = adapter.SavePolicy(m)
assert.Error(t, err) // 應該要回傳錯誤
}
// 測試 `LoadPolicy` 是否能從 Redis 讀取 `p` 和 `g` 規則
func TestRBACAdapter_LoadPolicy(t *testing.T) {
mr, rdb := setupMiniRedis()
defer mr.Close()
adapter, err := NewRBACAdapter(RBACAdapterParam{Redis: rdb})
assert.NoError(t, err)
// 預先寫入 Redis 測試數據
rk := domain.GetCasbinRuleRedisKey()
testRules := []rbac.Rule{
{PolicyType: "p", Field0: "admin", Field1: "data1", Field2: "read"},
{PolicyType: "p", Field0: "admin", Field1: "data1", Field2: "write"},
{PolicyType: "g", Field0: "alice", Field1: "admin"},
{PolicyType: "g", Field0: "bob", Field1: "user"},
}
for _, rule := range testRules {
data, _ := json.Marshal(rule)
_, err := rdb.Rpush(rk, string(data))
assert.NoError(t, err)
}
// 創建 Casbin Model
m, _ := model.NewModelFromString(`
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
`)
// 測試 `LoadPolicy`
err = adapter.LoadPolicy(m)
assert.NoError(t, err)
}