Date: 2022-08-05 11:43:52 | Category: Java code
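In a previous post we covered migrating our dubbo services from a redis registry to the nacos registry. After the migration, an exception was thrown from time to time: ERROR com.alibaba.nacos.client.naming - [CLIENT-BEAT] failed to send beat:, and that is what prompted this analysis. In the end, the exception turned out to be caused by a network mapping problem on our SLB and had nothing to do with nacos.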
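In dubbo's registry package, four interfaces define the service-registration behavior, and every registry backend (zookeeper, nacos, redis, etcd and so on) is an implementation of these interfaces.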
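As a rough illustration of the "one contract, many backends" idea, here is a minimal sketch. The `SimpleRegistry` interface and the `InMemoryRegistry` class are hypothetical and heavily simplified: the four operations only loosely mirror register/unregister/subscribe/unsubscribe, services and addresses are plain strings rather than dubbo `URL`s, and an in-memory map stands in for zookeeper, nacos, redis or etcd.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified registry contract: every backend would implement
// the same four operations, so callers never depend on the concrete store.
interface SimpleRegistry {
    void register(String service, String address);
    void unregister(String service, String address);
    void subscribe(String service, Consumer<Set<String>> listener);
    void unsubscribe(String service, Consumer<Set<String>> listener);
}

// In-memory backend standing in for a real registry server.
class InMemoryRegistry implements SimpleRegistry {
    private final Map<String, Set<String>> providers = new ConcurrentHashMap<>();
    private final Map<String, List<Consumer<Set<String>>>> listeners = new ConcurrentHashMap<>();

    @Override
    public void register(String service, String address) {
        providers.computeIfAbsent(service, k -> ConcurrentHashMap.newKeySet()).add(address);
        notifyListeners(service);
    }

    @Override
    public void unregister(String service, String address) {
        Set<String> set = providers.get(service);
        if (set != null && set.remove(address)) {
            notifyListeners(service);
        }
    }

    @Override
    public void subscribe(String service, Consumer<Set<String>> listener) {
        listeners.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
        listener.accept(snapshot(service)); // deliver the current list right away
    }

    @Override
    public void unsubscribe(String service, Consumer<Set<String>> listener) {
        List<Consumer<Set<String>>> list = listeners.get(service);
        if (list != null) {
            list.remove(listener);
        }
    }

    // Immutable copy so listeners cannot mutate registry state.
    private Set<String> snapshot(String service) {
        return Set.copyOf(providers.getOrDefault(service, Set.of()));
    }

    private void notifyListeners(String service) {
        Set<String> current = snapshot(service);
        for (Consumer<Set<String>> l : listeners.getOrDefault(service, List.of())) {
            l.accept(current);
        }
    }
}
```

A nacos- or zookeeper-backed class would implement the same interface and talk to its server instead of the map; code written against `SimpleRegistry` would not change.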
org.apache.dubbo.registry.nacos.NacosRegistry:152
@Override
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
execute(namingService -> namingService.registerInstance(serviceName, instance));
}
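In dubbo, every service is represented as a URL, which corresponds to a service Instance in nacos. Registering a service therefore only requires converting the URL into an Instance and registering that Instance with nacos. Next, let's look at how namingService actually performs the registration.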
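Before that, here is a minimal sketch of the URL-to-instance idea, assuming a provider URL of the form `dubbo://ip:port/path?key=value&...`. `ServiceInstance` and `UrlToInstance` are hypothetical names, not nacos's `Instance` class or dubbo's `createInstance`; the mapping (host to ip, port to port, query parameters to metadata, and the path stored under a `path` key) is an assumption chosen for illustration.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a registry instance (not nacos's Instance class).
final class ServiceInstance {
    final String ip;
    final int port;
    final Map<String, String> metadata;

    ServiceInstance(String ip, int port, Map<String, String> metadata) {
        this.ip = ip;
        this.port = port;
        this.metadata = Collections.unmodifiableMap(metadata);
    }
}

final class UrlToInstance {
    // Maps host -> ip, port -> port, each query parameter -> a metadata entry.
    static ServiceInstance convert(String providerUrl) {
        URI uri = URI.create(providerUrl);
        Map<String, String> metadata = new LinkedHashMap<>();
        String query = uri.getRawQuery();
        if (query != null) {
            for (String pair : query.split("&")) {
                if (pair.isEmpty()) {
                    continue;
                }
                int eq = pair.indexOf('=');
                String key = eq >= 0 ? pair.substring(0, eq) : pair;
                String value = eq >= 0 ? pair.substring(eq + 1) : "";
                metadata.put(decode(key), decode(value));
            }
        }
        // In this sketch the path (without the leading '/') is kept as metadata.
        String path = uri.getPath();
        if (path != null && path.length() > 1) {
            metadata.put("path", path.substring(1));
        }
        return new ServiceInstance(uri.getHost(), uri.getPort(), metadata);
    }

    private static String decode(String s) {
        return URLDecoder.decode(s, StandardCharsets.UTF_8);
    }
}
```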
com.alibaba.nacos.client.naming.NacosNamingService:283
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
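In the code above, besides registering the instance, the method checks whether the instance is ephemeral; if it is, the instance is added to beatReactor's heartbeat list. This is because nacos divides services into two kinds. Ephemeral services, such as dubbo or spring cloud services, are kept alive by heartbeats: if heartbeats stop arriving in time, the server automatically takes the instance offline. Persistent services, such as databases and cache services, do not and cannot send heartbeats; for these, the server probes liveness itself, for example by checking a TCP port. Let's now look at how heartbeats for ephemeral instances are sent.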
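To make the ephemeral-instance rule concrete, here is a sketch of the kind of server-side bookkeeping it implies: every heartbeat refreshes a timestamp, and a periodic check first marks silent instances unhealthy and later removes them. `EphemeralInstanceTable` is hypothetical, and both thresholds (15s and 30s) are illustrative assumptions, not values read from the nacos source. Persistent instances would not appear in this table at all, since the server probes them instead. Time is passed in explicitly so the behavior is deterministic.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical server-side view of ephemeral instances kept alive by heartbeats.
final class EphemeralInstanceTable {
    static final long UNHEALTHY_AFTER_MS = 15_000; // assumption for illustration
    static final long DELETE_AFTER_MS = 30_000;    // assumption for illustration

    private final Map<String, Long> lastBeat = new ConcurrentHashMap<>();
    private final Map<String, Boolean> healthy = new ConcurrentHashMap<>();

    // Called on registration and on every heartbeat.
    void onBeat(String instanceKey, long nowMs) {
        lastBeat.put(instanceKey, nowMs);
        healthy.put(instanceKey, true);
    }

    // Periodic check: mark stale instances unhealthy, drop very stale ones.
    void check(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = lastBeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            long silence = nowMs - e.getValue();
            if (silence > DELETE_AFTER_MS) {
                it.remove();
                healthy.remove(e.getKey());
            } else if (silence > UNHEALTHY_AFTER_MS) {
                healthy.put(e.getKey(), false);
            }
        }
    }

    boolean isRegistered(String key) {
        return lastBeat.containsKey(key);
    }

    boolean isHealthy(String key) {
        return healthy.getOrDefault(key, false);
    }
}
```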
com.alibaba.nacos.client.naming.NacosNamingService:104
private int initClientBeatThreadCount(Properties properties) {
if (properties == null) {
return UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT;
}
return NumberUtils.toInt(properties.getProperty(PropertyKeyConst.NAMING_CLIENT_BEAT_THREAD_COUNT),
UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
}
// The number of heartbeat threads can be set via dubbo.registries.nacos.parameters.namingClientBeatThreadCount = 10
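First, a piece of initialization code that determines the thread count of the beatReactor heartbeat thread pool. The Properties passed in are the parameters configured for the dubbo registry; if namingClientBeatThreadCount is set, its value is used. Otherwise the default pool size is one thread on a single-core machine and half the number of CPU cores on a multi-core machine. Now, on with the heartbeat logic.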
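Before moving on, here is a minimal sketch of the thread-count rule just described, written without nacos on the classpath. `BeatThreadCount` is a hypothetical helper; the property key is the `namingClientBeatThreadCount` name from the configuration comment above, and falling back to the default on a malformed value is an assumption about how the lenient `NumberUtils.toInt(value, default)` call behaves.

```java
import java.util.Properties;

// Hypothetical helper re-creating the default heartbeat thread count rule.
final class BeatThreadCount {
    // Key name taken from the dubbo configuration comment above.
    static final String KEY = "namingClientBeatThreadCount";

    // One thread on a single-core machine, otherwise half the cores.
    static int defaultCount(int cpus) {
        return cpus > 1 ? cpus / 2 : 1;
    }

    static int resolve(Properties props, int cpus) {
        int fallback = defaultCount(cpus);
        if (props == null) {
            return fallback;
        }
        String raw = props.getProperty(KEY);
        if (raw == null) {
            return fallback;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return fallback; // lenient parsing: bad values fall back to the default
        }
    }
}
```

In real use `cpus` would come from `Runtime.getRuntime().availableProcessors()`; passing it in keeps the rule easy to check on any machine.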
com.alibaba.nacos.client.naming.beat.BeatReactor:78
class BeatProcessor implements Runnable {
@Override
public void run() {
try {
for (Map.Entry<String, BeatInfo> entry : dom2Beat.entrySet()) {
BeatInfo beatInfo = entry.getValue();
if (beatInfo.isScheduled()) {
continue;
}
beatInfo.setScheduled(true);
executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
NAMING_LOGGER.error("[CLIENT-BEAT] Exception while scheduling beat.", e);
} finally {
executorService.schedule(this, clientBeatInterval, TimeUnit.MILLISECONDS);
}
}
}
class BeatTask implements Runnable {
BeatInfo beatInfo;
public BeatTask(BeatInfo beatInfo) {
this.beatInfo = beatInfo;
}
@Override
public void run() {
long result = serverProxy.sendBeat(beatInfo);
beatInfo.setScheduled(false);
if (result > 0) {
clientBeatInterval = result;
}
}
}
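dom2Beat is a map holding the ephemeral instances whose heartbeats must be reported; the ephemeral instances that NacosNamingService.registerInstance adds to the heartbeat list end up in this map. Once BeatReactor is initialized it kicks off the BeatProcessor task, which keeps rescheduling itself: after one round of heartbeat reporting finishes, the next round is triggered 5s later. The interval is held in the variable clientBeatInterval, and BeatTask overwrites it whenever the nacos server returns a positive value in the heartbeat result. The server looks up the key preserved.heart.beat.interval in the instance metadata and returns its value, or 5s if the key is absent. In dubbo 2.7.4.1 this feature is still immature: the value can only be specified through an annotation element, e.g. @Reference(parameters = "preserved.heart.beat.interval,10000"). It will count as mature once it can be set directly as a URL parameter on the registry configuration, so for now it is not recommended and is best treated as an experimental feature.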
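The self-rescheduling pattern described above can be sketched with a `ScheduledExecutorService`. `MiniBeatReactor`, its `Beat` class and the injected `sender` function are hypothetical simplifications: there is no network call, the "server" is modeled by the static `serverInterval` helper that applies the metadata rule described above, and the `scheduled` flag is an `AtomicBoolean` reset in a `finally` block, a defensive choice of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.ToLongFunction;

// Hypothetical, simplified re-creation of the BeatProcessor/BeatTask loop above.
final class MiniBeatReactor {
    static final class Beat {
        final String key;
        final Map<String, String> metadata;
        final AtomicBoolean scheduled = new AtomicBoolean(false);

        Beat(String key, Map<String, String> metadata) {
            this.key = key;
            this.metadata = metadata;
        }
    }

    private final Map<String, Beat> dom2Beat = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    // Sends one beat and returns the server-suggested interval (<= 0 keeps the current one).
    private final ToLongFunction<Beat> sender;
    private volatile long clientBeatIntervalMs;

    MiniBeatReactor(ToLongFunction<Beat> sender, long initialIntervalMs) {
        this.sender = sender;
        this.clientBeatIntervalMs = initialIntervalMs;
        executor.schedule(this::processOnce, 0, TimeUnit.MILLISECONDS);
    }

    void addBeat(Beat beat) {
        dom2Beat.put(beat.key, beat);
    }

    long currentIntervalMs() {
        return clientBeatIntervalMs;
    }

    void shutdown() {
        executor.shutdownNow();
    }

    private void processOnce() {
        try {
            for (Beat beat : dom2Beat.values()) {
                // Skip beats whose previous send is still in flight.
                if (beat.scheduled.compareAndSet(false, true)) {
                    executor.execute(() -> sendOnce(beat));
                }
            }
            // Self-rescheduling: the next round uses whatever interval is current now.
            executor.schedule(this::processOnce, clientBeatIntervalMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The executor was shut down; let the loop end.
        }
    }

    private void sendOnce(Beat beat) {
        try {
            long suggested = sender.applyAsLong(beat);
            if (suggested > 0) {
                clientBeatIntervalMs = suggested;
            }
        } finally {
            beat.scheduled.set(false);
        }
    }

    // Stand-in for the server rule described above: interval from metadata, default 5s.
    static long serverInterval(Map<String, String> metadata) {
        String v = metadata.get("preserved.heart.beat.interval");
        return v == null ? 5_000L : Long.parseLong(v);
    }
}
```

Because `processOnce` reads `clientBeatIntervalMs` each time it reschedules itself, an interval returned by the server takes effect from the following round.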
org.apache.dubbo.registry.nacos.NacosRegistry:399
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
if (!nacosListeners.containsKey(serviceName)) {
EventListener eventListener = event -> {
if (event instanceof NamingEvent) {
NamingEvent e = (NamingEvent) event;
notifySubscriber(url, listener, e.getInstances());
}
};
namingService.subscribe(serviceName, eventListener);
nacosListeners.put(serviceName, eventListener);
}
}
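The wrapping done in `subscribeEventListener` above is essentially an adapter from one callback type to another. Below is a minimal sketch with hypothetical `RegistryEventListener` and `InstancesChangedEvent` types (not the real nacos or dubbo interfaces). It uses `computeIfAbsent` in place of the `containsKey`/`put` pair so that concurrent subscriptions for the same service share one listener, a design choice of this sketch.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical registry-side callback type (not nacos's EventListener).
interface RegistryEventListener {
    void onEvent(Object event);
}

// Hypothetical event carrying the current instance list of one service.
final class InstancesChangedEvent {
    final String serviceName;
    final List<String> instances;

    InstancesChangedEvent(String serviceName, List<String> instances) {
        this.serviceName = serviceName;
        this.instances = instances;
    }
}

final class ListenerAdapter {
    private final Map<String, RegistryEventListener> listeners = new ConcurrentHashMap<>();

    // Wraps the consumer-side callback in a registry-side listener, at most once per service.
    RegistryEventListener subscribe(String serviceName, Consumer<List<String>> notifyListener) {
        return listeners.computeIfAbsent(serviceName, name -> event -> {
            // Only react to instance-change events, mirroring the instanceof check above.
            if (event instanceof InstancesChangedEvent) {
                notifyListener.accept(((InstancesChangedEvent) event).instances);
            }
        });
    }
}
```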
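nacos's service listener is an EventListener, so dubbo's service subscription only needs to wrap the NotifyListener handling inside onEvent and register the nacos subscription through namingService.subscribe. Ultimately, the EventListener object is added to the listener list of the event dispatcher, as the following code shows: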
com.alibaba.nacos.client.naming.core.EventDispatcher:
public class EventDispatcher {
private ExecutorService executor = null;
private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
private ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>();
public EventDispatcher() {
executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
thread.setDaemon(true);
return thread;
}
});
executor.execute(new Notifier());
}
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
observers.add(listener);
observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
if (observers != null) {
observers.add(listener);
}
serviceChanged(serviceInfo);
}
public void removeListener(String serviceName, String clusters, EventListener listener) {
NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
if (observers != null) {
Iterator<EventListener> iter = observers.iterator();
while (iter.hasNext()) {
EventListener oldListener = iter.next();
if (oldListener.equals(listener)) {
iter.remove();
}
}
if (observers.isEmpty()) {
observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
}
}
}
public List<ServiceInfo> getSubscribeServices() {
List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
for (String key : observerMap.keySet()) {
serviceInfos.add(ServiceInfo.fromKey(key));
}
return serviceInfos;
}
public void serviceChanged(ServiceInfo serviceInfo) {
if (serviceInfo == null) {
return;
}
changedServices.add(serviceInfo);
}
private class Notifier implements Runnable {
@Override
public void run() {
while (true) {
ServiceInfo serviceInfo = null;
try {
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}
if (serviceInfo == null) {
continue;
}
try {
List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), hosts));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] notify error for service: "
+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
}
}
}
}
public void setExecutor(ExecutorService executor) {
ExecutorService oldExecutor = this.executor;
this.executor = executor;
oldExecutor.shutdown();
}
}
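The queue-plus-notifier design of `EventDispatcher` above is a classic producer/consumer setup. Here is a minimal sketch of it; `MiniDispatcher` is hypothetical, service keys are plain strings, host lists are lists of `ip:port` strings instead of nacos `Instance` objects, and unlike the original, the notifier exits when interrupted rather than swallowing the exception.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Minimal producer/consumer dispatcher modeled on EventDispatcher:
// producers enqueue changes, a single daemon thread fans them out to listeners.
final class MiniDispatcher {
    private final BlockingQueue<Map.Entry<String, List<String>>> changedServices =
            new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<List<String>>>> observers = new ConcurrentHashMap<>();

    MiniDispatcher() {
        Thread notifier = new Thread(this::runNotifier, "mini-dispatcher-listener");
        notifier.setDaemon(true); // like the original, do not keep the JVM alive
        notifier.start();
    }

    void addListener(String serviceKey, Consumer<List<String>> listener) {
        observers.computeIfAbsent(serviceKey, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Called by the pull/push side when the fetched host list differs from the local copy.
    void serviceChanged(String serviceKey, List<String> hosts) {
        changedServices.add(Map.entry(serviceKey, List.copyOf(hosts)));
    }

    private void runNotifier() {
        while (true) {
            Map.Entry<String, List<String>> change;
            try {
                // Blocks until a change arrives, waking up at least every 5 minutes.
                change = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                return; // stop on interrupt instead of ignoring it
            }
            if (change == null) {
                continue;
            }
            for (Consumer<List<String>> listener : observers.getOrDefault(change.getKey(), List.of())) {
                try {
                    listener.accept(change.getValue());
                } catch (RuntimeException e) {
                    // One failing listener must not kill the notifier thread.
                }
            }
        }
    }
}
```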
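EventDispatcher maintains a listener map, observerMap, along with a blocking queue of changed services, changedServices. Once the dispatcher is initialized, it starts a thread that consumes the queue; when a registered service changes, the change is enqueued, which wakes the thread so it can update the service list held in dubbo's memory. The nacos client pulls the registered instances at a 1s interval, and whenever the pulled instances differ from the local in-memory copy, the change is enqueued, for example: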
com.alibaba.nacos.client.naming.core.HostReactor:296
public class UpdateTask implements Runnable {
long lastRefTime = Long.MAX_VALUE;
private String clusters;
private String serviceName;
public UpdateTask(String serviceName, String clusters) {
this.serviceName = serviceName;
this.clusters = clusters;
}
@Override
public void run() {
try {
ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
if (serviceObj == null) {
updateServiceNow(serviceName, clusters);
executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
return;
}
if (serviceObj.getLastRefTime() <= lastRefTime) {
updateServiceNow(serviceName, clusters);
serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
} else {
// if serviceName already updated by push, we should not override it
// since the push data may be different from pull through force push
refreshOnly(serviceName, clusters);
}
executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
lastRefTime = serviceObj.getLastRefTime();
} catch (Throwable e) {
NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
}
}
}
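DEFAULT_DELAY is 1s. nacos also pushes change events proactively: when a push arrives, serviceObj in serviceInfoMap is updated, and the next pull by the nacos client is scheduled 10s later (the delay comes from serviceObj.getCacheMillis()). The actual comparison against the local list lives in updateServiceNow, which we won't go into here.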
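The pull-versus-refresh decision in `UpdateTask.run` can be restated as a side-effect-free sketch. `UpdateDecision` is hypothetical; it only captures the branch conditions and delays visible in the code above (no cached entry yet, or a cached `lastRefTime` not newer than the one the task remembered, means pull now; a newer one means a push has already updated the cache) and leaves out the real fetching and diffing done in `updateServiceNow`.

```java
// Hypothetical, side-effect-free restatement of one UpdateTask round.
final class UpdateDecision {
    enum Action { PULL_NOW, REFRESH_ONLY }

    static final long DEFAULT_DELAY_MS = 1_000L;

    // cachedLastRefTime: lastRefTime of the local copy, or null if nothing is cached yet.
    // taskLastRefTime: the lastRefTime this task remembered from its previous round.
    static Action decide(Long cachedLastRefTime, long taskLastRefTime) {
        if (cachedLastRefTime == null || cachedLastRefTime <= taskLastRefTime) {
            return Action.PULL_NOW; // nothing newer arrived since the last round
        }
        return Action.REFRESH_ONLY; // a push already updated the cache; do not overwrite it
    }

    // Delay before the next round: 1s while nothing is cached, else the entry's cacheMillis.
    static long nextDelayMs(Long cachedLastRefTime, long cacheMillis) {
        return cachedLastRefTime == null ? DEFAULT_DELAY_MS : cacheMillis;
    }
}
```

Since the task starts with `lastRefTime = Long.MAX_VALUE`, its first round with a cached entry always pulls; only later rounds can take the refresh-only branch.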
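Registering dubbo services with nacos and subscribing to them is a fairly involved process. When digging into it, reading the source with a concrete question in mind makes the work far more productive. For example, before reading the source I first set out to find the cause of the nacos heartbeat exception, and then became curious about how nacos implements event listening; peeling back the layers one by one, things gradually fell into place. Of course, to fully understand how dubbo registers services with nacos you also need to know the server-side logic. The two core server classes, ClientBeatCheckTask and ClientBeatProcessor, contain the heartbeat handling, health checking and event-push logic, and are worth a look if you are interested.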