Execution flow of the Nacos registry

王守钰 2021-09-22 18:09:43

NacosDiscoveryAutoConfiguration

@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}
}

The Nacos discovery auto-configuration class instantiates three beans in total: NacosServiceRegistry, NacosRegistration, and NacosAutoServiceRegistration.

NacosAutoServiceRegistration

NacosAutoServiceRegistration extends AbstractAutoServiceRegistration. AbstractAutoServiceRegistration implements ApplicationListener<WebServerInitializedEvent> and therefore listens for WebServerInitializedEvent; in other words, onApplicationEvent runs once the web server has been initialized.

public void onApplicationEvent(WebServerInitializedEvent event) {
	bind(event);
}
public void bind(WebServerInitializedEvent event) {
	ApplicationContext context = event.getApplicationContext();
	if (context instanceof ConfigurableWebServerApplicationContext) {
		if ("management".equals(((ConfigurableWebServerApplicationContext) context)
				.getServerNamespace())) {
			return;
		}
	}
	this.port.compareAndSet(0, event.getWebServer().getPort());
	this.start();
}

onApplicationEvent calls bind, and bind ends by calling start.
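The port handling in bind can be looked at in isolation. The following is a minimal sketch in plain Java without Spring; the class BindSketch and the method onWebServerReady are hypothetical stand-ins for the listener, not Spring Cloud classes. It assumes a simplified start that only counts registrations, and shows that compareAndSet(0, port) keeps the first port reported and that the management namespace is ignored.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the listener logic; not the Spring Cloud class.
public class BindSketch {
    private final AtomicInteger port = new AtomicInteger(0);
    private final AtomicBoolean running = new AtomicBoolean(false);
    private int registerCalls = 0;

    // Mirrors bind(): skip the management context, remember the first port, then start.
    public void onWebServerReady(String serverNamespace, int webServerPort) {
        if ("management".equals(serverNamespace)) {
            return;
        }
        port.compareAndSet(0, webServerPort); // no-op when a port is already recorded
        start();
    }

    // Simplified start(): the registration step runs only once.
    private void start() {
        if (running.compareAndSet(false, true)) {
            registerCalls++;
        }
    }

    public int getPort() {
        return port.get();
    }

    public int getRegisterCalls() {
        return registerCalls;
    }

    public static void main(String[] args) {
        BindSketch sketch = new BindSketch();
        sketch.onWebServerReady("management", 9002); // ignored
        sketch.onWebServerReady(null, 9001);         // recorded, triggers start
        sketch.onWebServerReady(null, 9003);         // port kept, start is a no-op
        System.out.println(sketch.getPort() + " " + sketch.getRegisterCalls()); // prints "9001 1"
    }
}
```

The single compareAndSet in this sketch's start is a simplification; the real method checks the running flag first and sets it after publishing its events.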

public void start() {
	if (!isEnabled()) {
		if (logger.isDebugEnabled()) {
			logger.debug("Discovery Lifecycle disabled. Not starting");
		}
		return;
	}

	// only initialize if nonSecurePort is greater than 0 and it isn't already running
	// because of containerPortInitializer below
	if (!this.running.get()) {
		this.context.publishEvent(
				new InstancePreRegisteredEvent(this, getRegistration()));
		register();
		if (shouldRegisterManagement()) {
			registerManagement();
		}
		this.context.publishEvent(
				new InstanceRegisteredEvent<>(this, getConfiguration()));
		this.running.compareAndSet(false, true);
	}

}

start in turn calls register, which the NacosAutoServiceRegistration subclass overrides.

@Override
protected void register() {
	if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
		log.debug("Registration disabled.");
		return;
	}
	if (this.registration.getPort() < 0) {
		this.registration.setPort(getPort().get());
	}
	super.register();
}

After its own checks, the override still ends by calling register in the parent class AbstractAutoServiceRegistration.
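The override-then-delegate pattern used here can be sketched without any framework. In the example below, BaseRegistration, GuardedRegistration, and the run helper are all hypothetical names made up for illustration; only the shape (guard, port fallback, super.register()) follows the code above.

```java
public class RegisterOverrideSketch {

    // Hypothetical base class: performs the actual registration.
    public static class BaseRegistration {
        protected String outcome = "not registered";
        protected int port;

        protected void register() {
            outcome = "registered:" + port;
        }
    }

    // Hypothetical subclass: checks preconditions, then delegates to the parent.
    public static class GuardedRegistration extends BaseRegistration {
        private final boolean registerEnabled;
        private final int webServerPort;

        public GuardedRegistration(boolean registerEnabled, int configuredPort, int webServerPort) {
            this.registerEnabled = registerEnabled;
            this.port = configuredPort;
            this.webServerPort = webServerPort;
        }

        @Override
        protected void register() {
            if (!registerEnabled) {
                outcome = "skipped";
                return;
            }
            if (port < 0) {
                port = webServerPort; // fall back to the port reported by the web server
            }
            super.register();
        }
    }

    // Convenience helper for trying out the three cases.
    public static String run(boolean registerEnabled, int configuredPort, int webServerPort) {
        GuardedRegistration registration =
                new GuardedRegistration(registerEnabled, configuredPort, webServerPort);
        registration.register();
        return registration.outcome;
    }

    public static void main(String[] args) {
        System.out.println(run(true, -1, 9001));   // registered:9001
        System.out.println(run(true, 8080, 9001)); // registered:8080
        System.out.println(run(false, -1, 9001));  // skipped
    }
}
```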

protected void register() {
	this.serviceRegistry.register(getRegistration());
}

The serviceRegistry here is the NacosServiceRegistry bean defined in NacosDiscoveryAutoConfiguration above.

@Override
public void register(Registration registration) {

	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}

	String serviceId = registration.getServiceId();

	Instance instance = getNacosInstanceFromRegistration(registration);

	try {
		namingService.registerInstance(serviceId, instance);
		log.info("nacos registry, {} {}:{} register finished", serviceId,
				instance.getIp(), instance.getPort());
	}
	catch (Exception e) {
		log.error("nacos registry, {} register failed...{},", serviceId,
				registration.toString(), e);
	}
}

Finally, register calls the registerInstance method of namingService.

@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {

    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);

        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }

    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

Heartbeat monitoring

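registerInstance does two things: for ephemeral instances it sets up a heartbeat, and it then registers the instance. For the heartbeat, the constant DEFAULT_HEART_BEAT_INTERVAL defaults to 5 seconds. The interval can be changed with the metadata setting preserved.heart.beat.interval, given in milliseconds. Example: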

spring:
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:32088
        namespace: test
        metadata:
          preserved:
            heart:
              beat:
                interval: 3000

The beatInfo is then added to beatReactor:

 beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
 public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    dom2Beat.put(buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
    executorService.schedule(new BeatTask(beatInfo), 0, TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

addBeatInfo ends by calling the schedule method of executorService to submit a task that keeps the heartbeat going. The task is a BeatTask:

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long result = serverProxy.sendBeat(beatInfo);
        long nextTime = result > 0 ? result : beatInfo.getPeriod();
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

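BeatTask implements Runnable. Its run method calls serverProxy.sendBeat to keep the heartbeat alive and, once the call returns, schedules a new BeatTask so the heartbeat continues. Because sendBeat catches exceptions and returns 0, a failed request also leads to rescheduling, using beatInfo.getPeriod() as the delay.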
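The self-rescheduling pattern can be tried out on its own. The sketch below is not the Nacos BeatReactor: BeatLoopSketch, its LongSupplier parameter (a stand-in for serverProxy.sendBeat), and the runFor helper are hypothetical. It shows a one-shot task that schedules its own successor, choosing the next delay the same way run does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

public class BeatLoopSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final LongSupplier sendBeat;   // hypothetical stand-in for serverProxy.sendBeat
    private final long periodMillis;
    private volatile boolean stopped = false;

    public BeatLoopSketch(LongSupplier sendBeat, long periodMillis) {
        this.sendBeat = sendBeat;
        this.periodMillis = periodMillis;
    }

    // First beat is scheduled with zero delay, as in addBeatInfo.
    public void start() {
        executor.schedule(this::beatOnce, 0, TimeUnit.MILLISECONDS);
    }

    private void beatOnce() {
        if (stopped) {
            return;
        }
        long result = sendBeat.getAsLong();
        // A positive server-provided interval wins; otherwise use the local period.
        long nextTime = result > 0 ? result : periodMillis;
        if (!stopped) {
            executor.schedule(this::beatOnce, nextTime, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        stopped = true;
        executor.shutdownNow();
    }

    // Runs the loop until `beats` heartbeats were sent or the timeout elapsed.
    public static boolean runFor(int beats, long periodMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(beats);
        BeatLoopSketch loop = new BeatLoopSketch(() -> {
            latch.countDown();
            return 0L; // pretend the server gave no interval
        }, periodMillis);
        loop.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            loop.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(runFor(3, 10, 2000));
    }
}
```

Scheduling one-shot tasks instead of using a fixed-rate schedule lets each run choose its own next delay, which is what allows the server response to adjust the interval.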

public long sendBeat(BeatInfo beatInfo) {
    try {
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
        }
        Map<String, String> params = new HashMap<String, String>(4);
        params.put("beat", JSON.toJSONString(beatInfo));
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
        String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, HttpMethod.PUT);
        JSONObject jsonObject = JSON.parseObject(result);

        if (jsonObject != null) {
            return jsonObject.getLong("clientBeatInterval");
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: " + JSON.toJSONString(beatInfo), e);
    }
    return 0L;
}

This amounts to sending a PUT request to the Nacos server at /nacos/v1/ns/instance/beat, with parameters such as:

{beat={"cluster":"DEFAULT","ip":"10.196.1.228","metadata":{"management.endpoints.web.base-path":"/actuator","preserved.heart.beat.interval":"3000","preserved.register.source":"SPRING_CLOUD"},"period":3000,"port":9001,"scheduled":false,"serviceName":"DEFAULT_GROUP@@test","stopped":false,"weight":1}, serviceName=DEFAULT_GROUP@@test, namespaceId=test}

Registration

As shown above, once the heartbeat task has been scheduled, registration runs immediately: serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);

}

Registration likewise amounts to sending a POST request to the Nacos server at /nacos/v1/ns/instance, with parameters such as:

{groupName=DEFAULT_GROUP, metadata={"management.endpoints.web.base-path":"/actuator","preserved.heart.beat.interval":"3000","preserved.register.source":"SPRING_CLOUD"}, namespaceId=test, port=9001, enable=true, healthy=true, clusterName=DEFAULT, ip=10.196.1.228, weight=1.0, ephemeral=true, serviceName=DEFAULT_GROUP@@test}
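To make the shape of these parameters more concrete, the sketch below builds a reduced parameter map and form-encodes it. It is an illustration under assumptions: RegisterRequestSketch and its methods are hypothetical, the "@@" separator is taken from the serviceName value in the payload above, only a subset of the parameters is included, and how reqAPI actually transmits the map is not shown in this article, so the encoding step is a guess at one reasonable approach.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class RegisterRequestSketch {

    // Grouped name as seen in the payload: DEFAULT_GROUP@@test.
    public static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    // Builds a reduced version of the parameter map used by registerService.
    public static Map<String, String> buildParams(String namespaceId, String serviceName,
                                                  String groupName, String ip, int port) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("namespaceId", namespaceId);
        params.put("serviceName", groupedName(serviceName, groupName));
        params.put("groupName", groupName);
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", "true");
        return params;
    }

    // Form-encodes the map as key=value pairs joined by '&'.
    public static String encode(Map<String, String> params) {
        return params.entrySet().stream()
                .map(e -> URLEncoder.encode(e.getKey(), StandardCharsets.UTF_8)
                        + "=" + URLEncoder.encode(e.getValue(), StandardCharsets.UTF_8))
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        Map<String, String> params =
                buildParams("test", "test", "DEFAULT_GROUP", "10.196.1.228", 9001);
        System.out.println(encode(params));
    }
}
```

Note that "@@" is percent-encoded as %40%40 when the grouped service name is placed in a form-encoded string.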