当前位置:首页 » 《随便一记》 » 正文

《一步一步看源码:Nacos》框架源码系列之一(其二,服务注册源码)

12 人参与  2022年07月21日 15:40  分类 : 《随便一记》  评论

点击全文阅读


老规矩,先找到nacos源码中的demo:nacos-example,找到nacos-example模块下的App这个案例:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KAgQFiKW-1657502573566)(nacos源码学习/image-20220614103949873.png)]

从实例注册registerInstance进入:传入服务名称、ip、端口号、实例的集群名称。
public class App {    public static void main(String[] args) throws NacosException {        Properties properties = new Properties();        properties.setProperty("serverAddr", "21.34.53.5:8848,21.34.53.6:8848");        properties.setProperty("namespace", "quickStart");        NamingService naming = NamingFactory.createNamingService(properties);        naming.registerInstance("nacos.test.3", "11.11.11.11", 8888, "TEST1");        naming.registerInstance("nacos.test.3", "2.2.2.2", 9999, "DEFAULT");        System.out.println(naming.getAllInstances("nacos.test.3"));    }}
从该接口进入,一路找到具体的实现方法:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YL5qn5Wy-1657502573568)(nacos源码学习/image-20220614111720932.png)]

创造实例,传入ip,端口号,实例的权重和集群名称,使用服务名,组名,实例,注册。
    public void registerInstance(String serviceName, String groupName, String ip, int port, String clusterName)            throws NacosException {        Instance instance = new Instance();        instance.setIp(ip);        instance.setPort(port);        instance.setWeight(1.0);        instance.setClusterName(clusterName);        registerInstance(serviceName, groupName, instance);    }
​ 进入registerInstance,发现使用clientProxy注册服务,查看接口:
    @Override    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {        NamingUtils.checkInstanceIsLegal(instance);        clientProxy.registerService(serviceName, groupName, instance);    }
进入registerService方法:包含注册、注销、更新、查询、订阅、取消订阅
public interface NamingClientProxy extends Closeable {        /**使用指定的实例属性注册实例以提供服务。     * Register a instance to service with specified instance properties.     *     * @param serviceName name of service     * @param groupName   group of service     * @param instance    instance to register     * @throws NacosException nacos exception     */    void registerService(String serviceName, String groupName, Instance instance) throws NacosException;        /**从服务中注销实例。     * Deregister instance from a service.     *     * @param serviceName name of service     * @param groupName   group name     * @param instance    instance     * @throws NacosException nacos exception     */    void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException;        /**将实例更新为服务。     * Update instance to service.     *     * @param serviceName service name     * @param groupName   group name     * @param instance    instance     * @throws NacosException nacos exception     */    void updateInstance(String serviceName, String groupName, Instance instance) throws NacosException;        /**     * 查询实例列表。     *     * @param serviceName service name     * @param groupName   group name     * @param clusters    clusters     * @param udpPort     udp port     * @param healthyOnly healthy only     * @return service info     * @throws NacosException nacos exception     */    ServiceInfo queryInstancesOfService(String serviceName, String groupName, String clusters, int udpPort, boolean healthyOnly)            throws NacosException;        /**     * 查询服务。     *     * @param serviceName service name     * @param groupName   group name     * @return service     * @throws NacosException nacos exception     */    Service queryService(String serviceName, String groupName) throws NacosException;        /**     * 创建服务。     *     * @param service  service     * @param selector selector     * @throws NacosException nacos exception     */    void createService(Service service, AbstractSelector selector) throws NacosException;        /**     * 删除服务。     *     * @param serviceName service name     * @param groupName   group name     * @return true if delete ok     * @throws NacosException nacos exception     */    boolean deleteService(String serviceName, String groupName) throws NacosException;        /**     * 更新服务。     *     * @param service  service     * @param selector selector     * @throws NacosException nacos exception     */    void updateService(Service service, AbstractSelector selector) throws NacosException;        /**获取服务列表。     * Get service list.     *     * @param pageNo    page number     * @param pageSize  size per page     * @param groupName group name of service     * @param selector  selector     * @return list of service     * @throws NacosException nacos exception     */    ListView<String> getServiceList(int pageNo, int pageSize, String groupName, AbstractSelector selector)            throws NacosException;        /**订阅服务。     * Subscribe service.     *     * @param serviceName service name     * @param groupName   group name     * @param clusters    clusters, current only support subscribe all clusters, maybe deprecated     * @return current service info of subscribe service     * @throws NacosException nacos exception     */    ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException;        /**退订服务。     * Unsubscribe service.     *     * @param serviceName service name     * @param groupName   group name     * @param clusters    clusters, current only support subscribe all clusters, maybe deprecated     * @throws NacosException nacos exception     */    void unsubscribe(String serviceName, String groupName, String clusters) throws NacosException;        /**判断服务是否已订阅     * Judge whether service has been subscribed.     *     * @param serviceName service name     * @param groupName   group name     * @param clusters    clusters, current only support subscribe all clusters, maybe deprecated     * @return {@code true} if subscribed, otherwise {@code false}     * @throws NacosException nacos exception     */    boolean isSubscribed(String serviceName, String groupName, String clusters) throws NacosException;        /**更新节拍信息。     * Update beat info.     *     * @param modifiedInstances modified instances     */    void updateBeatInfo(Set<Instance> modifiedInstances);        /**检查服务器是否健康。     * Check Server healthy.     *     * @return true if server is healthy     */    boolean serverHealthy();}

