Nacos注册中心——IpPortBasedClient.init

王守钰 2021-10-12 18:10:32

IpPortBasedClient

/**
 * Client excerpt showing only {@code init()}, which picks and schedules the check task
 * for this client. The fields it touches are declared outside this excerpt.
 */
public class IpPortBasedClient extends AbstractClient {

    /**
     * Creates the check task that matches the {@code ephemeral} flag and hands it to
     * {@code HealthCheckReactor.scheduleCheck}: a {@code HealthCheckTaskV2} when the flag is
     * false, a {@code ClientBeatCheckTaskV2} when it is true.
     */
    public void init() {
        if (!ephemeral) {
            // Non-ephemeral branch: schedule the health-check task.
            healthCheckTaskV2 = new HealthCheckTaskV2(this);
            HealthCheckReactor.scheduleCheck(healthCheckTaskV2);
            return;
        }
        // Ephemeral branch: schedule the beat-check task.
        beatCheckTask = new ClientBeatCheckTaskV2(this);
        HealthCheckReactor.scheduleCheck(beatCheckTask);
    }
}

image
image
从debug可以看到ephemeral为true,所以init方法会创建出ClientBeatCheckTaskV2任务,再通过HealthCheckReactor的scheduleCheck方法来调度执行该任务。

/**
 * Schedules beat-check tasks and remembers the resulting futures by task key.
 */
public class HealthCheckReactor {

    /**
     * Futures of the tasks scheduled so far, keyed by {@code task.taskKey()}.
     * Declared {@code final} and with a wildcard type argument instead of the raw
     * {@code ScheduledFuture}, so the reference cannot be swapped and no raw type leaks out.
     */
    private static final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    /**
     * Schedules {@code task} unless a task with the same key has already been scheduled.
     *
     * <p>A task that is also a {@code NacosHealthCheckTask} is wrapped in a
     * {@code HealthCheckTaskInterceptWrapper} first; any other task is scheduled as is.
     *
     * @param task the task to schedule; its {@code taskKey()} identifies it in the map
     */
    public static void scheduleCheck(BeatCheckTask task) {
        Runnable wrapperTask =
                task instanceof NacosHealthCheckTask ? new HealthCheckTaskInterceptWrapper((NacosHealthCheckTask) task)
                        : task;
        // computeIfAbsent only invokes the scheduling lambda when the key has no mapping yet.
        // NOTE(review): the two 5000 ms arguments are presumably initial delay and period —
        // confirm against GlobalExecutor.scheduleNamingHealth.
        futureMap.computeIfAbsent(task.taskKey(),
                k -> GlobalExecutor.scheduleNamingHealth(wrapperTask, 5000, 5000, TimeUnit.MILLISECONDS));
    }
}

ClientBeatCheckTaskV2实现了NacosHealthCheckTask接口,所以这里会创建出HealthCheckTaskInterceptWrapper作为wrapperTask,并提交一个5秒后开始执行、执行周期为5秒的健康检查任务。

/**
 * {@link Runnable} adapter that pushes a {@code NacosHealthCheckTask} through the
 * health-check interceptor chain each time it runs.
 */
public class HealthCheckTaskInterceptWrapper implements Runnable {

    private final NacosHealthCheckTask task;

    private final NacosNamingInterceptorChain<NacosHealthCheckTask> interceptorChain;

    /**
     * @param task the health-check task to hand to the interceptor chain on every run
     */
    public HealthCheckTaskInterceptWrapper(NacosHealthCheckTask task) {
        this.interceptorChain = HealthCheckInterceptorChain.getInstance();
        this.task = task;
    }

