finclip-app-manager/infrastructure/db/repo/mongo/table.go

317 lines
10 KiB
Go
Raw Permalink Normal View History

2023-10-31 14:07:26 +08:00
package mongo
import (
"context"
"finclip-app-manager/infrastructure/config"
mgo "gitlab.finogeeks.club/finclip-backend-v2/finclip-mgo"
"gitlab.finogeeks.club/finclip-backend-v2/finclip-mgo/bson"
"gitlab.finogeeks.club/finclip-backend/apm"
"time"
)
// Table identifies a single MongoDB collection by database name and
// collection name. Its methods copy the package-level Dbsession for every
// call and address the collection through these two fields.
type Table struct {
	// DbName is the database name handed to the session's DB call.
	DbName string
	// CollName is the collection name handed to the database's C call.
	CollName string
}
// NewTable returns a Table for the collection colName in the database named
// by config.Cfg.DBName. The database name is read once, at construction time.
func NewTable(colName string) *Table {
	tbl := new(Table)
	tbl.DbName = config.Cfg.DBName
	tbl.CollName = colName
	return tbl
}
// GetOne runs Find(filter).One(result) on the table's collection and returns
// the driver's error unchanged.
//
// result is passed straight through to the driver; presumably it must be a
// pointer the driver can decode into — confirm against finclip-mgo.
// The call is recorded in an APM Mongo exit span named "Table.GetOne".
func (t *Table) GetOne(ctx context.Context, filter bson.M, result interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetOne", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "GetOne")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	query := sess.DB(t.DbName).C(t.CollName).Find(filter)
	return query.One(result)
}
// GetSortOne applies the sort fields to the documents matching filter, limits
// the query to one document and decodes it into result.
//
// The driver's error is returned unchanged. The call is recorded in an APM
// Mongo exit span named "Table.GetSortOne".
func (t *Table) GetSortOne(ctx context.Context, filter bson.M, sort []string, result interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetSortOne", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "GetSortOne")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	return coll.Find(filter).Sort(sort...).Limit(1).One(result)
}
// GetSome returns one page of the documents matching filter together with the
// total number of matches.
//
// The total is counted first; if counting fails, (0, err) is returned. The
// page is then fetched with the given sort, skipping pageSize*(pageNo-1)
// documents and limiting to pageSize, so pageNo is 1-based. If fetching the
// page fails, the counted total is still returned next to the error.
//
// NOTE(review): a pageNo below 1 yields a negative skip value, which is passed
// to the driver as-is.
func (t *Table) GetSome(ctx context.Context, filter bson.M, sort []string, pageSize, pageNo int, result interface{}) (int, error) {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetSome", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "GetSome")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	offset := pageSize * (pageNo - 1)
	query := sess.DB(t.DbName).C(t.CollName).Find(filter)

	total, err := query.Count()
	if err != nil {
		return 0, err
	}

	err = query.Sort(sort...).Skip(offset).Limit(pageSize).All(result)
	return total, err
}
// OnlyGetSome fetches one page of the documents matching filter without
// counting the total number of matches.
//
// The page is fetched with the given sort, skipping pageSize*(pageNo-1)
// documents and limiting to pageSize, so pageNo is 1-based. The driver's error
// is returned unchanged.
//
// The APM span and log entry carry this method's own name so that its traces
// can be told apart from those of GetSome.
//
// NOTE(review): a pageNo below 1 yields a negative skip value, which is passed
// to the driver as-is.
func (t *Table) OnlyGetSome(ctx context.Context, filter bson.M, sort []string, pageSize, pageNo int, result interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.OnlyGetSome", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "OnlyGetSome")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	offset := pageSize * (pageNo - 1)
	query := sess.DB(t.DbName).C(t.CollName).Find(filter)
	return query.Sort(sort...).Skip(offset).Limit(pageSize).All(result)
}
// GetAll decodes every document matching filter, in the given sort order,
// into result and also returns the number of matches.
//
// The total is counted first; if counting fails, (0, err) is returned. If the
// subsequent fetch fails, the counted total is still returned next to the
// error.
func (t *Table) GetAll(ctx context.Context, filter bson.M, sort []string, result interface{}) (int, error) {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetAll", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "GetAll")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	query := sess.DB(t.DbName).C(t.CollName).Find(filter)

	total, err := query.Count()
	if err != nil {
		return 0, err
	}

	err = query.Sort(sort...).All(result)
	return total, err
}
// GetAllV2 decodes every document matching filter, in the given sort order,
// into result and also returns the number of matches.
//
// Unlike GetAll, a failure in either the count or the fetch yields (0, err);
// the total is only returned when both steps succeed.
//
// The APM span and log entry carry this method's own name so that its traces
// can be told apart from those of GetAll.
func (t *Table) GetAllV2(ctx context.Context, filter bson.M, sort []string, result interface{}) (int, error) {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetAllV2", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "GetAllV2")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	query := sess.DB(t.DbName).C(t.CollName).Find(filter)

	total, err := query.Count()
	if err != nil {
		return 0, err
	}
	if err := query.Sort(sort...).All(result); err != nil {
		return 0, err
	}
	return total, nil
}
// OnlyGetAll decodes every document matching filter, in the given sort order,
// into result without counting the matches.
//
// The driver's error is returned unchanged.
//
// The APM span and log entry carry this method's own name so that its traces
// can be told apart from those of GetAll.
func (t *Table) OnlyGetAll(ctx context.Context, filter bson.M, sort []string, result interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.OnlyGetAll", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "OnlyGetAll")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	return coll.Find(filter).Sort(sort...).All(result)
}
// GetAllAndSomeField decodes every document matching filter into result,
// applying the sort fields and then the selector to the query, and also
// returns the number of matches.
//
// The total is counted first; if counting fails, (0, err) is returned. If the
// subsequent fetch fails, the counted total is still returned next to the
// error.
func (t *Table) GetAllAndSomeField(ctx context.Context, filter bson.M, selector bson.M, sort []string, result interface{}) (int, error) {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetAllAndSomeField", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "GetAllAndSomeField")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	query := sess.DB(t.DbName).C(t.CollName).Find(filter)

	total, err := query.Count()
	if err != nil {
		return 0, err
	}

	err = query.Sort(sort...).Select(selector).All(result)
	return total, err
}
// Aggregate runs the pipeline pipeSelector on the table's collection and
// decodes all resulting documents into result.
//
// The driver's error is returned unchanged. The call is recorded in an APM
// Mongo exit span named "Table.Aggregate".
func (t *Table) Aggregate(ctx context.Context, pipeSelector []bson.M, result interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Aggregate", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "Aggregate")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	pipe := sess.DB(t.DbName).C(t.CollName).Pipe(pipeSelector)
	return pipe.All(result)
}
// AggregateOnly runs the pipeline pipeSelector on the table's collection and
// decodes all resulting documents into result.
//
// On failure the error is logged, recorded on the APM span and returned. On
// success a span log entry is written and nil is returned.
//
// The span's "collection" tag is filled with the collection name, matching the
// other methods of Table; the database name is already passed to the span on
// creation.
func (t *Table) AggregateOnly(ctx context.Context, pipeSelector []bson.M, result interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.AggregateOnly", config.Cfg.MongoURL, t.DbName)
	defer span.End()

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	if err := coll.Pipe(pipeSelector).All(result); err != nil {
		log.Errorf("pip find error:%s", err.Error())
		span.Error(time.Now(), "collection", t.CollName, "method", "Pipe.All", "error", err.Error())
		return err
	}

	span.Log(time.Now(), "collection", t.CollName, "method", "Pipe.All")
	return nil
}
// AggregateCount runs the pipeline countPip and returns the "total" field of
// the single document it decodes.
//
// When the driver reports mgo.ErrNotFound, the event is recorded on the span
// as an error but the method returns (0, nil). Any other error is logged,
// recorded on the span and returned as (0, err).
func (t *Table) AggregateCount(ctx context.Context, countPip []bson.M) (int, error) {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.AggregateCount", config.Cfg.MongoURL, t.DbName)
	defer span.End()

	// countDoc is the shape the pipeline's output document is decoded into.
	type countDoc struct {
		Id    string `bson:"_id"`
		Total int    `bson:"total"`
	}

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	doc := new(countDoc)
	err := coll.Pipe(countPip).One(doc)

	switch {
	case err == mgo.ErrNotFound:
		// Not-found is recorded on the span but is not a failure for callers.
		span.Error(time.Now(), "collection", t.CollName, "method", "Pipe.One", "error", err.Error())
		return 0, nil
	case err != nil:
		log.Errorf("pip find error:%s", err.Error())
		span.Error(time.Now(), "collection", t.CollName, "method", "Pipe.One", "error", err.Error())
		return 0, err
	}

	span.Log(time.Now(), "collection", t.CollName, "method", "Pipe.One")
	return doc.Total, nil
}
// Insert passes data to the collection's Insert call and returns the driver's
// error unchanged.
//
// The call is recorded in an APM Mongo exit span named "Table.Insert".
func (t *Table) Insert(ctx context.Context, data interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Insert", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "Insert")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	return coll.Insert(data)
}
// UpdateOne passes selector and update to the collection's Update call and
// returns the driver's error unchanged.
//
// The call is recorded in an APM Mongo exit span named "Table.UpdateOne".
func (t *Table) UpdateOne(ctx context.Context, selector bson.M, update interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.UpdateOne", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "UpdateOne")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	return sess.DB(t.DbName).C(t.CollName).Update(selector, update)
}
// UpdateAll passes selector and update to the collection's UpdateAll call.
// The change information returned by the driver is discarded; only the error
// is reported.
//
// The call is recorded in an APM Mongo exit span named "Table.UpdateAll".
func (t *Table) UpdateAll(ctx context.Context, selector bson.M, update interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.UpdateAll", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "UpdateAll")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	if _, err := coll.UpdateAll(selector, update); err != nil {
		return err
	}
	return nil
}
// Upsert passes selector and update to the collection's Upsert call. The
// change information returned by the driver is discarded; only the error is
// reported.
//
// This method takes no context and creates no APM span.
func (t *Table) Upsert(selector bson.M, update interface{}) error {
	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	if _, err := coll.Upsert(selector, update); err != nil {
		return err
	}
	return nil
}
// UpdateData passes filter and data to the collection's Update call and
// returns the driver's error unchanged.
//
// This method takes no context and creates no APM span.
func (t *Table) UpdateData(filter bson.M, data bson.M) error {
	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	return sess.DB(t.DbName).C(t.CollName).Update(filter, data)
}
// UpSert passes filter and data to the collection's Upsert call. The change
// information returned by the driver is discarded; only the error is reported.
//
// It performs the same driver call as Upsert under a differently-cased name.
// This method takes no context and creates no APM span.
func (t *Table) UpSert(filter bson.M, data interface{}) error {
	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	coll := sess.DB(t.DbName).C(t.CollName)
	if _, err := coll.Upsert(filter, data); err != nil {
		return err
	}
	return nil
}
// EnsureIndex passes index to the collection's EnsureIndex call and returns
// the driver's error unchanged.
//
// This method takes no context and creates no APM span.
func (t *Table) EnsureIndex(index mgo.Index) error {
	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	return sess.DB(t.DbName).C(t.CollName).EnsureIndex(index)
}
// Count returns the number of documents matching filter as reported by the
// query's Count call. On error it returns (0, err).
//
// The call is recorded in an APM Mongo exit span named "Table.Count".
func (t *Table) Count(ctx context.Context, filter bson.M) (int, error) {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Count", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "Count")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	query := sess.DB(t.DbName).C(t.CollName).Find(filter)
	n, err := query.Count()
	if err != nil {
		// Normalize the count to zero whenever the driver reports an error.
		return 0, err
	}
	return n, nil
}
// Del passes selector to the collection's Remove call and returns the
// driver's error unchanged, including mgo.ErrNotFound if the driver reports
// it.
//
// NOTE(review): this method used to compare the error against
// mgo.ErrNotFound but returned the error on both paths, so the comparison had
// no effect and has been dropped. If a missing document is meant to count as
// success, that must be introduced deliberately, since callers currently
// receive the not-found error.
//
// The call is recorded in an APM Mongo exit span named "Table.Del".
func (t *Table) Del(ctx context.Context, selector bson.M) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Del", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "Del")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	return sess.DB(t.DbName).C(t.CollName).Remove(selector)
}
// DelAll passes selector to the collection's RemoveAll call. The change
// information returned by the driver is discarded and the driver's error is
// returned unchanged.
//
// NOTE(review): this method used to compare the error against
// mgo.ErrNotFound but returned the error on both paths, so the comparison had
// no effect and has been dropped.
//
// The call is recorded in an APM Mongo exit span named "Table.DelAll".
func (t *Table) DelAll(ctx context.Context, selector bson.M) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.DelAll", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "DelAll")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	_, err := sess.DB(t.DbName).C(t.CollName).RemoveAll(selector)
	return err
}
// BulkInsert queues every element of list on a bulk operation for the table's
// collection and runs it. The bulk result is discarded and the error from Run
// is returned unchanged.
//
// The call is recorded in an APM Mongo exit span named "Table.BulkInsert".
func (t *Table) BulkInsert(ctx context.Context, list []interface{}) error {
	span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.BulkInsert", config.Cfg.MongoURL, t.DbName)
	defer span.End()
	span.Log(time.Now(), "collection", t.CollName, "method", "BulkInsert")

	// Work on a private copy of the shared session and release it on return.
	sess := Dbsession.Copy()
	defer sess.Close()

	bulk := sess.DB(t.DbName).C(t.CollName).Bulk()
	bulk.Insert(list...)

	_, err := bulk.Run()
	return err
}