Consul 介绍
服务注册与发现采用 consul 组件,是google开源的一个使用go语言开发的服务发现、配置管理中心服务。内置了服务注册与发现框架、分布一致性协议实现、健康检查、Key/Value存储、多数据中心方案等。
Consul使用gossip协议管理成员关系、广播消息到整个集群,他有两个gossip pool(LAN pool和WAN pool),LAN pool是同一个数据中心内部通信的,WAN pool是多个数据中心通信的,LAN pool有多个,WAN pool只有一个。
项目中应用
完整代码:
https://github.com/Justin02180218/micro-kit
配置文件
consul:
addr: "http://consul-server:8500"
interval: "10s"
timeout: "1s"
grpc:
retrymax: 3
retrytimeout: 500
name: "book-rpc-service"
在 /etc/hosts 中配置 consul-server 域名
config.go
type ConsulConfig struct {
Addr string `json:"addr" yaml:"addr"`
Interval string `json:"interval" yaml:"interval"`
Timeout string `json:"timeout" yaml:"timeout"`
Client struct {
RetryMax int `json:"retrymax" yaml:"retrymax"`
RetryTimeout int `json:"retrytimeout" yaml:"retrytimeout"`
}
}
type GRPCConfig struct {
RetryMax int `json:"retrymax" yaml:"retrymax"`
RetryTimeout int `json:"retrytimeout" yaml:"retrytimeout"`
Name string `json:"name" yaml:"name"`
}
pkg/registers
在 pkg 下新建目录 registers,创建 consul.go 文件:
代码如下:
// 链接 consul
func connectConsul(consulAddr string) (client consul.Client) {
consulConfig := api.DefaultConfig()
consulConfig.Address = consulAddr
consulClient, err := api.NewClient(consulConfig)
if err != nil {
panic(err)
}
client = consul.NewClient(consulClient)
return
}
// 向 consul 注册服务
func InitRegister(cfg *configs.AppConfig, check api.AgentServiceCheck, logger log.Logger) (registrar sd.Registrar) {
rand.Seed(time.Now().UnixNano())
name := cfg.ServerConfig.Name
addr := utils.LocalIP()
port := cfg.ServerConfig.Port
consulAddr := cfg.ConsulConfig.Addr
client := connectConsul(consulAddr)
num := rand.Intn(100)
asr := api.AgentServiceRegistration{
ID: name + "-" + strconv.Itoa(num),
Name: name,
Address: addr,
Port: port,
Tags: []string{name},
Check: &check,
}
registrar = consul.NewRegistrar(client, &asr, logger)
return
}
// restful 服务检测
func HttpCheck(port int, interval, timeout string) api.AgentServiceCheck {
return api.AgentServiceCheck{
HTTP: "http://" + utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
Interval: interval,
Timeout: timeout,
Notes: "Http Health Check",
}
}
// gRPC 服务检测
func GRPCCheck(port int, interval, timeout string) api.AgentServiceCheck {
return api.AgentServiceCheck{
GRPC: utils.LocalIP() + ":" + strconv.Itoa(port) + "/health",
Interval: interval,
Timeout: timeout,
Notes: "GRPC Health Check",
}
}
service层
以 library-user-service 为例,其他微服务相同。
// 增加函数
HealthCheck() bool
// 实现
func (u *UserServiceImpl) HealthCheck() bool {
return true
}
endpoint层
以 library-user-service 为例,其他微服务相同。
// 增加属性
HealthEndpoint endpoint.Endpoint
func MakeHealthEndpoint(svc service.UserService) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return status, nil
}
}
transport层
以 library-user-service 为例,其他微服务相同。
r.GET("/health", func(c *gin.Context) {
kithttp.NewServer(
endpoints.HealthEndpoint,
decodeHealthRequest,
utils.EncodeJsonResponse,
).ServeHTTP(c.Writer, c.Request)
})
main.go
以 library-user-service 为例,其他微服务相同。
// 配置日志
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
// consul 注册
check := registers.HttpCheck(configs.Conf.ServerConfig.Port, configs.Conf.ConsulConfig.Interval, configs.Conf.ConsulConfig.Timeout)
registrar := registers.InitRegister(configs.Conf, check, logger)
go func() {
registrar.Register()
errChan <- r.Run(fmt.Sprintf(":%s", strconv.Itoa(configs.Conf.ServerConfig.Port)))
}()
registrar.Deregister()
启动 consul
使用 Docker 启动 consul
docker pull progrium/consul
docker run --rm -p 8400:8400 -p 8500:8500 -p 8600:53/udp -h node1 progrium/consul -server -bootstrap -ui-dir /ui
启动服务,然后在浏览器地址栏输入:http://consul-server:8500/
可以看到三个微服务都以注册到 consul 上,健康检测也都已经通过。
调用
以前 user-service 调用 book-rpc-service 的接口,使用的是gPRC Client的方式。现在服务注册的 consul 上,user-service 通过 consul 获取已经注册的 book-rpc-service 的一个实例,然后调用这个实例上的接口。
在 consul.go 文件中增加如下代码:
func GRPCClient(cfg *configs.AppConfig, makeEndpoint func(string) endpoint.Endpoint, logger log.Logger) endpoint.Endpoint {
consulAddr := cfg.ConsulConfig.Addr
retryMax := cfg.GRPCConfig.RetryMax
retryTimeout := cfg.GRPCConfig.RetryTimeout
name := cfg.GRPCConfig.Name
client := connectConsul(consulAddr)
instance := consul.NewInstancer(client, logger, name, []string{name}, true)
factory := factoryFor(makeEndpoint)
endpointer := sd.NewEndpointer(instance, factory, logger)
// 负载均衡采用轮询策略
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, time.Millisecond*time.Duration(retryTimeout), balancer)
return retry
}
func factoryFor(makeEndpoint func(string,) endpoint.Endpoint) sd.Factory {
return func(instance string) (endpoint.Endpoint, io.Closer, error) {
endpoint := makeEndpoint(instance)
return endpoint, nil, nil
}
}
在 endpoint 目录下增加文件 book_rpc_endpoint.go,代码如下:
type BookRPCEndpoints struct {
FindBooksEndpoint endpoint.Endpoint
}
func MakeFindBooksEndpoint(instance string) endpoint.Endpoint {
conn, err := grpc.Dial(instance, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
return nil
}
findBooksEndpoint := grpctransport.NewClient(
conn, "book.Book", "FindBooksByUserID",
encodeGRPCFindBooksRequest,
decodeGRPCFindBooksResponse,
pbbook.BooksResponse{},
).Endpoint()
return findBooksEndpoint
}
修改 service.go
type UserServiceImpl struct {
userDao dao.UserDao
grpcClient kitendpoint.Endpoint
}
func NewUserServiceImpl(userDao dao.UserDao, grpcClient kitendpoint.Endpoint) UserService {
return &UserServiceImpl{
userDao: userDao,
grpcClient: grpcClient,
}
}
func (u *UserServiceImpl) FindBooksByUserID(ctx context.Context, id uint64) (interface{}, error) {
res, err := u.grpcClient(ctx, id)
if err != nil {
return nil, err
}
return res, nil
}
修改 main.go
findBooksEndpoint := endpoint.MakeFindBooksEndpoint
grpcClient := registers.GRPCClient(configs.Conf, findBooksEndpoint, tracer, logger)
userService := service.NewUserServiceImpl(userDao, grpcClient)
接口测试同 《五,微服务library-book-grpc-service》
下一篇文章,我们在微服务中加入熔断、降级功能。
完整代码:
https://github.com/Justin02180218/micro-kit
更多【分布式专辑】【架构实战专辑】系列文章,请关注公众号