    /**
     * Passes the wrapped task to the chain. Any exception is logged and not rethrown,
     * so it never propagates to the caller of {@code run()}.
     */
    @Override
    public void run() {
        try {
            interceptorChain.doInterceptor(task);
        } catch (Exception failure) {
            Loggers.SRV_LOG.info("Interceptor health check task {} failed", task.getTaskId(), failure);
        }
    }
}

 /**
  * Offers {@code object} to each interceptor in order. The first interceptor that accepts
  * the object's type and whose {@code intercept} returns true ends the walk with
  * {@code afterIntercept()}; if none does, {@code passIntercept()} is called.
  *
  * @param object the interceptable object to run through the chain
  */
 public void doInterceptor(T object) {
    for (NacosNamingInterceptor<T> interceptor : interceptors) {
        // Short-circuit keeps intercept() from being called on interceptors of another type.
        if (interceptor.isInterceptType(object.getClass()) && interceptor.intercept(object)) {
            object.afterIntercept();
            return;
        }
    }
    object.passIntercept();
}

这里的object的类型,从上面捋下来可以知道就是ClientBeatCheckTaskV2。接下来进入到ClientBeatCheckTaskV2的passIntercept方法中。

ClientBeatCheckTaskV2.passIntercept

/**
 * Beat-check task for one {@code IpPortBasedClient}: when it passes interception it builds an
 * {@code InstanceBeatCheckTask} for every service the client has published.
 */
public class ClientBeatCheckTaskV2 extends AbstractExecuteTask implements BeatCheckTask, NacosHealthCheckTask {

    private final IpPortBasedClient client;

    private final String taskId;

    private final InstanceBeatCheckTaskInterceptorChain interceptorChain;

    /**
     * @param client the client whose published instances are checked; its responsible id
     *               becomes this task's id
     */
    public ClientBeatCheckTaskV2(IpPortBasedClient client) {
        this.client = client;
        this.interceptorChain = InstanceBeatCheckTaskInterceptorChain.getInstance();
        this.taskId = client.getResponsibleId();
    }

    /** Runs the health check; called when no interceptor stopped this task. */
    public void passIntercept() {
        doHealthCheck();
    }

    /**
     * Walks every service published by the client and sends one {@code InstanceBeatCheckTask}
     * per service through the instance-level interceptor chain. Any exception ends the walk
     * and is logged as a warning instead of being rethrown.
     */
    public void doHealthCheck() {
        try {
            for (Service publishedService : client.getAllPublishedService()) {
                HealthCheckInstancePublishInfo publishInfo =
                        (HealthCheckInstancePublishInfo) client.getInstancePublishInfo(publishedService);
                InstanceBeatCheckTask instanceTask = new InstanceBeatCheckTask(client, publishedService, publishInfo);
                interceptorChain.doInterceptor(instanceTask);
            }
        } catch (Exception failure) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", failure);
        }
    }
}

ClientBeatCheckTaskV2进行实例化的时候,通过InstanceBeatCheckTaskInterceptorChain.getInstance()获取到了InstanceBeatCheckTaskInterceptorChain。doHealthCheck方法通过interceptorChain调用了doInterceptor,这时候传入的object是InstanceBeatCheckTask。

InstanceBeatCheckTaskInterceptorChain

/**
 * Interceptor chain for {@code InstanceBeatCheckTask}, available only through
 * {@link #getInstance()}.
 */
public class InstanceBeatCheckTaskInterceptorChain extends AbstractNamingInterceptorChain<InstanceBeatCheckTask> {

    /** The single shared chain, created when this class is initialized. */
    private static final InstanceBeatCheckTaskInterceptorChain SINGLETON = new InstanceBeatCheckTaskInterceptorChain();

    /**
     * @return the shared chain instance
     */
    public static InstanceBeatCheckTaskInterceptorChain getInstance() {
        return SINGLETON;
    }

    /** Private so the only instance is the shared one; passes the interceptor base type to the parent. */
    private InstanceBeatCheckTaskInterceptorChain() {
        super(AbstractBeatCheckInterceptor.class);
    }
}

InstanceBeatCheckTaskInterceptorChain的实例为单例对象,在实例化的时候调用了父类AbstractNamingInterceptorChain的构造方法。

