nacos1.4 源代码 - 服务发现、心跳机制
nacos的服务发现主要采用服务端主动推送+客户端定时拉取;心跳机制通过每5s向服务端发送心跳任务来保活,当超过15s服务端未接收到心跳任务时,将该实例设置为非健康状态;当超过30s时,删除该实例。
1.服务发现
nacos主要采用服务端主动推送+客户端定时拉取来保证AP架构的高可用性。
NacosNamingService:服务发现的接口(客户端!)
每个nacos实例有本地缓存Map,存放所有的实例
调用查询服务实例列表
/nacos`/v1/ns/instance/list
NacosNamingService.getAllInstances() -> getServiceInfo()
核心方法是getServiceInfo,代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例
if (null == serviceObj) {// 如果拿到为空,则更新服务列表
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName);
} else if (updatingMap.containsKey(serviceName)) {
if (UPDATE_HOLD_INTERVAL > 0) {
// hold a moment waiting for update finish
synchronized (serviceObj) {
try {
serviceObj.wait(UPDATE_HOLD_INTERVAL);
} catch (InterruptedException e) {
NAMING_LOGGER
.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
}
}
}
}
scheduleUpdateIfAbsent(serviceName, clusters);// 定时拉取
return serviceInfoMap.get(serviceObj.getKey());// 从map拿
}
本地从服务端拉取服务实例,放到本地缓存中
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
throws NacosException {
final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put("clusters", clusters);
params.put("udpPort", String.valueOf(udpPort));
params.put("clientIP", NetUtils.localIP());
params.put("healthyOnly", String.valueOf(healthyOnly));
return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);// 调用接口
}
客户端通过定时任务-->定时拉取服务端的实例,更新本地缓存(在发起服务调用的时候,才会执行定时任务!)
public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
synchronized (futureMap) {
if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
return;
}
ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
}
}
2.心跳机制
NamingService->register
注册实例的时候,客户端会有一个心跳任务,定时5s向服务端发送心跳!
executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
BeatTask就是心跳任务
调用Instance/beat接口,发送实例心跳检查
BeatTask#run的代码如下:
@Override
public void run() {
if (beatInfo.isStopped()) {
return;
}
long nextTime = beatInfo.getPeriod();
try {
JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
long interval = result.get("clientBeatInterval").asLong();
boolean lightBeatEnabled = false;
if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
}
BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
if (interval > 0) {
nextTime = interval;
}
int code = NamingResponseCode.OK;
if (result.has(CommonParams.CODE)) {
code = result.get(CommonParams.CODE).asInt();
}
if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
Instance instance = new Instance();
instance.setPort(beatInfo.getPort());
instance.setIp(beatInfo.getIp());
instance.setWeight(beatInfo.getWeight());
instance.setMetadata(beatInfo.getMetadata());
instance.setClusterName(beatInfo.getCluster());
instance.setServiceName(beatInfo.getServiceName());
instance.setInstanceId(instance.getInstanceId());
instance.setEphemeral(true);
try {
serverProxy.registerService(beatInfo.getServiceName(),
NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
} catch (Exception ignore) {
}
}
} catch (NacosException ex) {
NAMING_LOGGER.warn("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",
JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
} catch (Exception unknownEx) {
NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",
JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);
} finally {
executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);// 定时任务
}
}
服务端会更新心跳的时间!每隔5秒检查一下实例健康状态
对于集群而言,在一台服务端执行定时任务即可!不需要再所有节点都执行定时任务
当实例下线后,服务端通过InstanceController,检查健康状态(健康任务检查)。代码入口:
createEmptyService->createServiceIfAbsent->putServiceAndInit->init->scheduleCheck(定时任务)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断
如果超过15秒,设置实例为非健康(更新健康状态)
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// 如果已经过去了30s,直接删掉这个机器(删除实例)
asyncHttpRequest(url, headers, paramValues, callback, HttpMethod.DELETE);
核心代码如下:
@Override
public void run() {
try {
if (!getDistroMapper().responsible(service.getName())) {
return;
}
if (!getSwitchDomain().isHealthCheckEnabled()) {
return;
}
List<Instance> instances = service.allIPs(true);// 获取服务端的所有实例
// first set health status of instances:
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {// 心跳时间判断
if (!instance.isMarked()) {
if (instance.isHealthy()) {
instance.setHealthy(false);// 如果超过15s,健康设置为false
Loggers.EVT_LOG
.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
instance.getIp(), instance.getPort(), instance.getClusterName(),
service.getName(), UtilsAndCommons.LOCALHOST_SITE,
instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
getPushService().serviceChanged(service);
ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
}
}
}
}
if (!getGlobalConfig().isExpireInstance()) {
return;
}
// then remove obsolete instances:
for (Instance instance : instances) {
if (instance.isMarked()) {
continue;
}
if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
// delete instance
Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
JacksonUtils.toJson(instance));
deleteIp(instance);
}
}
} catch (Exception e) {
Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
}
}
3.服务事件变动
客户端每隔5秒从服务端拉取一次信息,是不是有延迟!
对于AP架构而言,有一点延迟 无所谓。Ribbon会进行重试
当服务端的服务列表变化时,服务端会主动推送给客户端信息
核心代码入口:notifier#run->handle->onChange
onChange更新注册表的相关代码!
getPushService().serviceChanged(this);// 发布 服务变化 的事件,通知客户端
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
for (String clusterName : clusterMap.keySet()) {
ipMap.put(clusterName, new ArrayList<>());
}
for (Instance instance : instances) {
try {
if (instance == null) {
Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
continue;
}
if (StringUtils.isEmpty(instance.getClusterName())) {
instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
}
if (!clusterMap.containsKey(instance.getClusterName())) {
Loggers.SRV_LOG
.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
instance.getClusterName(), instance.toJson());
Cluster cluster = new Cluster(instance.getClusterName(), this);
cluster.init();
getClusterMap().put(instance.getClusterName(), cluster);
}
List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
if (clusterIPs == null) {
clusterIPs = new LinkedList<>();
ipMap.put(instance.getClusterName(), clusterIPs);
}
clusterIPs.add(instance);
} catch (Exception e) {
Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
}
}
for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
//make every ip mine
List<Instance> entryIPs = entry.getValue();
clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);// 核心注册逻辑
}
setLastModifiedMillis(System.currentTimeMillis());
getPushService().serviceChanged(this);// 发布 服务变化 的事件
StringBuilder stringBuilder = new StringBuilder();
for (Instance instance : allIPs()) {
stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
}
Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
stringBuilder.toString());
}
(通过全文搜索事件,找到这个方法)PushService#onApplicationEvent,它来处理事件。代码如下:
udpPush(ackEntry);// 服务端给客户端发送udp信息,通知客户端实例变动
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
Service service = event.getService();
String serviceName = service.getName();
String namespaceId = service.getNamespaceId();
Future future = GlobalExecutor.scheduleUdpSender(() -> {
try {
Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
ConcurrentMap<String, PushClient> clients = clientMap
.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
if (MapUtils.isEmpty(clients)) {
return;
}
Map<String, Object> cache = new HashMap<>(16);
long lastRefTime = System.nanoTime();
for (PushClient client : clients.values()) {
if (client.zombie()) {
Loggers.PUSH.debug("client is zombie: " + client.toString());
clients.remove(client.toString());
Loggers.PUSH.debug("client is zombie: " + client.toString());
continue;
}
Receiver.AckEntry ackEntry;
Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
byte[] compressData = null;
Map<String, Object> data = null;
if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
compressData = (byte[]) (pair.getValue0());
data = (Map<String, Object>) pair.getValue1();
Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
}
if (compressData != null) {
ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
} else {
ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
if (ackEntry != null) {
cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
}
}
Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
client.getServiceName(), client.getAddrStr(), client.getAgent(),
(ackEntry == null ? null : ackEntry.key));
udpPush(ackEntry);// 给客户端发送udp信息
}
} catch (Exception e) {
Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
} finally {
futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
}
}, 1000, TimeUnit.MILLISECONDS);
futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
}
客户端接收udp报文的代码如下:
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 从本地拿实例
if (null == serviceObj) {// 如果拿到为空,则更新服务列表
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
updatingMap.put(serviceName, new Object());
updateServiceNow(serviceName, clusters);// upd的端口
上一篇: [代码随机化第 30 天] 贪婪算法第 04 部分
下一篇: C++ 算法学习 - 1.3 拓扑排序
推荐阅读
-
nacos1.4 源代码 - 服务发现、心跳机制
-
服务端和客户端心跳包机制
-
【2022新手指南】Java编程进阶之路 - 六、技术架构篇 ### MySQL索引底层解析与优化实战 - 你会讲解MySQL索引的数据结构吗?性能调优技巧知多少? - Redis深度揭秘:你知道多少?从基础到哨兵、主从复制全梳理 - Redis持久化及哨兵模式详解,还有集群搭建和Leader选举黑箱打开 - Zookeeper是个啥?特性和应用场景大公开 - ZooKeeper集群搭建攻略及 Leader选举、读写一致性、共享锁实现细节 - 探究ZooKeeper中的Leader选举机制及其在分布式环境中的作用 - Zab协议深入剖析:原理、功能与在Zookeeper中的核心地位 - RabbitMQ全方位解读:工作模式、消费限流、可靠投递与配置策略 - 设计者视角:RabbitMQ过期时间、死信队列与延时队列实践指南 - RocketMQ特性和应用场景揭示:理解其精髓与差异化优势 - Kafka详细介绍:特性及广泛应用于实时数据处理的场景解析 - ElasticSearch实力揭秘:特性概述与作为搜索引擎的广泛应用 - MongoDB认知升级:非关系型数据库的优势阐述,安装与使用实战教学 - BIO/NIO/AIO网络模型对比:掌握它们的区别与在网络编程中的实际应用 - Netty带你飞:理解其超快速度背后的秘密,包括线程模型分析 - 网络通信黑科技:Netty编解码原理与常用编解码器的应用,Protostuff实战演示 - 解密Netty粘包与拆包现象,怎样有效应对这一常见问题 - 自定义Netty心跳检测机制,轻松调整检测间隔时间的艺术 - Dubbo轻骑兵介绍:核心特性概览,服务降级实战与其实现益处 - Dubbo三大神器解读:本地存根与本地伪装的实战运用与优势呈现 ----------------------- 七、结语与回顾
-
StompJS 文档摘要:如何创建 stomp 客户端、如何连接服务器、心跳机制、如何发送消息、如何订阅和取消订阅、事务、如何调试
-
玩转Java底层:JMX详解 - jconsole与自定义MBean监控工具的实际应用与区别" 在日常JVM调优中,我们熟知的jconsole工具通过JMX包装的bean以图形化形式展示管理数据,而像jstat和jmap这类内建监控工具则由JVM直接支持。本文将以jconsole为例,深入讲解其实质——基于JMX的MBean功能,包括可视化界面上的bean属性查看和操作调用。 MBeans在jconsole中的体现是那些可观察的组件属性和方法,如上图所示,通过名为"Verbose"的属性能看到其值为false,同时还能直接操作该bean的方法,例如"closeJerryMBean"。 尽管jconsole给我们提供了直观的可视化界面,但请注意,这里的MBean并非固定不变,开发者可根据JMX提供的接口将自己的自定义bean展示到jconsole。以下步骤展示了如何创建并注册一个名为"StudyJavaMBean"的自定义MBean: 1. 首先定义接口`StudyJavaMBean`,接口需遵循MBean规范,即后缀为"MBean"且包含getter方法代表属性,如`getApplicationName`,和无返回值的setter方法代表操作,如`closeJerryMBean`。 ```java public interface StudyJavaMBean { String getApplicationName(); void closeJerryMBean(); } ``` 2. 编写接口的实现类`StudyJavaMBeanImpl`,实现接口中的方法: ```java public class StudyJavaMBeanImpl implements StudyJavaMBean { @Override public String getApplicationName() { return "每天学Java"; } @Override public void closeJerryMBean() { System.out.println("关闭Jerry应用"); } } ``` 3. 在代码中注册自定义MBean,涉及的关键步骤包括: - 获取平台MBeanServer - 定义ObjectName,指定唯一的MBean标识符 - 注册MBean到服务器 - 启动RMI连接器服务,以便jconsole能够访问 ```java public void registerMBean() throws Exception { // ... 具体实现省略 ... } ``` 实际运行注册后的MBean,您将在jconsole中发现并查看自定义bean的属性和调用相关方法。然而,这种方式相较于传统的属性/日志查看和HTTP接口,实用性相对有限,可能存在潜在的安全风险。但不可否认的是,JMX及其MBean机制对于获取操作系统信息、内存状态等关键性能指标仍然具有重要价值。例如: 1. **获取操作系统信息**:通过JMX MBean,可以直接获取到诸如CPU使用率、操作系统版本等系统级信息,这对于资源管理和优化工作具有显著帮助。
-
Spring Boot与VUE的WebSocket服务端数据推送和心跳机制详解