317 lines
10 KiB
Go
317 lines
10 KiB
Go
|
package mongo
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"finclip-app-manager/infrastructure/config"
|
||
|
mgo "gitlab.finogeeks.club/finclip-backend-v2/finclip-mgo"
|
||
|
"gitlab.finogeeks.club/finclip-backend-v2/finclip-mgo/bson"
|
||
|
"gitlab.finogeeks.club/finclip-backend/apm"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// Table names one MongoDB collection inside one database. Its methods open a
// copy of the package-level Dbsession for every call and run the operation
// against DbName/CollName.
type Table struct {
	// DbName is the database name handed to Session.DB.
	DbName string
	// CollName is the collection name handed to Database.C.
	CollName string
}
|
||
|
|
||
|
func NewTable(colName string) *Table {
|
||
|
return &Table{
|
||
|
DbName: config.Cfg.DBName,
|
||
|
CollName: colName,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *Table) GetOne(ctx context.Context, filter bson.M, result interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetOne", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetOne")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
return col.Find(filter).One(result)
|
||
|
}
|
||
|
|
||
|
func (t *Table) GetSortOne(ctx context.Context, filter bson.M, sort []string, result interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetSortOne", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetSortOne")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
|
||
|
query := col.Find(filter)
|
||
|
return query.Sort(sort...).Limit(1).One(result)
|
||
|
}
|
||
|
|
||
|
func (t *Table) GetSome(ctx context.Context, filter bson.M, sort []string, pageSize, pageNo int, result interface{}) (int, error) {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetSome", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetSome")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
|
||
|
query := col.Find(filter)
|
||
|
total, err := query.Count()
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
return total, query.Sort(sort...).Skip(pageSize * (pageNo - 1)).Limit(pageSize).All(result)
|
||
|
}
|
||
|
|
||
|
func (t *Table) OnlyGetSome(ctx context.Context, filter bson.M, sort []string, pageSize, pageNo int, result interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetSome", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetSome")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
|
||
|
query := col.Find(filter)
|
||
|
return query.Sort(sort...).Skip(pageSize * (pageNo - 1)).Limit(pageSize).All(result)
|
||
|
|
||
|
}
|
||
|
|
||
|
func (t *Table) GetAll(ctx context.Context, filter bson.M, sort []string, result interface{}) (int, error) {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetAll", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetAll")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
query := col.Find(filter)
|
||
|
total, err := query.Count()
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
return total, query.Sort(sort...).All(result)
|
||
|
}
|
||
|
|
||
|
func (t *Table) GetAllV2(ctx context.Context, filter bson.M, sort []string, result interface{}) (int, error) {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetAll", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetAll")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
query := col.Find(filter)
|
||
|
total, err := query.Count()
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
if err := query.Sort(sort...).All(result); err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
return total, nil
|
||
|
}
|
||
|
|
||
|
func (t *Table) OnlyGetAll(ctx context.Context, filter bson.M, sort []string, result interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetAll", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetAll")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
query := col.Find(filter)
|
||
|
if err := query.Sort(sort...).All(result); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (t *Table) GetAllAndSomeField(ctx context.Context, filter bson.M, selector bson.M, sort []string, result interface{}) (int, error) {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.GetAllAndSomeField", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "GetAllAndSomeField")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
query := col.Find(filter)
|
||
|
total, err := query.Count()
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
return total, query.Sort(sort...).Select(selector).All(result)
|
||
|
}
|
||
|
|
||
|
func (t *Table) Aggregate(ctx context.Context, pipeSelector []bson.M, result interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Aggregate", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "Aggregate")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
return col.Pipe(pipeSelector).All(result)
|
||
|
}
|
||
|
|
||
|
func (t *Table) AggregateOnly(ctx context.Context, pipeSelector []bson.M, result interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.AggregateOnly", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
if err := col.Pipe(pipeSelector).All(result); err != nil {
|
||
|
log.Errorf("pip find error:%s", err.Error())
|
||
|
span.Error(time.Now(), "collection", t.DbName, "method", "Pipe.All", "error", err.Error())
|
||
|
return err
|
||
|
}
|
||
|
span.Log(time.Now(), "collection", t.DbName, "method", "Pipe.All")
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (t *Table) AggregateCount(ctx context.Context, countPip []bson.M) (int, error) {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.AggregateCount", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
|
||
|
type TotalInfo struct {
|
||
|
Id string `bson:"_id"`
|
||
|
Total int `bson:"total"`
|
||
|
}
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
|
||
|
result := &TotalInfo{}
|
||
|
err := col.Pipe(countPip).One(result)
|
||
|
if err != nil && err != mgo.ErrNotFound {
|
||
|
log.Errorf("pip find error:%s", err.Error())
|
||
|
span.Error(time.Now(), "collection", t.CollName, "method", "Pipe.One", "error", err.Error())
|
||
|
return 0, err
|
||
|
}
|
||
|
if err == mgo.ErrNotFound {
|
||
|
span.Error(time.Now(), "collection", t.CollName, "method", "Pipe.One", "error", err.Error())
|
||
|
return 0, nil
|
||
|
}
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "Pipe.One")
|
||
|
return result.Total, nil
|
||
|
}
|
||
|
|
||
|
func (t *Table) Insert(ctx context.Context, data interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Insert", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "Insert")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
return conn.DB(t.DbName).C(t.CollName).Insert(data)
|
||
|
}
|
||
|
|
||
|
func (t *Table) UpdateOne(ctx context.Context, selector bson.M, update interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.UpdateOne", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "UpdateOne")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
return col.Update(selector, update)
|
||
|
}
|
||
|
|
||
|
func (t *Table) UpdateAll(ctx context.Context, selector bson.M, update interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.UpdateAll", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "UpdateAll")
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
_, err := col.UpdateAll(selector, update)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (t *Table) Upsert(selector bson.M, update interface{}) error {
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
_, err := col.Upsert(selector, update)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (t *Table) UpdateData(filter bson.M, data bson.M) error {
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
|
||
|
return col.Update(filter, data)
|
||
|
}
|
||
|
|
||
|
func (t *Table) UpSert(filter bson.M, data interface{}) error {
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
|
||
|
_, err := col.Upsert(filter, data)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (t *Table) EnsureIndex(index mgo.Index) error {
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
return col.EnsureIndex(index)
|
||
|
}
|
||
|
|
||
|
func (t *Table) Count(ctx context.Context, filter bson.M) (int, error) {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Count", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "Count")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
total, err := col.Find(filter).Count()
|
||
|
if err != nil {
|
||
|
return 0, err
|
||
|
}
|
||
|
return total, nil
|
||
|
}
|
||
|
|
||
|
func (t *Table) Del(ctx context.Context, selector bson.M) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.Del", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "Del")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
err := col.Remove(selector)
|
||
|
if err != nil && err != mgo.ErrNotFound {
|
||
|
return err
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (t *Table) DelAll(ctx context.Context, selector bson.M) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.DelAll", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "DelAll")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
_, err := col.RemoveAll(selector)
|
||
|
if err != nil && err != mgo.ErrNotFound {
|
||
|
return err
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (t *Table) BulkInsert(ctx context.Context, list []interface{}) error {
|
||
|
span := apm.ApmClient().CreateMongoExitSpan(ctx, "Table.BulkInsert", config.Cfg.MongoURL, t.DbName)
|
||
|
defer span.End()
|
||
|
span.Log(time.Now(), "collection", t.CollName, "method", "BulkInsert")
|
||
|
|
||
|
conn := Dbsession.Copy()
|
||
|
defer conn.Close()
|
||
|
col := conn.DB(t.DbName).C(t.CollName)
|
||
|
b := col.Bulk()
|
||
|
b.Insert(list...)
|
||
|
_, err := b.Run()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
return err
|
||
|
}
|