/**
 * Base interceptor chain: holds the interceptors sorted by {@code order()} and offers each
 * interceptable object to them in that order.
 *
 * @param <T> the kind of object this chain intercepts
 */
public abstract class AbstractNamingInterceptorChain<T extends Interceptable>
    implements NacosNamingInterceptorChain<T> {

    private final List<NacosNamingInterceptor<T>> interceptors;

    /**
     * Loads the interceptors through {@code NacosServiceLoader.load} and sorts them by order.
     *
     * @param clazz the interceptor type handed to the service loader
     */
    protected AbstractNamingInterceptorChain(Class<? extends NacosNamingInterceptor<T>> clazz) {
        this.interceptors = new LinkedList<>();
        interceptors.addAll(NacosServiceLoader.load(clazz));
        sortByOrder();
    }

    /**
     * Appends an interceptor and re-sorts the chain so the ordering stays consistent.
     *
     * @param interceptor the interceptor to add
     */
    @Override
    public void addInterceptor(NacosNamingInterceptor<T> interceptor) {
        interceptors.add(interceptor);
        sortByOrder();
    }

    /**
     * Offers {@code object} to each interceptor in order. The first interceptor that accepts
     * the object's type and whose {@code intercept} returns true ends the walk with
     * {@code afterIntercept()}; if none does, {@code passIntercept()} is called.
     *
     * @param object the object to run through the chain
     */
    @Override
    public void doInterceptor(T object) {
        for (NacosNamingInterceptor<T> interceptor : interceptors) {
            boolean applicable = interceptor.isInterceptType(object.getClass());
            // intercept() is only consulted for interceptors that accept this object type.
            if (applicable && interceptor.intercept(object)) {
                object.afterIntercept();
                return;
            }
        }
        object.passIntercept();
    }

    /** Sorts the interceptors ascending by their {@code order()} value. */
    private void sortByOrder() {
        interceptors.sort(Comparator.comparingInt(NacosNamingInterceptor::order));
    }
}

AbstractNamingInterceptorChain的构造方法中,通过SPI的方式查找resources/META-INF/services下的com.alibaba.nacos.naming.healthcheck.heartbeat.AbstractBeatCheckInterceptor文件,将文件中列出的AbstractBeatCheckInterceptor实现类加载进来,并按order排序。文件内容如下:

com.alibaba.nacos.naming.healthcheck.heartbeat.ServiceEnableBeatCheckInterceptor
com.alibaba.nacos.naming.healthcheck.heartbeat.InstanceEnableBeatCheckInterceptor
com.alibaba.nacos.naming.healthcheck.heartbeat.InstanceBeatCheckResponsibleInterceptor

image

InstanceBeatCheckTask.passIntercept

public class InstanceBeatCheckTask implements Interceptable {
    
    rivate static final List<InstanceBeatChecker> CHECKERS = new LinkedList<>();
    
    static {
        CHECKERS.add(new UnhealthyInstanceChecker());
        CHECKERS.add(new ExpiredInstanceChecker());
        CHECKERS.addAll(NacosServiceLoader.load(InstanceBeatChecker.class));
    }
    public void passIntercept() {
        for (InstanceBeatChecker each : CHECKERS) {
            each.doCheck(client, service, instancePublishInfo);
        }
    }
}

InstanceBeatCheckTask类中的CHECKERS包含UnhealthyInstanceChecker、ExpiredInstanceChecker,以及通过SPI加载的其他InstanceBeatChecker。当调用passIntercept方法时,先进入到UnhealthyInstanceChecker的doCheck方法中。

UnhealthyInstanceChecker

/**
 * Marks a healthy instance as unhealthy once the time since its last heartbeat exceeds the
 * heartbeat timeout, and publishes the resulting change events.
 */
public class UnhealthyInstanceChecker implements InstanceBeatChecker {

