1.Nacos Server与Nacos Client间的UDP通信
- Nacos Server向Nacos Client进行UDP推送
- Nacos Client接收Nacos Server的UDP推送
1.1 Nacos Server向Nacos Client进行UDP推送
之前的文章中Nacos Server处理心跳请求的时候, 最后有一个向其他服务发布服务变更事件。
InstanceController#beat() -> Service#processClientBeat()
创建一个处理器,其是一个任务 (ClientBeatProcessor), 开启一个立即执行的任务, 执行clientBeatProcessor任务的run()
PushService#onApplicationEvent(): 发布服务变更事件给Server
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
// 启动一个定时操作,异步执行相关内容
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
// 从缓存map中获取当前服务的内层map,内层map中存放着当前服务的所有Nacos Client的
// UDP客户端PushClient
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
// 更新最后引用时间
long lastRefTime = System.nanoTime();
// 遍历所有PushClient,向所有该服务的订阅者Nacos Client进行UDP推送
for (PushClient client : clients.values()) {
// 若当前PushClient为僵尸客户端
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
// 将该PushClient干掉
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(),
(ackEntry == null ? null : ackEntry.key));
// UDP通信
udpPush(ackEntry);
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
启动一个定时操作,异步执行相关内容:
1、 从缓存map中获取当前服务的内层map,内层map中存放着当前服务的所有NacosClient的UDP客户端PushClient;
2、 遍历所有PushClient,向所有该服务的订阅者NacosClient进行UDP推送udpPush(ackEntry);
;
PushService#udpPush(): udp通信, Nacos Server向Nacos Client进行UDP推送
private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
if (ackEntry == null) {
Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
return null;
}
// 若UDP通信重试次数超出了最大阈值,则将该UDP通信从两个缓存map中干掉
if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1; // 失败计数器加一
return ackEntry;
}
try {
// 计数器加一
if (!ackMap.containsKey(ackEntry.key)) {
totalPush++;
}
ackMap.put(ackEntry.key, ackEntry);
udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
Loggers.PUSH.info("send udp packet: " + ackEntry.key);
// 发送UDP
udpSocket.send(ackEntry.origin);
ackEntry.increaseRetryTime();
// 开启定时任务,进行UPD通信失败后的重新推送
GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),
TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
return ackEntry;
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data,
ackEntry.origin.getAddress().getHostAddress(), e);
ackMap.remove(ackEntry.key);
udpSendTimeMap.remove(ackEntry.key);
failedPush += 1;
return null;
}
}
若UDP通信重试次数超出了最大阈值,则将该UDP通信从两个缓存map中干掉, ackMap和udpSendTimeMap。
最后通过DatagramSocket.send()方法发送数据并开启定时任务,进行UPD通信失败后的重新推送。
1.2 Nacos Client接收Nacos Server的UDP推送
在之前的分析中, 会在项目启动后注入AbstractAutoServiceRegistration
AbstractAutoServiceRegistration#bind()
PushReceiver#run()
public void run() {
while (true) {
try {
// byte[] is initialized with 0 full filled by default
byte[] buffer = new byte[UDP_MSS];
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
udpSocket.receive(packet);
String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
String ack;
if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
hostReactor.processServiceJSON(pushPacket.data);
// send ack to server
ack = "{\"type\": \"push-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
} else if ("dump".equals(pushPacket.type)) {
// dump data to server
ack = "{\"type\": \"dump-ack\""
+ ", \"lastRefTime\": \"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\""
+ StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
+ "\"}";
} else {
// do nothing send ack only
ack = "{\"type\": \"unknown-ack\""
+ ", \"lastRefTime\":\"" + pushPacket.lastRefTime
+ "\", \"data\":" + "\"\"}";
}
udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
} catch (Exception e) {
NAMING_LOGGER.error("[NA] error while receiving push data", e);
}
}
}
最后也是通过DatagramSocket.send()方法发送的数据, 表示Nacos Client接收Nacos Server的UDP推送成功