10. Nacos Source Code Analysis: UDP Communication Between Server and Client

1. UDP communication between Nacos Server and Nacos Client

  • Nacos Server pushes to Nacos Client over UDP
  • Nacos Client receives the UDP push from Nacos Server

1.1 Nacos Server pushes to Nacos Client over UDP

 

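In the previous article, the last step Nacos Server took when handling a heartbeat request was to publish a service change event.

InstanceController#beat() -> Service#processClientBeat()

processClientBeat() creates a processor, which is itself a task (ClientBeatProcessor), and schedules it for immediate execution, so the run() method of clientBeatProcessor is invoked.

PushService#onApplicationEvent(): handles the service change event published on the Server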

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {

        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        // Schedule a delayed task that performs the push asynchronously
        Future future = GlobalExecutor.scheduleUdpSender(() -> {

            try {

                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                // Look up the inner map for this service in the cached clientMap; the inner map
                // holds the PushClient (UDP client) of every Nacos Client subscribed to the service
                ConcurrentMap<String, PushClient> clients = clientMap
                        .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {

                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                // Record the last reference time
                long lastRefTime = System.nanoTime();
                // Iterate over all PushClients and push to every subscribing Nacos Client over UDP
                for (PushClient client : clients.values()) {

                    // If the current PushClient is a zombie client
                    if (client.zombie()) {

                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        // Remove this PushClient
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client.toString());
                        continue;
                    }

                    Receiver.AckEntry ackEntry;
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                    String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map<String, Object> data = null;
                    if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {

                        org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                        compressData = (byte[]) (pair.getValue0());
                        data = (Map<String, Object>) pair.getValue1();

                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                    }

                    if (compressData != null) {

                        ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {

                        ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {

                            cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                        }
                    }

                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                            client.getServiceName(), client.getAddrStr(), client.getAgent(),
                            (ackEntry == null ? null : ackEntry.key));

                    // Send over UDP
                    udpPush(ackEntry);
                }
            } catch (Exception e) {

                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

            } finally {

                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }

        }, 1000, TimeUnit.MILLISECONDS);

        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

    }

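The scheduled task runs asynchronously after a 1000 ms delay and does the following:

1. Looks up the inner map for the current service in the cached clientMap. The inner map holds the PushClient (the UDP client) of every Nacos Client subscribed to this service.
2. Iterates over all PushClients and pushes to each subscribing Nacos Client over UDP via udpPush(ackEntry).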
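
The two steps above can be sketched in isolation. This is a minimal illustration rather than the Nacos implementation: PushRegistrySketch, Subscriber, collectPushTargets and the ttlMillis-based zombie rule are all hypothetical names and assumptions, chosen only to show the nested-map lookup and the pruning of zombie clients during iteration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the clientMap bookkeeping; not the Nacos implementation.
class PushRegistrySketch {

    // Simplified subscriber record (assumption: the real PushClient carries more state).
    static final class Subscriber {
        final String addr;
        final long lastRefTimeMillis;

        Subscriber(String addr, long lastRefTimeMillis) {
            this.addr = addr;
            this.lastRefTimeMillis = lastRefTimeMillis;
        }

        // Assumed zombie rule: the subscriber has not refreshed within ttlMillis.
        boolean zombie(long nowMillis, long ttlMillis) {
            return nowMillis - lastRefTimeMillis > ttlMillis;
        }
    }

    // Outer key: full service name; inner key: subscriber address.
    private final ConcurrentMap<String, ConcurrentMap<String, Subscriber>> clientMap =
            new ConcurrentHashMap<>();

    void subscribe(String serviceKey, Subscriber s) {
        clientMap.computeIfAbsent(serviceKey, k -> new ConcurrentHashMap<>()).put(s.addr, s);
    }

    // Returns the addresses to push to, removing zombies along the way.
    List<String> collectPushTargets(String serviceKey, long nowMillis, long ttlMillis) {
        List<String> targets = new ArrayList<>();
        ConcurrentMap<String, Subscriber> clients = clientMap.get(serviceKey);
        if (clients == null || clients.isEmpty()) {
            return targets;
        }
        for (Subscriber s : clients.values()) {
            if (s.zombie(nowMillis, ttlMillis)) {
                // Removing during iteration is safe on a ConcurrentHashMap.
                clients.remove(s.addr);
                continue;
            }
            targets.add(s.addr);
        }
        return targets;
    }

    int subscriberCount(String serviceKey) {
        ConcurrentMap<String, Subscriber> clients = clientMap.get(serviceKey);
        return clients == null ? 0 : clients.size();
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic. The method shown in the article instead calls client.zombie() with no arguments.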

PushService#udpPush(): the UDP communication itself, where Nacos Server pushes to Nacos Client over UDP

    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {

        if (ackEntry == null) {

            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }

        // If the UDP retry count exceeds the maximum threshold, remove this entry from both cache maps
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {

            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;  // increment the failure counter
            return ackEntry;
        }

        try {

            // Increment the push counter the first time this key is sent
            if (!ackMap.containsKey(ackEntry.key)) {

                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            // Send the UDP packet
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();

            // Schedule a task that re-pushes if the UDP push fails
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
                    TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);

            return ackEntry;
        } catch (Exception e) {

            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
                    ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;

            return null;
        }
    }

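If the number of UDP retries exceeds the maximum threshold, the entry is removed from both cache maps, ackMap and udpSendTimeMap.
Otherwise the data is sent with DatagramSocket.send(), and a task is scheduled to run after ACK_TIMEOUT_NANOS and push again if the UDP push failed.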
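
The retry bookkeeping can be sketched without sockets or timers. The following is a hedged illustration, not Nacos code: AckRetrySketch, Entry, retransmitIfUnacked and onAck are hypothetical names, the sender is an injected callback in place of udpSocket.send(), and the sketch assumes that the retransmit task resends only while the key is still in ackMap and that an incoming ack removes the key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

// Hypothetical sketch of the ack/retry bookkeeping; not the Nacos implementation.
class AckRetrySketch {

    static final class Entry {
        final String key;
        final byte[] payload;
        int retryTimes;

        Entry(String key, byte[] payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    private final ConcurrentMap<String, Entry> ackMap = new ConcurrentHashMap<>();
    private final int maxRetryTimes;
    private final Consumer<byte[]> sender; // stands in for udpSocket.send(...)
    int totalPush;   // plain counters: fine for a single-threaded sketch
    int failedPush;

    AckRetrySketch(int maxRetryTimes, Consumer<byte[]> sender) {
        this.maxRetryTimes = maxRetryTimes;
        this.sender = sender;
    }

    // Returns true if a packet was sent, false if the entry was given up on.
    boolean push(Entry entry) {
        if (entry.retryTimes > maxRetryTimes) {
            ackMap.remove(entry.key);
            failedPush++;
            return false;
        }
        if (!ackMap.containsKey(entry.key)) {
            totalPush++; // count each key once, not each retransmission
        }
        ackMap.put(entry.key, entry);
        sender.accept(entry.payload);
        entry.retryTimes++;
        return true;
    }

    // What a timer would call after the ack timeout (assumed behavior).
    boolean retransmitIfUnacked(Entry entry) {
        return ackMap.containsKey(entry.key) && push(entry);
    }

    // Called when the client's ack arrives (assumed behavior).
    void onAck(String key) {
        ackMap.remove(key);
    }
}
```

With maxRetryTimes set to 1, an unacknowledged entry is sent twice and dropped on the third attempt, because the check uses a strict greater-than comparison, as udpPush() does.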


1.2 Nacos Client receives the UDP push from Nacos Server


As analyzed earlier, AbstractAutoServiceRegistration is injected once the application has started.

AbstractAutoServiceRegistration#bind()


PushReceiver#run()

    public void run() {

        while (true) {

            try {

                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

                udpSocket.receive(packet);

                String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

                PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {

                    hostReactor.processServiceJSON(pushPacket.data);

                    // send ack to server
                    ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                } else if ("dump".equals(pushPacket.type)) {

                    // dump data to server
                    ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
                } else {

                    // do nothing send ack only
                    ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
                }

                udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
            } catch (Exception e) {

                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }


Finally, the client also sends its ack through DatagramSocket.send(), which tells Nacos Server that the Nacos Client received the UDP push successfully.
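
The ack construction in run() can be isolated into small helpers, shown here as a rough sketch. PushAckSketch, ackType, ackJson and ackPacket are hypothetical names, not part of the Nacos client. The escapedData argument is assumed to be already escaped by the caller: empty for the push-ack and unknown-ack cases, the escaped service map for dump-ack.

```java
import java.net.DatagramPacket;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical helpers mirroring the ack branches in PushReceiver#run().
class PushAckSketch {

    // Maps the push type to the ack type.
    static String ackType(String pushType) {
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            return "push-ack";
        }
        if ("dump".equals(pushType)) {
            return "dump-ack";
        }
        return "unknown-ack";
    }

    // Builds the ack JSON by concatenation, in the same shape as the original code.
    static String ackJson(String pushType, long lastRefTime, String escapedData) {
        return "{\"type\": \"" + ackType(pushType) + "\""
                + ", \"lastRefTime\":\"" + lastRefTime
                + "\", \"data\":\"" + escapedData + "\"}";
    }

    // Wraps the ack in a datagram addressed back to the sender of the push.
    static DatagramPacket ackPacket(String ack, InetSocketAddress pushSender) {
        byte[] bytes = ack.getBytes(StandardCharsets.UTF_8); // encode once
        return new DatagramPacket(bytes, bytes.length,
                pushSender.getAddress(), pushSender.getPort());
    }
}
```

In a receive loop, the reply would go out as udpSocket.send(ackPacket(ack, sender)), where sender is the address the push packet came from.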