    /**
     * Flips the instance to unhealthy when it is currently healthy and its heartbeat has
     * timed out; otherwise does nothing.
     *
     * @param client   the client that published the instance
     * @param service  the service the instance belongs to
     * @param instance the instance to check
     */
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        if (!instance.isHealthy() || !isUnhealthy(service, instance)) {
            return;
        }
        changeHealthyStatus(client, service, instance);
    }

    /** Returns true when more time than the heartbeat timeout has passed since the last heartbeat. */
    private boolean isUnhealthy(Service service, HealthCheckInstancePublishInfo instance) {
        long sinceLastBeat = System.currentTimeMillis() - instance.getLastHeartBeatTime();
        return sinceLastBeat > getTimeout(service, instance);
    }

    /**
     * Resolves the heartbeat timeout: the value from the metadata manager wins, then the value
     * in the instance's own extend datum, then the default constant.
     */
    private long getTimeout(Service service, InstancePublishInfo instance) {
        Optional<Object> fromMetadata = getTimeoutFromMetadata(service, instance);
        // Nothing in the metadata manager: fall back to the instance's own extend datum
        // (the original note names the property preserved.heart.beat.timeout).
        Optional<Object> effective = fromMetadata.isPresent() ? fromMetadata
                : Optional.ofNullable(instance.getExtendDatum().get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT));
        // The default timeout is 15 seconds according to the original note.
        return effective.map(ConvertUtils::toLong).orElse(Constants.DEFAULT_HEART_BEAT_TIMEOUT);
    }

    /** Looks up the heartbeat-timeout entry in the instance metadata held by the metadata manager. */
    private Optional<Object> getTimeoutFromMetadata(Service service, InstancePublishInfo instance) {
        NamingMetadataManager metadataManager = ApplicationUtils.getBean(NamingMetadataManager.class);
        Optional<InstanceMetadata> instanceMetadata =
                metadataManager.getInstanceMetadata(service, instance.getMetadataId());
        return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.HEART_BEAT_TIMEOUT));
    }

    /** Sets the instance unhealthy, logs the change, and publishes service- and client-changed events. */
    private void changeHealthyStatus(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        instance.setHealthy(false);
        Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client last beat: {}",
                instance.getIp(), instance.getPort(), instance.getCluster(), service.getName(),
                UtilsAndCommons.LOCALHOST_SITE, instance.getLastHeartBeatTime());
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service));
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(client));
    }
}

这里判断了实例的健康状态:如果实例超过15秒(默认值)没有心跳,就认为该实例是非健康状态,会调用changeHealthyStatus方法发送ServiceEvent.ServiceChangedEvent和ClientEvent.ClientChangedEvent事件。

ExpiredInstanceChecker

public class ExpiredInstanceChecker implements InstanceBeatChecker {

    @Override
    public void doCheck(Client client, Service service, HealthCheckInstancePublishInfo instance) {
        boolean expireInstance = ApplicationUtils.getBean(GlobalConfig.class).isExpireInstance();
        if (expireInstance && isExpireInstance(service, instance)) {
            deleteIp(client, service, instance);
        }
    }
    
    private boolean isExpireInstance(Service service, HealthCheckInstancePublishInfo instance) {
        long deleteTimeout = getTimeout(service, instance);
        return System.currentTimeMillis() - instance.getLastHeartBeatTime() > deleteTimeout;
    }
    
    private long getTimeout(Service service, InstancePublishInfo instance) {
        Optional<Object> timeout = getTimeoutFromMetadata(service, instance);
        if (!timeout.isPresent()) {
            // 从metadata中读取reserved.ip.delete.timeout
            timeout = Optional.ofNullable(instance.getExtendDatum().get(PreservedMetadataKeys.IP_DELETE_TIMEOUT));
        }
        // 默认超时时间30秒
        return timeout.map(ConvertUtils::toLong).orElse(Constants.DEFAULT_IP_DELETE_TIMEOUT);
    }
    
