123 lines
3.1 KiB
Go
123 lines
3.1 KiB
Go
package grpc
|
||
|
||
import (
|
||
"errors"
|
||
pb "finclip-app-manager/infrastructure/protobuf/golang"
|
||
"finclip-app-manager/infrastructure/utility"
|
||
"fmt"
|
||
"sync"
|
||
"sync/atomic"
|
||
"unsafe"
|
||
|
||
"google.golang.org/grpc"
|
||
"google.golang.org/grpc/connectivity"
|
||
|
||
_ "github.com/mbobakov/grpc-consul-resolver"
|
||
)
|
||
|
||
var (
|
||
grpcConnectManager GrpcConnectManager
|
||
initOnce sync.Once
|
||
)
|
||
|
||
func GetGrpcConnManager() *GrpcConnectManager {
|
||
initOnce.Do(func() {
|
||
grpcConnectManager.GrpcConnectItemMap = make(map[string]GrpcConnectItem)
|
||
})
|
||
|
||
return &grpcConnectManager
|
||
}
|
||
|
||
type GrpcConnectItem struct { //单个连接建立,例如:和账号系统多实例连接上,一个item可以理解为对应一个mop服务多实例的连接
|
||
ClientConn unsafe.Pointer
|
||
ServerName string
|
||
}
|
||
|
||
type GrpcConnectManager struct {
|
||
CreateLock sync.Mutex //防止多协程同时建立连接
|
||
GrpcConnectItemMap map[string]GrpcConnectItem //一开始是空
|
||
}
|
||
|
||
//param是指url的参数,例如:wait=10s&tag=mop,代表这个连接超时10秒建立,consul的服务tag是mop
|
||
func (g *GrpcConnectManager) GetConn(consulUrl, server, param string) (*grpc.ClientConn, error) {
|
||
if connItem, ok := g.GrpcConnectItemMap[server]; ok { //first check
|
||
if atomic.LoadPointer(&connItem.ClientConn) != nil {
|
||
return (*grpc.ClientConn)(connItem.ClientConn), nil
|
||
}
|
||
}
|
||
|
||
g.CreateLock.Lock()
|
||
defer g.CreateLock.Unlock()
|
||
|
||
fmt.Println("GetConn lock")
|
||
connItem, ok := g.GrpcConnectItemMap[server]
|
||
if ok { //double check
|
||
if atomic.LoadPointer(&connItem.ClientConn) != nil {
|
||
cc := (*grpc.ClientConn)(connItem.ClientConn)
|
||
if g.checkState(cc) == nil {
|
||
return cc, nil
|
||
} else { //旧的连接存在服务端断开的情况,需要先关闭
|
||
cc.Close()
|
||
}
|
||
}
|
||
}
|
||
|
||
target := "consul://" + consulUrl + "/" + server
|
||
if param != "" {
|
||
target += "?" + param
|
||
}
|
||
|
||
fmt.Println("new conn, url=" + target)
|
||
cli, err := g.newGrpcConn(target)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var newItem GrpcConnectItem
|
||
newItem.ServerName = server
|
||
|
||
atomic.StorePointer(&newItem.ClientConn, unsafe.Pointer(cli))
|
||
g.GrpcConnectItemMap[server] = newItem
|
||
|
||
return cli, nil
|
||
}
|
||
|
||
func (g *GrpcConnectManager) checkState(conn *grpc.ClientConn) error {
|
||
state := conn.GetState()
|
||
switch state {
|
||
case connectivity.TransientFailure, connectivity.Shutdown:
|
||
return errors.New("ErrConn")
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (g *GrpcConnectManager) newGrpcConn(target string) (*grpc.ClientConn, error) {
|
||
conn, err := grpc.Dial(
|
||
target,
|
||
//grpc.WithBlock(),
|
||
grpc.WithInsecure(),
|
||
grpc.WithBalancerName("round_robin"),
|
||
//grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
|
||
)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return conn, nil
|
||
}
|
||
|
||
func MakeGrpcCommonResult(httpStatus int, errCode string, result **pb.CommonResult) {
|
||
*result = &pb.CommonResult{}
|
||
(*result).Error = utility.GetErrMsg(errCode)
|
||
(*result).Errcode = errCode
|
||
(*result).Httpcode = int32(httpStatus)
|
||
}
|
||
|
||
func MakeGrpcCommonResultNoLoc(httpStatus int, errCode, errmsg string, result **pb.CommonResult) {
|
||
*result = &pb.CommonResult{}
|
||
(*result).Error = errmsg
|
||
(*result).Errcode = errCode
|
||
(*result).Httpcode = int32(httpStatus)
|
||
}
|