156 lines
4.3 KiB
Go
156 lines
4.3 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"gateway/internal/library/mongo"
|
|
"gateway/internal/model/notification"
|
|
domentity "gateway/internal/model/notification/domain/entity"
|
|
"gateway/internal/model/notification/domain/enum"
|
|
domrepo "gateway/internal/model/notification/domain/repository"
|
|
|
|
"go.mongodb.org/mongo-driver/v2/bson"
|
|
mongodriver "go.mongodb.org/mongo-driver/v2/mongo"
|
|
"go.mongodb.org/mongo-driver/v2/mongo/options"
|
|
)
|
|
|
|
// NotificationRepositoryParam configures the Mongo notification repository.
|
|
type NotificationRepositoryParam struct {
|
|
Conf *mongo.Conf
|
|
}
|
|
|
|
type notificationRepository struct {
|
|
db mongo.DocumentDBUseCase
|
|
}
|
|
|
|
// NewNotificationRepository creates a Mongo-backed NotificationRepository.
|
|
func NewNotificationRepository(param NotificationRepositoryParam) domrepo.NotificationRepository {
|
|
e := domentity.Notification{}
|
|
documentDB, err := mongo.NewDocumentDB(param.Conf, e.CollectionName())
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return ¬ificationRepository{db: documentDB}
|
|
}
|
|
|
|
func (r *notificationRepository) Insert(ctx context.Context, data *domentity.Notification) error {
|
|
now := time.Now().UTC().UnixNano()
|
|
if data.ID.IsZero() {
|
|
data.ID = bson.NewObjectID()
|
|
}
|
|
if data.CreateAt == nil {
|
|
data.CreateAt = &now
|
|
}
|
|
if data.UpdateAt == nil {
|
|
data.UpdateAt = &now
|
|
}
|
|
if data.OccurredAt == nil {
|
|
data.OccurredAt = &now
|
|
}
|
|
|
|
_, err := r.db.GetClient().InsertOne(ctx, data)
|
|
if err != nil {
|
|
if mongodriver.IsDuplicateKeyError(err) {
|
|
return notification.ErrDuplicateIdempotency
|
|
}
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *notificationRepository) FindByID(ctx context.Context, tenantID, id string) (*domentity.Notification, error) {
|
|
oid, err := bson.ObjectIDFromHex(id)
|
|
if err != nil {
|
|
return nil, notification.ErrInvalidObjectID
|
|
}
|
|
|
|
var doc domentity.Notification
|
|
filter := bson.M{notification.BSONFieldID: oid, notification.BSONFieldTenantID: tenantID}
|
|
if err := r.db.GetClient().FindOne(ctx, &doc, filter); err != nil {
|
|
if errors.Is(err, mongodriver.ErrNoDocuments) {
|
|
return nil, notification.ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
return &doc, nil
|
|
}
|
|
|
|
func (r *notificationRepository) FindByIdempotency(
|
|
ctx context.Context,
|
|
tenantID string,
|
|
kind enum.NotifyKind,
|
|
idempotencyKey string,
|
|
) (*domentity.Notification, error) {
|
|
filter := bson.M{
|
|
notification.BSONFieldTenantID: tenantID,
|
|
notification.BSONFieldKind: kind,
|
|
notification.BSONFieldIdempotencyKey: idempotencyKey,
|
|
}
|
|
var docs []domentity.Notification
|
|
opts := options.Find().SetLimit(1)
|
|
if err := r.db.GetClient().Find(ctx, &docs, filter, opts); err != nil {
|
|
return nil, err
|
|
}
|
|
if len(docs) == 0 {
|
|
return nil, notification.ErrNotFound
|
|
}
|
|
return &docs[0], nil
|
|
}
|
|
|
|
func (r *notificationRepository) UpdateDelivery(ctx context.Context, tenantID, id string, update *domrepo.NotificationDeliveryUpdate) error {
|
|
oid, err := bson.ObjectIDFromHex(id)
|
|
if err != nil {
|
|
return notification.ErrInvalidObjectID
|
|
}
|
|
|
|
now := time.Now().UTC().UnixNano()
|
|
set := bson.M{
|
|
"status": update.Status,
|
|
"attempts": update.Attempts,
|
|
"update_at": now,
|
|
}
|
|
if update.Provider != "" {
|
|
set["provider"] = update.Provider
|
|
}
|
|
if update.ProviderMessageID != "" {
|
|
set["provider_message_id"] = update.ProviderMessageID
|
|
}
|
|
if update.LastError != "" {
|
|
set["last_error"] = update.LastError
|
|
}
|
|
if update.Body != "" {
|
|
set["body"] = update.Body
|
|
}
|
|
if update.DeliveredAt != nil {
|
|
set["delivered_at"] = *update.DeliveredAt
|
|
}
|
|
|
|
filter := bson.M{
|
|
notification.BSONFieldID: oid,
|
|
notification.BSONFieldTenantID: tenantID,
|
|
}
|
|
_, err = r.db.GetClient().UpdateOne(ctx, filter, bson.M{"$set": set})
|
|
return err
|
|
}
|
|
|
|
func (r *notificationRepository) Index20260520001UP(ctx context.Context) error {
|
|
if err := r.db.PopulateMultiIndex(ctx, []string{
|
|
notification.BSONFieldTenantID, notification.BSONFieldKind, notification.BSONFieldIdempotencyKey,
|
|
}, []int32{1, 1, 1}, true); err != nil {
|
|
return err
|
|
}
|
|
if err := r.db.PopulateMultiIndex(ctx, []string{
|
|
notification.BSONFieldTenantID, notification.BSONFieldUID, notification.BSONFieldOccurredAt,
|
|
}, []int32{1, 1, -1}, false); err != nil {
|
|
return err
|
|
}
|
|
if err := r.db.PopulateMultiIndex(ctx, []string{
|
|
notification.BSONFieldStatus, notification.BSONFieldAttempts, notification.BSONFieldOccurredAt,
|
|
}, []int32{1, 1, 1}, false); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|