finclip-app-manager/vendor/gitlab.finogeeks.club/finclip-backend/mop-middleware-auth/grpc/grpc.go

156 lines
4.6 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package grpc
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"unsafe"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
_ "github.com/mbobakov/grpc-consul-resolver"
)
// grpcConnectManager is the package-level connection manager handed out by
// getGrpcConnManager.
var grpcConnectManager GrpcConnectManager

// initOnce ensures the manager's connection map is allocated exactly once.
var initOnce sync.Once
// getGrpcConnManager returns a pointer to the package-level connection
// manager. The first call allocates the manager's connection map; later calls
// return the same, already initialized manager.
func getGrpcConnManager() *GrpcConnectManager {
	mgr := &grpcConnectManager
	initOnce.Do(func() {
		mgr.GrpcConnectItemMap = map[string]GrpcConnectItem{}
	})
	return mgr
}
// GrpcConnectItem describes a single established connection, for example the
// connection to a multi-instance account system. One item can be understood
// as the connection to all instances of one mop service.
type GrpcConnectItem struct {
// ClientConn holds the *grpc.ClientConn for this service, stored as an
// unsafe.Pointer so it can be accessed through sync/atomic.
ClientConn unsafe.Pointer
// ServerName is the service name the connection was created for; GetConn
// uses the same value as the map key.
ServerName string
}
// GrpcConnectManager caches one gRPC client connection per service name.
type GrpcConnectManager struct {
// CreateLock prevents multiple goroutines from establishing connections at
// the same time.
CreateLock sync.Mutex
// GrpcConnectItemMap maps a service name to its cached connection item.
// It is empty at first.
GrpcConnectItemMap map[string]GrpcConnectItem
}
// GetConn returns the cached gRPC client connection for server, creating a
// new one when nothing is cached or when the cached connection is no longer
// usable according to checkState.
//
// consulUrl and server are joined into the dial target
// "consul://<consulUrl>/<server>". param is an optional URL query string that
// is appended to the target; for example "wait=10s&tag=mop" means the
// connection is established with a 10 second timeout and the Consul service
// tag "mop".
//
// The whole lookup runs under CreateLock. Go maps must not be read while
// another goroutine writes them, so even the cache-hit path has to hold the
// lock that guards writes to GrpcConnectItemMap.
//
// It returns the dial error unchanged when a new connection cannot be
// created.
func (g *GrpcConnectManager) GetConn(consulUrl, server, param string) (*grpc.ClientConn, error) {
	g.CreateLock.Lock()
	defer g.CreateLock.Unlock()

	if item, ok := g.GrpcConnectItemMap[server]; ok {
		// item is a copy taken under the lock; the atomic load only mirrors
		// how the unsafe.Pointer field is accessed elsewhere.
		if p := atomic.LoadPointer(&item.ClientConn); p != nil {
			cc := (*grpc.ClientConn)(p)
			if g.checkState(cc) == nil {
				return cc, nil
			}
			// The old connection may have been dropped by the server side, so
			// it is closed before being replaced. The close error is ignored on
			// purpose: the connection is discarded either way.
			_ = cc.Close()
			// Drop the entry so the map never hands out a closed connection,
			// even if the dial below fails.
			delete(g.GrpcConnectItemMap, server)
		}
	}

	fmt.Println("GetConn lock")

	target := "consul://" + consulUrl + "/" + server
	if param != "" {
		target += "?" + param
	}
	fmt.Println("new conn, url=" + target)

	cli, err := g.newGrpcConn(target)
	if err != nil {
		return nil, err
	}

	// A manager that was not obtained through getGrpcConnManager has a nil
	// map; writing to it would panic.
	if g.GrpcConnectItemMap == nil {
		g.GrpcConnectItemMap = make(map[string]GrpcConnectItem)
	}

	var newItem GrpcConnectItem
	newItem.ServerName = server
	atomic.StorePointer(&newItem.ClientConn, unsafe.Pointer(cli))
	g.GrpcConnectItemMap[server] = newItem
	return cli, nil
}
// checkState inspects the connectivity state of conn. It returns an error
// when the state is TransientFailure or Shutdown, and nil for every other
// state.
func (g *GrpcConnectManager) checkState(conn *grpc.ClientConn) error {
	if s := conn.GetState(); s == connectivity.TransientFailure || s == connectivity.Shutdown {
		return errors.New("ErrConn")
	}
	return nil
}
//const (
// // DialTimeout the timeout of create connection
// DialTimeout = 5 * time.Second
//
// // BackoffMaxDelay provided maximum delay when backing off after failed connection attempts.
// BackoffMaxDelay = 3 * time.Second
//
// // KeepAliveTime is the duration of time after which if the client doesn't see
// // any activity it pings the server to see if the transport is still alive.
// KeepAliveTime = time.Duration(10) * time.Second
//
// // KeepAliveTimeout is the duration of time for which the client waits after having
// // pinged for keepalive check and if no activity is seen even after that the connection
// // is closed.
// KeepAliveTimeout = time.Duration(3) * time.Second
//
// // InitialWindowSize is set to 1GB to increase the system's throughput.
// InitialWindowSize = 1 << 30
//
// // InitialConnWindowSize is set to 1GB to increase the system's throughput.
// InitialConnWindowSize = 1 << 30
//
// // MaxSendMsgSize set max gRPC request message size sent to server.
// // If any request message size is larger than current value, an error will be reported from gRPC.
// MaxSendMsgSize = 4 << 30
//
// // MaxRecvMsgSize set max gRPC receive message size received from server.
// // If any message size is larger than current value, an error will be reported from gRPC.
// MaxRecvMsgSize = 4 << 30
//)
// newGrpcConn dials target and returns the resulting client connection.
//
// The connection is created with grpc.WithInsecure and the "round_robin"
// balancer. grpc.WithBlock is not passed, so the call does not ask Dial to
// wait for the connection to come up.
//
// NOTE(review): upstream grpc-go marks WithBalancerName and WithInsecure as
// deprecated (in favor of WithDefaultServiceConfig with
// {"loadBalancingPolicy": "round_robin"} and WithTransportCredentials);
// confirm the vendored grpc version before switching. An earlier
// DialContext-based variant with backoff, window-size, message-size and
// keepalive options was disabled and is not in use.
func (g *GrpcConnectManager) newGrpcConn(target string) (*grpc.ClientConn, error) {
	dialOpts := []grpc.DialOption{
		grpc.WithInsecure(),
		grpc.WithBalancerName("round_robin"),
	}

	cc, dialErr := grpc.Dial(target, dialOpts...)
	if dialErr != nil {
		return nil, dialErr
	}
	return cc, nil
}