library-go/store/valkey/valkey.go

90 lines
1.7 KiB
Go
Raw Normal View History

2024-11-24 06:45:55 +00:00
package valkey
import (
"context"
"fmt"
"github.com/valkey-io/valkey-go/valkeyhook"
"github.com/zeromicro/go-zero/core/breaker"
"github.com/zeromicro/go-zero/core/errorx"
"time"
)
// VK defines a redis node/cluster. It is thread-safe.
type VK struct {
Addr string
Type string
Pass string
tls bool
brk breaker.Breaker // 斷路器
hooks []valkeyhook.Hook
}
// NewValKey returns a Redis with given options.
func NewValKey(conf VKConf, opts ...Option) (*VK, error) {
if err := conf.Validate(); err != nil {
return nil, err
}
if conf.Type == ClusterType {
opts = append([]Option{Cluster()}, opts...)
}
if len(conf.Pass) > 0 {
opts = append([]Option{WithPass(conf.Pass)}, opts...)
}
if conf.Tls {
opts = append([]Option{WithTLS()}, opts...)
}
rds := newValKey(conf.Host, opts...)
if !conf.NonBlock {
if err := rds.checkConnection(conf.PingTimeout); err != nil {
return nil, errorx.Wrap(err, fmt.Sprintf("valkey connect error, addr: %s", conf.Host))
}
}
return rds, nil
}
func newValKey(addr string, opts ...Option) *VK {
r := &VK{
Addr: addr,
Type: NodeType,
brk: breaker.NewBreaker(),
}
for _, opt := range opts {
opt(r)
}
return r
}
func (s *VK) checkConnection(pingTimeout time.Duration) error {
conn, err := getValKey(s)
if err != nil {
return err
}
timeout := defaultPingTimeout
if pingTimeout > 0 {
timeout = pingTimeout
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
cmd := conn.B().Ping().Build()
return conn.Do(ctx, cmd).Error()
}
func getValKey(r *VK) (VKNode, error) {
switch r.Type {
// case ClusterType:
// return getCluster(r)
case NodeType:
return getClient(r)
default:
return nil, fmt.Errorf("valkey type '%s' is not supported", r.Type)
}
}