finclip-app-manager/infrastructure/client/grpc/grpc.go

123 lines
3.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package grpc
import (
"errors"
pb "finclip-app-manager/infrastructure/protobuf/golang"
"finclip-app-manager/infrastructure/utility"
"fmt"
"sync"
"sync/atomic"
"unsafe"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
_ "github.com/mbobakov/grpc-consul-resolver"
)
var (
// grpcConnectManager is the package-level manager instance handed out by
// GetGrpcConnManager.
grpcConnectManager GrpcConnectManager
// initOnce ensures the manager's connection map is created exactly once.
initOnce sync.Once
)
// GetGrpcConnManager returns the package-level connection manager.
//
// The first call allocates the manager's connection map; initOnce makes that
// allocation happen exactly once even when several goroutines call in
// concurrently. Every call returns a pointer to the same instance.
func GetGrpcConnManager() *GrpcConnectManager {
	mgr := &grpcConnectManager
	initOnce.Do(func() {
		mgr.GrpcConnectItemMap = make(map[string]GrpcConnectItem)
	})
	return mgr
}
// GrpcConnectItem is the cached connection entry for one named service.
type GrpcConnectItem struct { // a single established connection, e.g. to the multi-instance account system; one item can be understood as the connection to the multiple instances of one mop service
// ClientConn holds the *grpc.ClientConn for this service, stored as an
// unsafe.Pointer (GetConn writes it with atomic.StorePointer).
ClientConn unsafe.Pointer
// ServerName is the service name this entry was created for; GetConn also
// uses it as the map key.
ServerName string
}
// GrpcConnectManager caches one gRPC connection per service name.
type GrpcConnectManager struct {
CreateLock sync.Mutex //prevents multiple goroutines from establishing a connection at the same time
GrpcConnectItemMap map[string]GrpcConnectItem //empty at first; GetConn adds an entry keyed by service name
}
// GetConn returns the cached gRPC connection for server, dialing a new one
// through the consul resolver when nothing is cached or the cached connection
// is no longer usable.
//
// consulUrl and server are joined into the dial target
// "consul://<consulUrl>/<server>". param is the optional URL query string
// appended after "?"; for example "wait=10s&tag=mop" means the connection uses
// a 10-second timeout and the consul service tag "mop".
//
// The whole lookup runs under CreateLock. GrpcConnectItemMap is a plain Go
// map, so a read that overlaps the insert below is a data race and can abort
// the process with "concurrent map read and map write". Holding the lock also
// means every call validates the cached connection with checkState: one that
// is in TransientFailure or Shutdown is closed and replaced instead of being
// handed out again.
//
// It returns the dial error unchanged when a new connection cannot be created.
func (g *GrpcConnectManager) GetConn(consulUrl, server, param string) (*grpc.ClientConn, error) {
	g.CreateLock.Lock()
	defer g.CreateLock.Unlock()

	// A manager that was not obtained through GetGrpcConnManager has a nil
	// map; storing into it would panic, so create it on demand.
	if g.GrpcConnectItemMap == nil {
		g.GrpcConnectItemMap = make(map[string]GrpcConnectItem)
	}

	if connItem, ok := g.GrpcConnectItemMap[server]; ok {
		if p := atomic.LoadPointer(&connItem.ClientConn); p != nil {
			cc := (*grpc.ClientConn)(p)
			if g.checkState(cc) == nil {
				return cc, nil
			}
			// The old connection may have been dropped by the server side;
			// close it before replacing it. Closing is best effort, so a
			// failure is only reported, not returned.
			if err := cc.Close(); err != nil {
				fmt.Println("close stale conn, server=" + server + ", err=" + err.Error())
			}
		}
		// Drop the unusable entry so a failed dial below cannot leave a closed
		// connection behind in the cache.
		delete(g.GrpcConnectItemMap, server)
	}

	// Debug trace kept from the original slow path; emitted only when a dial
	// is actually needed.
	fmt.Println("GetConn lock")

	target := "consul://" + consulUrl + "/" + server
	if param != "" {
		target += "?" + param
	}
	fmt.Println("new conn, url=" + target)

	cli, err := g.newGrpcConn(target)
	if err != nil {
		return nil, err
	}

	var newItem GrpcConnectItem
	newItem.ServerName = server
	atomic.StorePointer(&newItem.ClientConn, unsafe.Pointer(cli))
	g.GrpcConnectItemMap[server] = newItem
	return cli, nil
}
// checkState reports whether conn is still considered usable.
//
// It returns an "ErrConn" error when the connection's current state is
// TransientFailure or Shutdown, and nil for every other state.
func (g *GrpcConnectManager) checkState(conn *grpc.ClientConn) error {
	if s := conn.GetState(); s == connectivity.TransientFailure || s == connectivity.Shutdown {
		return errors.New("ErrConn")
	}
	return nil
}
// newGrpcConn dials target and returns the resulting client connection.
//
// The connection is created without transport security and with the
// "round_robin" balancer. grpc.WithBlock() is deliberately not passed (it was
// disabled in the original code as well). On failure it returns a nil
// connection together with the dial error.
func (g *GrpcConnectManager) newGrpcConn(target string) (*grpc.ClientConn, error) {
	opts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBalancerName("round_robin"),
	}

	cc, dialErr := grpc.Dial(target, opts...)
	if dialErr != nil {
		return nil, dialErr
	}
	return cc, nil
}
// MakeGrpcCommonResult replaces *result with a freshly allocated
// pb.CommonResult describing the given outcome.
//
// Errcode is set to errCode, Httpcode to httpStatus converted to int32, and
// Error to whatever utility.GetErrMsg returns for errCode. result must be a
// non-nil pointer; whatever *result pointed to before is discarded.
func MakeGrpcCommonResult(httpStatus int, errCode string, result **pb.CommonResult) {
	out := &pb.CommonResult{}
	// Publish the new value first, then fill it in through the local pointer.
	*result = out
	out.Error = utility.GetErrMsg(errCode)
	out.Errcode = errCode
	out.Httpcode = int32(httpStatus)
}
// MakeGrpcCommonResultNoLoc replaces *result with a freshly allocated
// pb.CommonResult whose message is supplied by the caller.
//
// Error is set to errmsg as given (no lookup by error code), Errcode to
// errCode, and Httpcode to httpStatus converted to int32. result must be a
// non-nil pointer; whatever *result pointed to before is discarded.
func MakeGrpcCommonResultNoLoc(httpStatus int, errCode, errmsg string, result **pb.CommonResult) {
	*result = &pb.CommonResult{
		Error:    errmsg,
		Errcode:  errCode,
		Httpcode: int32(httpStatus),
	}
}