// Package mongo wraps go-zero's mon.Model with connection bootstrap and
// index-creation helpers.
package mongo

import (
	"context"
	"errors"
	"fmt"
	"net/url"
	"strings"
	"time"

	"github.com/zeromicro/go-zero/core/logx"
	"github.com/zeromicro/go-zero/core/stores/mon"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readpref"
)

// DocumentDB holds the mon.Model bound to a single collection.
type DocumentDB struct {
	Mon *mon.Model
}

// NewDocumentDB builds the connection URI from config, creates a mon.Model
// for the given collection and pings the cluster to verify the connection.
//
// The option produced by InitMongoOptions(*config) is appended after the
// caller-supplied opts. An error is returned when config is nil, when the
// model cannot be created, or when the ping fails.
func NewDocumentDB(config *Conf, collection string, opts ...mon.Option) (DocumentDBUseCase, error) {
	if config == nil {
		return nil, errors.New("mongo: nil config")
	}

	authenticationURI := ""
	if config.User != "" {
		authenticationURI = fmt.Sprintf(
			authenticationStringTemplate,
			config.User,
			config.Password,
		)
	}
	connectionURI := fmt.Sprintf(
		connectionStringTemplate,
		config.Schema,
		authenticationURI,
		config.Host,
	)

	// Never log the raw URI: it may carry the password. If the URI cannot be
	// parsed we log a credential-free fallback and let mon.NewModel report
	// the actual problem.
	printConnectURI := redactConnectionURI(
		connectionURI,
		fmt.Sprintf("%s://%v", config.Schema, config.Host),
	)

	// Initialize options.
	opts = append(opts, InitMongoOptions(*config))

	logx.Infof("[DocumentDB] Try to connect document db `%s`", printConnectURI)
	client, err := mon.NewModel(connectionURI, config.Database, collection, opts...)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(
		context.Background(),
		time.Duration(config.ConnectTimeoutMs)*time.Millisecond,
	)
	defer cancel()

	// Force a connection to verify our connection string.
	if err := client.Database().Client().Ping(ctx, readpref.SecondaryPreferred()); err != nil {
		return nil, fmt.Errorf("Failed to ping cluster: %w", err)
	}

	logx.Infof("[DocumentDB] Connected to DocumentDB!")

	return &DocumentDB{Mon: client}, nil
}

// redactConnectionURI returns rawURI with its password, if any, replaced by
// "*****" so the result can be logged. fallback is returned when rawURI cannot
// be parsed.
func redactConnectionURI(rawURI, fallback string) string {
	parsed, err := url.Parse(rawURI)
	if err != nil {
		return fallback
	}
	if _, hasPassword := parsed.User.Password(); !hasPassword {
		return parsed.String()
	}

	// Re-render with the user name only, then splice the mask in front of the
	// userinfo separator. A literal '@' inside the user name is percent-escaped
	// by String, so the first '@' is always that separator.
	userOnly := *parsed
	userOnly.User = url.User(parsed.User.Username())
	rendered := userOnly.String()
	at := strings.Index(rendered, "@")
	if at < 0 {
		return fallback
	}
	return rendered[:at] + ":*****" + rendered[at:]
}

// PopulateIndex creates a single-key index on key with the given sort value.
// Failures are logged, not returned.
func (d *DocumentDB) PopulateIndex(ctx context.Context, key string, sort int32, unique bool) {
	model := d.yieldIndexModel([]string{key}, []int32{sort}, unique, nil)
	d.createIndex(ctx, model, "Index")
}

// PopulateTTLIndex creates a single-key index on key whose options carry an
// expire-after value of ttl seconds. Failures are logged, not returned.
func (d *DocumentDB) PopulateTTLIndex(ctx context.Context, key string, sort int32, unique bool, ttl int32) {
	model := d.yieldIndexModel(
		[]string{key},
		[]int32{sort},
		unique,
		options.Index().SetExpireAfterSeconds(ttl),
	)
	d.createIndex(ctx, model, "TTL Index")
}

// PopulateMultiIndex creates one compound index from keys and their matching
// sorts. keys and sorts must have the same length; otherwise nothing is
// created and the mismatch is logged. Failures are logged, not returned.
func (d *DocumentDB) PopulateMultiIndex(ctx context.Context, keys []string, sorts []int32, unique bool) {
	if len(keys) != len(sorts) {
		logx.Errorf("[DocumentDb] Ensure Indexes Failed Please provide same item length of keys/sorts")
		return
	}

	model := d.yieldIndexModel(keys, sorts, unique, nil)
	d.createIndex(ctx, model, "Multi Index")
}

// GetClient returns the underlying mon.Model.
func (d *DocumentDB) GetClient() *mon.Model {
	return d.Mon
}

// createIndex issues a CreateOne for model with a 3-second max time and logs
// any failure, using label to identify the kind of index in the message.
func (d *DocumentDB) createIndex(ctx context.Context, model mongo.IndexModel, label string) {
	c := d.Mon.Collection
	opts := options.CreateIndexes().SetMaxTime(3 * time.Second)

	if _, err := c.Indexes().CreateOne(ctx, model, opts); err != nil {
		logx.Errorf("[DocumentDb] Ensure %s Failed, %s", label, err.Error())
	}
}

// yieldIndexModel assembles a mongo.IndexModel whose key document pairs
// keys[i] with sorts[i], in order. When indexOpt is nil a fresh options value
// is used; the unique flag is always set on the options (a non-nil indexOpt is
// modified in place). Callers must pass sorts at least as long as keys.
func (d *DocumentDB) yieldIndexModel(keys []string, sorts []int32, unique bool, indexOpt *options.IndexOptions) mongo.IndexModel {
	keysDoc := make(bson.D, 0, len(keys))
	for i, key := range keys {
		keysDoc = append(keysDoc, bson.E{Key: key, Value: sorts[i]})
	}

	if indexOpt == nil {
		indexOpt = options.Index()
	}
	indexOpt.SetUnique(unique)

	return mongo.IndexModel{
		Keys:    keysDoc,
		Options: indexOpt,
	}
}