到这里,就是所有服务注册可使用的API了,这里先不管,专注服务注册来看。

服务注册

​ 服务注册

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {        //输出命名空间id和服务名,以及实体对象        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,instance);        //判断服务名、组名是否为空,为空则异常,不为空则组名+@@+服务名。使用intern方法,返回常量池中的        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);        //判断服务是否为短暂        if (instance.isEphemeral()) {            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);            beatReactor.addBeatInfo(groupedServiceName, beatInfo);        }        //如果不是短暂服务,则把各种属性填入map中        final Map<String, String> params = new HashMap<String, String>(32);        //命名空间id        params.put(CommonParams.NAMESPACE_ID, namespaceId);        //服务的组名        params.put(CommonParams.SERVICE_NAME, groupedServiceName);        //组名        params.put(CommonParams.GROUP_NAME, groupName);        //集群名        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());        //从实体获取ip        params.put(IP_PARAM, instance.getIp());        //获取端口号        params.put(PORT_PARAM, String.valueOf(instance.getPort()));        //获取权重        params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));        //是否启用        params.put("enable", String.valueOf(instance.isEnabled()));        //健康状况        params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));        //实例是否是短暂的,到这里,正常情况下应该不是短暂的了        params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));        //元数据,或者叫用户扩展数据        params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));        // /nacos/v1/ns/instance,将配置塞入,放入请求类型        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);    }   
进入reqApi方法继续查看
    public String reqApi(String api, Map<String, String> params, String method) throws NacosException {        //传入 /nacos/v1/ns/instance,空map,方法        return reqApi(api, params, Collections.EMPTY_MAP, method);    }
在进入下一层reqApi查看
public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method)        throws NacosException {    //传入 /nacos/v1/ns/instance,空map,服务列表,方法名    return reqApi(api, params, body, serverListManager.getServerList(), method);}
最终,reqApi调用方法
//获取服务失败,醋粗    public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers,String method) throws NacosException {        //存入命名空间id        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());  //如果服务列表为空并且服务管理器域名为空        if (CollectionUtils.isEmpty(servers) && !serverListManager.isDomain()) {            throw new NacosException(NacosException.INVALID_PARAM, "no server available");        }        //创建nacos异常        NacosException exception = new NacosException();                if (serverListManager.isDomain()) {//如果服务列表管理器为空            //从服务管理器中获取nacos域名            String nacosDomain = serverListManager.getNacosDomain();            for (int i = 0; i < maxRetry; i++) {                try {                 //(重点!)传入/nacos/v1/ns/instance,配置(命名空间、集群名等),nacosDomain是http或https                 //空map,http请求方式(post或get等)                    return callServer(api, params, body, nacosDomain, method);                } catch (NacosException e) {                    //防止nacos找不到连接异常                    exception = e;                    if (NAMING_LOGGER.isDebugEnabled()) {                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);                    }                }            }        } else {            //如果服务列表管理器不为空,从服务列表中生成一个随机服务            Random random = new Random(System.currentTimeMillis());            int index = random.nextInt(servers.size());    //遍历服务,获取随机服务            for (int i = 0; i < servers.size(); i++) {               //server里存的是调用网址https或者http,可带端口,也可不带(甚至可以只有地址)                String server = servers.get(index);                try {               //调用callServer(重点)                    return callServer(api, params, body, server, method);                } catch (NacosException e) {                    exception = e;                    if (NAMING_LOGGER.isDebugEnabled()) {                        NAMING_LOGGER.debug("request {} failed.", server, e);                    }                }                //取下一个服务                index = (index + 1) % servers.size();            }        }        //获取服务失败,服务名,错误代码,错误日志        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());        throw new NacosException(exception.getErrCode(),"failed to req API:" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());    }
说一下整个服务注册里最重要的方法callServer
    public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer,String method) throws NacosException {        //获取当前时间        long start = System.currentTimeMillis();        long end = 0;        //获取nacos安全令牌,用于访问nacos(accessToken)        params.putAll(getSecurityHeaders());        //存入服务名serviceName        params.putAll(getSpasHeaders(params.get(SERVICE_NAME_PARAM)));        //构建请求头:包含服务端版本号,用户代理,请求资源,请求提类型、请求体长度、编码类型等        Header header = NamingHttpUtil.builderHeader();                String url;        //这里就开始进行拼串了        //如果是https:或http开头        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {            //http://127.0.0.1:8848/nacos/ + api            url = curServer + api;        } else {            if (!InternetAddressUtil.containsPort(curServer)) {//如果连端口都没包含                // 127.0.0.1 + : +8848                curServer = curServer + InternetAddressUtil.IP_PORT_SPLITER + serverPort;            }            //http:// + 127.0.0.1:8848 + /nacos/api            url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api;        }                try {            //发送请求,返回请求结果(重点)            HttpRestResult<String> restResult = nacosRestTemplate                    .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);            //响应的时间            end = System.currentTimeMillis();            //指标监视器,传入方法,传入使用时间            MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode()))                    .observe(end - start);            //回传的代码为200            if (restResult.ok()) {                return restResult.getData();            }            //如果状态码为304,则返回空            if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) {                return StringUtils.EMPTY;            }            //抛出异常            throw new NacosException(restResult.getCode(), restResult.getMessage());        } catch (Exception e) {            NAMING_LOGGER.error("[NA] failed to request", e);            throw new NacosException(NacosException.SERVER_ERROR, e);        }    }
上面已经把nacos的服务注册全部流程写完了,但,可以再讲细点,进入nacosRestTemplate的exchangeForm方法查看
    public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues,String httpMethod, Type responseType) throws Exception {       //封装hhtp请求头,将url,请求头参数,参数寻参数,请求体,http类型等放入        RequestHttpEntity requestHttpEntity = new RequestHttpEntity(                header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues);        //传入url,请求方法。请求体,请求类型,最后发送给nacos获取请求结果        return execute(url, httpMethod, requestHttpEntity, responseType);    }

至此,nacos服务注册源码已经全部整理完毕,下一次来讲一讲sofa,或者把nacos的其他方法讲讲。


点击全文阅读


本文链接:http://zhangshiyu.com/post/43639.html

<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1