08、Nacos源码分析-Server处理心跳请求

1.InstanceController#beat()

 

    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {

        // 创建一个JSON Node,该方法的返回值就是它,后面的代码就是对这个Node进行各种初始化
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

        // 从请求中获取到beat,即client端的beatInfo
        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        RsInfo clientBeat = null;
        // 将beat构建为clientBeat
        if (StringUtils.isNotBlank(beat)) {

            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }
        String clusterName = WebUtils
                .optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        // 获取到客户端传递来的client的port,其将来用于UDP通信
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {

            if (StringUtils.isNotBlank(clientBeat.getCluster())) {

                clusterName = clientBeat.getCluster();
            } else {

                // fix #2533
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);

        // 从注册表中获取当前发送请求的client对应的instance,Ip port对应
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

        // 处理注册表中不存在该client的instance的情况
        if (instance == null) {

            // 若请求中没有携带心跳数据,则直接返回
            if (clientBeat == null) {

                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }

            Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "
                    + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);

            // 下面处理的情况是,注册表中没有该client的instance,但其发送的请求中具有心跳数据。
            // 在client的注册请求还未到达时(网络抖动等原因),第一次心跳请求先到达了server,会出现这种情况
            // 处理方式是,使用心跳数据构建出一个instance,注册到注册表
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            // 注册
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }

        // 从注册表中获取service
        Service service = serviceManager.getService(namespaceId, serviceName);

        if (service == null) {

            throw new NacosException(NacosException.SERVER_ERROR,
                    "service not found: " + serviceName + "@" + namespaceId);
        }
        // 从请求中获取到beat为null
        if (clientBeat == null) {

            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        // 处理本次心跳
        service.processClientBeat(clientBeat);

        result.put(CommonParams.CODE, NamingResponseCode.OK);
        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {

            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
        }
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }

1、 创建一个JSONNode(result),该方法的返回值就是它,后面的代码就是对这个Node进行各种初始化;
2、 从请求中获取到beat,即client端的beatInfo,将beat构建为clientBeat;
3、 获取到客户端传递来的client的port,其将来用于UDP通信;
4、 从注册表中获取当前发送请求的client对应的instance,Ipport对应;

  • 处理注册表中不存在该client的instance的情况, 若请求中没有携带心跳数据,则直接返回
  • 注册表中没有该client的instance,但其发送的请求中具有心跳数据。在client的注册请求还未到达时(网络抖动等原因),第一次心跳请求先到达了server,会出现这种情况, 使用心跳数据构建出一个instance,注册到注册表
    5、 从注册表中获取service,并调用service.processClientBeat方法处理本次心跳;
1.1 serviceManager.registerInstance()

 

注册instance到service中。

 

之前的处理注册请求中分析过此代码。

1、 创建一个空service;
2、 从注册表中获取到service;
3、 instance写入到service,即写入到了注册表;

1.2 serviceManager.getService()

 

从注册表中获取service。

    public Service getService(String namespaceId, String serviceName) {

        if (serviceMap.get(namespaceId) == null) {

            return null;
        }
        return chooseServiceMap(namespaceId).get(serviceName);
    }

    public Map<String, Service> chooseServiceMap(String namespaceId) {

        return serviceMap.get(namespaceId);
    }

serviceMap为Server端的注册表。

1.3 处理本次心跳

&nbsp;

&nbsp;

1、 创建一个处理器,其是一个任务;
2、 开启一个立即执行的任务,即执行clientBeatProcessor任务的run();

&nbsp;

    public void run() {

        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {

            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }

        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        // 获取当前服务的所有临时实例
        List<Instance> instances = cluster.allIPs(true);

        // 遍历所有这些临时实例,从中查找当前发送心跳的instance
        for (Instance instance : instances) {

            // 只要ip与port与当前心跳的instance的相同,就是了
            if (instance.getIp().equals(ip) && instance.getPort() == port) {

                if (Loggers.EVT_LOG.isDebugEnabled()) {

                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                // 修改最后心跳时间戳
                instance.setLastBeat(System.currentTimeMillis());
                // 修改该instance的健康状态
                // 当instance被标记时,即其marked为true时,其是一个持久实例
                if (!instance.isMarked()) {

                    // instance的healthy才是临时实例健康状态的表示
                    // 若当前instance健康状态为false,但本次是其发送的心跳,说明这个instance“起死回生”了,
                    // 我们需要将其health变为true
                    if (!instance.isHealthy()) {

                        instance.setHealthy(true);
                        Loggers.EVT_LOG
                                .info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                                        cluster.getService().getName(), ip, port, cluster.getName(),
                                        UtilsAndCommons.LOCALHOST_SITE);
                        // 发布服务变更事件(其对后续我们要分析的UDP通信非常重要)
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }

1、 获取当前服务的所有临时实例,只有临时实例才会有心跳;
2、 遍历所有这些临时实例,从中查找当前发送心跳的instance,只要ip与port与当前心跳的instance的相同,就是了;
3、 修改最后心跳时间戳,这个为主要目的;
4、 修改该instance的健康状态,如果没有被标记且不健康的实例为临时实例,则修改健康状态为健康的;
5、 发布服务变更事件,对于后面的UDP通信很重要;

&nbsp;