    private Optional<Object> getTimeoutFromMetadata(Service service, InstancePublishInfo instance) {
        Optional<InstanceMetadata> instanceMetadata = ApplicationUtils.getBean(NamingMetadataManager.class)
                .getInstanceMetadata(service, instance.getMetadataId());
        // 读取metadata中的preserved.ip.delete.timeout
        return instanceMetadata.map(metadata -> metadata.getExtendData().get(PreservedMetadataKeys.IP_DELETE_TIMEOUT));
    }
    
    private void deleteIp(Client client, Service service, InstancePublishInfo instance) {
        Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.toString(), JacksonUtils.toJson(instance));
        client.removeServiceInstance(service);
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(service, client.getClientId()));
        NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(service, instance.getMetadataId(), true));
    }
}

当实例超过30秒(默认值)没有心跳,并且开启了过期实例清理(isExpireInstance为true)时,系统会调用deleteIp方法。deleteIp方法中调用了removeServiceInstance方法,然后发送ClientOperationEvent.ClientDeregisterServiceEvent和MetadataEvent.InstanceMetadataEvent事件。

AbstractClient.removeServiceInstance

/**
 * Client base-class excerpt: keeps the instance each service was published with and
 * supports removing it.
 */
public abstract class AbstractClient implements Client {

    /** Published instance per service. */
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);

    /**
     * Removes the instance published for {@code service}. When something was removed, the
     * instance-count metric is decremented and a client-changed event is published. The
     * removal is logged in either case.
     *
     * @param service the service whose published instance should be removed
     * @return the removed publish info, or {@code null} when nothing was published for it
     */
    public InstancePublishInfo removeServiceInstance(Service service) {
        InstancePublishInfo removed = publishers.remove(service);
        boolean wasPublished = removed != null;
        if (wasPublished) {
            MetricsMonitor.decrementInstanceCount();
            NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        }
        Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());
        return removed;
    }
}

这里也就是从publishers中移除该service对应的实例;如果确实移除了实例,会发布ClientEvent.ClientChangedEvent事件。

ClientOperationEvent.ClientDeregisterServiceEvent

ClientServiceIndexesManager中的onEvent方法处理了ClientOperationEvent相关的事件。

/**
 * Keeps two indexes from service to client ids (publishers and subscribers) and updates
 * them in response to client events.
 *
 * <p>Excerpt: {@code handleClientDisconnect}, {@code addPublisherIndexes},
 * {@code addSubscriberIndexes} and {@code removeSubscriberIndexes} are defined outside the
 * lines shown here.
 */
public class ClientServiceIndexesManager extends SmartSubscriber {
    
    /** Client ids that have published an instance for each service. */
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
    
    /** Client ids that have subscribed to each service. */
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();
    
    /** Registers this manager as a subscriber with the notify center. */
    public ClientServiceIndexesManager() {
        NotifyCenter.registerSubscriber(this, NamingEventPublisherFactory.getInstance());
    }

    /**
     * Dispatches an incoming event: client-disconnect events and client-operation events are
     * handled, every other event type is ignored.
     *
     * @param event the event delivered by the notify center
     */
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }
        
    /** Routes a client-operation event to the index update that matches its concrete type. */
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            removeSubscriberIndexes(service, clientId);
        }
    }
    
    /**
     * Removes {@code clientId} from the publishers of {@code service} and publishes a
     * service-changed event. Does nothing when the service has no publisher index.
     */
    private void removePublisherIndexes(Service service, String clientId) {
        // One lookup instead of containsKey + get, so the set that was checked is the set used.
        Set<String> publisherIds = publisherIndexes.get(service);
        if (publisherIds == null) {
            return;
        }
        publisherIds.remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }
}

当收到ClientOperationEvent.ClientDeregisterServiceEvent事件后,会调用removePublisherIndexes方法,将clientId从publisherIndexes中移除,再发送ServiceEvent.ServiceChangedEvent事件。