SpringCloud-Nacos注册中心——服务注册

王守钰 2021-11-01 17:11:34

使用版本:2.2.x

spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\
  com.alibaba.cloud.nacos.NacosServiceAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
org.springframework.context.ApplicationListener=\
  com.alibaba.cloud.nacos.discovery.logging.NacosLoggingListener

NacosDiscoveryAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public NacosDiscoveryProperties nacosProperties() {
		return new NacosDiscoveryProperties();
	}

	@Bean
	@ConditionalOnMissingBean
	public NacosServiceDiscovery nacosServiceDiscovery(
			NacosDiscoveryProperties discoveryProperties,
			NacosServiceManager nacosServiceManager) {
		return new NacosServiceDiscovery(discoveryProperties, nacosServiceManager);
	}

}

NacosDiscoveryAutoConfiguration中创建了NacosDiscoveryProperties配置信息和NacosServiceDiscovery服务发现。

RibbonNacosAutoConfiguration

如果是用了ribbon支持可以通过ribbon中的配置来配置一些规则实现负载均衡等。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnRibbonNacos
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = NacosRibbonClientConfiguration.class)
public class RibbonNacosAutoConfiguration {

}

RibbonNacosAutoConfiguration类中什么都没有写,只是加了RibbonClientsNacosRibbonClientConfiguration来进行处理Ribbon的Client支持。

@Configuration(proxyBeanMethods = false)
@ConditionalOnRibbonNacos
public class NacosRibbonClientConfiguration {

	@Autowired
	private PropertiesFactory propertiesFactory;

	@Bean
	@ConditionalOnMissingBean
	public ServerList<?> ribbonServerList(IClientConfig config,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		if (this.propertiesFactory.isSet(ServerList.class, config.getClientName())) {
			ServerList serverList = this.propertiesFactory.get(ServerList.class, config,
					config.getClientName());
			return serverList;
		}
		NacosServerList serverList = new NacosServerList(nacosDiscoveryProperties);
		serverList.initWithNiwsConfig(config);
		return serverList;
	}

	@Bean
	@ConditionalOnMissingBean
	public NacosServerIntrospector nacosServerIntrospector() {
		return new NacosServerIntrospector();
	}

}

NacosRibbonClientConfiguration中配置了ServerList,ServerIntrospector的实例信息。通过ConditionalOnRibbonNacos注解来进行控制开关。这里面当lb通过ribbon获取实例的时候会调用ServerList来进行获取实例列表。

@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@ConditionalOnProperty(value = "ribbon.nacos.enabled", matchIfMissing = true)
public @interface ConditionalOnRibbonNacos {

}

ConditionalOnRibbonNacos中如果当ribbon.nacos.enabledfalse的情况下不启用。

NacosDiscoveryEndpointAutoConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Endpoint.class)
@ConditionalOnNacosDiscoveryEnabled
public class NacosDiscoveryEndpointAutoConfiguration {

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnAvailableEndpoint
	public NacosDiscoveryEndpoint nacosDiscoveryEndpoint(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosDiscoveryEndpoint(nacosServiceManager, nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnEnabledHealthIndicator("nacos-discovery")
	public HealthIndicator nacosDiscoveryHealthIndicator(
			NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		Properties nacosProperties = nacosDiscoveryProperties.getNacosProperties();
		return new NacosDiscoveryHealthIndicator(
				nacosServiceManager.getNamingService(nacosProperties));
	}

}

NacosDiscoveryEndpointAutoConfiguration主要负责注册健康检测的端点信息。

NacosServiceRegistryAutoConfiguration

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled",
		matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class,
		AutoServiceRegistrationAutoConfiguration.class,
		NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {

	@Bean
	public NacosServiceRegistry nacosServiceRegistry(
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosServiceRegistry(nacosDiscoveryProperties);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosRegistration nacosRegistration(
			ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		return new NacosRegistration(registrationCustomizers.getIfAvailable(),
				nacosDiscoveryProperties, context);
	}

	@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	public NacosAutoServiceRegistration nacosAutoServiceRegistration(
			NacosServiceRegistry registry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		return new NacosAutoServiceRegistration(registry,
				autoServiceRegistrationProperties, registration);
	}

}

NacosServiceRegistryAutoConfiguration实例化的时候进行实例化了NacosServiceRegistry,NacosServiceRegistryServiceRegistry的实现类,后续实现服务注册的时候也就会通过ServiceRegistry来进行实现。NacosRegistration的实现了Registration接口,在NacosRegistration中主要标识了服务于nacos交互的基本信息。NacosAutoServiceRegistration继承了AbstractAutoServiceRegistration,实现了ApplicationListener的监听器。当收到容器启动事件后,服务进行发起注册。

NacosServiceRegistry

public class NacosServiceRegistry implements ServiceRegistry<Registration> {

	private static final String STATUS_UP = "UP";

	private static final String STATUS_DOWN = "DOWN";

	private static final Logger log = LoggerFactory.getLogger(NacosServiceRegistry.class);

	private final NacosDiscoveryProperties nacosDiscoveryProperties;

	@Autowired
	private NacosServiceManager nacosServiceManager;

	public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
		this.nacosDiscoveryProperties = nacosDiscoveryProperties;
	}

    // 注册
	@Override
	public void register(Registration registration) {

        // 校验服务id是否为空,在nacos中取的默认值也就是${spring.application.name},也可以通过${spring.cloud.nacos.discovery.service}来进行设置serviceId信息
		if (StringUtils.isEmpty(registration.getServiceId())) {
			log.warn("No service to register for nacos client...");
			return;
		}
        // 构建nameService
		NamingService namingService = namingService();
		String serviceId = registration.getServiceId();
		String group = nacosDiscoveryProperties.getGroup();
        // 构建实例信息,从配置中读取nacos的ip端口等信息
		Instance instance = getNacosInstanceFromRegistration(registration);

		try {
		    // 向nacos注册实例
			namingService.registerInstance(serviceId, group, instance);
			log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
					instance.getIp(), instance.getPort());
		}
		catch (Exception e) {
			if (nacosDiscoveryProperties.isFailFast()) {
				log.error("nacos registry, {} register failed...{},", serviceId,
						registration.toString(), e);
				rethrowRuntimeException(e);
			}
			else {
				log.warn("Failfast is false. {} register failed...{},", serviceId,
						registration.toString(), e);
			}
		}
	}

	@Override
	public void deregister(Registration registration) {

		log.info("De-registering from Nacos Server now...");

		if (StringUtils.isEmpty(registration.getServiceId())) {
			log.warn("No dom to de-register for nacos client...");
			return;
		}

		NamingService namingService = namingService();
		String serviceId = registration.getServiceId();
		String group = nacosDiscoveryProperties.getGroup();

		try {
		    // 向nacos下面实例信息
			namingService.deregisterInstance(serviceId, group, registration.getHost(),
					registration.getPort(), nacosDiscoveryProperties.getClusterName());
		}
		catch (Exception e) {
			log.error("ERR_NACOS_DEREGISTER, de-register failed...{},",
					registration.toString(), e);
		}

		log.info("De-registration finished.");
	}

	@Override
	public void close() {
		try {
			nacosServiceManager.nacosServiceShutDown();
		}
		catch (NacosException e) {
			log.error("Nacos namingService shutDown failed", e);
		}
	}

	private Instance getNacosInstanceFromRegistration(Registration registration) {
		Instance instance = new Instance();
		instance.setIp(registration.getHost());
		instance.setPort(registration.getPort());
		instance.setWeight(nacosDiscoveryProperties.getWeight());
		instance.setClusterName(nacosDiscoveryProperties.getClusterName());
		instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
		instance.setMetadata(registration.getMetadata());
		instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
		return instance;
	}

	private NamingService namingService() {
	    // 通过读取配置文件调用NacosFactory来进行生成NamingService
		return nacosServiceManager
				.getNamingService(nacosDiscoveryProperties.getNacosProperties());
	}

}

在构造方法中传入了nacos的配置文件信息。NacosServiceRegistry中实现了registerderegister方法。

NacosRegistration

public class NacosRegistration implements Registration, ServiceInstance {

	/**
	 * The metadata key of management port.
	 */
	public static final String MANAGEMENT_PORT = "management.port";

	/**
	 * The metadata key of management context-path.
	 */
	public static final String MANAGEMENT_CONTEXT_PATH = "management.context-path";

	/**
	 * The metadata key of management address.
	 */
	public static final String MANAGEMENT_ADDRESS = "management.address";

	/**
	 * The metadata key of management endpoints web base path.
	 */
	public static final String MANAGEMENT_ENDPOINT_BASE_PATH = "management.endpoints.web.base-path";

	private List<NacosRegistrationCustomizer> registrationCustomizers;

	private NacosDiscoveryProperties nacosDiscoveryProperties;

	private ApplicationContext context;

	public NacosRegistration(List<NacosRegistrationCustomizer> registrationCustomizers,
			NacosDiscoveryProperties nacosDiscoveryProperties,
			ApplicationContext context) {
		this.registrationCustomizers = registrationCustomizers;
		this.nacosDiscoveryProperties = nacosDiscoveryProperties;
		this.context = context;
	}

	@PostConstruct
	public void init() {

		Map<String, String> metadata = nacosDiscoveryProperties.getMetadata();
		Environment env = context.getEnvironment();

		String endpointBasePath = env.getProperty(MANAGEMENT_ENDPOINT_BASE_PATH);
		if (!StringUtils.isEmpty(endpointBasePath)) {
			metadata.put(MANAGEMENT_ENDPOINT_BASE_PATH, endpointBasePath);
		}

		Integer managementPort = ManagementServerPortUtils.getPort(context);
		if (null != managementPort) {
			metadata.put(MANAGEMENT_PORT, managementPort.toString());
			String contextPath = env
					.getProperty("management.server.servlet.context-path");
			String address = env.getProperty("management.server.address");
			if (!StringUtils.isEmpty(contextPath)) {
				metadata.put(MANAGEMENT_CONTEXT_PATH, contextPath);
			}
			if (!StringUtils.isEmpty(address)) {
				metadata.put(MANAGEMENT_ADDRESS, address);
			}
		}

		if (null != nacosDiscoveryProperties.getHeartBeatInterval()) {
			metadata.put(PreservedMetadataKeys.HEART_BEAT_INTERVAL,
					nacosDiscoveryProperties.getHeartBeatInterval().toString());
		}
		if (null != nacosDiscoveryProperties.getHeartBeatTimeout()) {
			metadata.put(PreservedMetadataKeys.HEART_BEAT_TIMEOUT,
					nacosDiscoveryProperties.getHeartBeatTimeout().toString());
		}
		if (null != nacosDiscoveryProperties.getIpDeleteTimeout()) {
			metadata.put(PreservedMetadataKeys.IP_DELETE_TIMEOUT,
					nacosDiscoveryProperties.getIpDeleteTimeout().toString());
		}
		customize(registrationCustomizers, this);
	}

	private static void customize(
			List<NacosRegistrationCustomizer> registrationCustomizers,
			NacosRegistration registration) {
		if (registrationCustomizers != null) {
			for (NacosRegistrationCustomizer customizer : registrationCustomizers) {
				customizer.customize(registration);
			}
		}
	}

	@Override
	public String getServiceId() {
		return nacosDiscoveryProperties.getService();
	}

	@Override
	public String getHost() {
		return nacosDiscoveryProperties.getIp();
	}

	@Override
	public int getPort() {
		return nacosDiscoveryProperties.getPort();
	}

	public void setPort(int port) {
		this.nacosDiscoveryProperties.setPort(port);
	}

	@Override
	public boolean isSecure() {
		return nacosDiscoveryProperties.isSecure();
	}

	@Override
	public URI getUri() {
		return DefaultServiceInstance.getUri(this);
	}

	@Override
	public Map<String, String> getMetadata() {
		return nacosDiscoveryProperties.getMetadata();
	}

	public boolean isRegisterEnabled() {
		return nacosDiscoveryProperties.isRegisterEnabled();
	}

	public String getCluster() {
		return nacosDiscoveryProperties.getClusterName();
	}

	public float getRegisterWeight() {
		return nacosDiscoveryProperties.getWeight();
	}

	public NacosDiscoveryProperties getNacosDiscoveryProperties() {
		return nacosDiscoveryProperties;
	}

	@Override
	public String toString() {
		return "NacosRegistration{" + "nacosDiscoveryProperties="
				+ nacosDiscoveryProperties + '}';
	}

}

NacosRegistration主要去灌入metadata的数据信息。当然也可以通过自己实现NacosRegistrationCustomizer接口来进行添加metadata数据信息。

NacosAutoServiceRegistration

public class NacosAutoServiceRegistration
		extends AbstractAutoServiceRegistration<Registration> {

	private static final Logger log = LoggerFactory
			.getLogger(NacosAutoServiceRegistration.class);

	private NacosRegistration registration;

	public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
			AutoServiceRegistrationProperties autoServiceRegistrationProperties,
			NacosRegistration registration) {
		// 设置serviceRegistry为NacosServiceRegistry
		super(serviceRegistry, autoServiceRegistrationProperties);
		this.registration = registration;
	}

	@Deprecated
	public void setPort(int port) {
		getPort().set(port);
	}

	@Override
	protected NacosRegistration getRegistration() {
		if (this.registration.getPort() < 0 && this.getPort().get() > 0) {
			this.registration.setPort(this.getPort().get());
		}
		Assert.isTrue(this.registration.getPort() > 0, "service.port has not been set");
		return this.registration;
	}

	@Override
	protected NacosRegistration getManagementRegistration() {
		return null;
	}

	@Override
	protected void register() {
	    // 读取配置文件${spring.cloud.nacos.discovery.registerEnabled}判断是否进行注册
		if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
			log.debug("Registration disabled.");
			return;
		}
		if (this.registration.getPort() < 0) {
			this.registration.setPort(getPort().get());
		}
		super.register();
	}

	@Override
	protected void registerManagement() {
		if (!this.registration.getNacosDiscoveryProperties().isRegisterEnabled()) {
			return;
		}
		super.registerManagement();

	}

	@Override
	protected Object getConfiguration() {
		return this.registration.getNacosDiscoveryProperties();
	}

	@Override
	protected boolean isEnabled() {
		return this.registration.getNacosDiscoveryProperties().isRegisterEnabled();
	}

	@Override
	@SuppressWarnings("deprecation")
	protected String getAppName() {
		String appName = registration.getNacosDiscoveryProperties().getService();
		return StringUtils.isEmpty(appName) ? super.getAppName() : appName;
	}

	@EventListener
	public void onNacosDiscoveryInfoChangedEvent(NacosDiscoveryInfoChangedEvent event) {
	    // 收到NacosDiscoveryInfoChangedEvent事件后进行调用restart方法。
		restart();
	}

	private void restart() {
		this.stop();
		this.start();
	}

}

image
NacosAutoServiceRegistrationAbstractAutoServiceRegistration的子类,AbstractAutoServiceRegistration实现了ApplicationListener监听等信息。

AbstractAutoServiceRegistration

public abstract class AbstractAutoServiceRegistration<R extends Registration>
		implements AutoServiceRegistration, ApplicationContextAware,
		ApplicationListener<WebServerInitializedEvent> {
		
	private AtomicBoolean running = new AtomicBoolean(false);
	
	private final ServiceRegistry<R> serviceRegistry;
	
	private AutoServiceRegistrationProperties properties;
	
	protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry,
			AutoServiceRegistrationProperties properties) {
		this.serviceRegistry = serviceRegistry;
		this.properties = properties;
	}

    // 接收WebServerInitializedEvent事件
    public void onApplicationEvent(WebServerInitializedEvent event) {
		bind(event);
	}
	
	@Deprecated
	public void bind(WebServerInitializedEvent event) {
		ApplicationContext context = event.getApplicationContext();
		if (context instanceof ConfigurableWebServerApplicationContext) {
			if ("management".equals(((ConfigurableWebServerApplicationContext) context)
					.getServerNamespace())) {
				return;
			}
		}
		// 设置端口
		this.port.compareAndSet(0, event.getWebServer().getPort());
		// 启动服务
		this.start();
	}
	
	public void start() {
		if (!isEnabled()) {
			if (logger.isDebugEnabled()) {
				logger.debug("Discovery Lifecycle disabled. Not starting");
			}
			return;
		}

		// only initialize if nonSecurePort is greater than 0 and it isn't already running
		// because of containerPortInitializer below
		// 如果是没启动
		if (!this.running.get()) {
		    // 发送预注册事件
			this.context.publishEvent(
					new InstancePreRegisteredEvent(this, getRegistration()));
			// 注册
			register();
			if (shouldRegisterManagement()) {
				registerManagement();
			}
			// 发送实例注册事件
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, getConfiguration()));
			// 设置running为true
			this.running.compareAndSet(false, true);
		}

	}
	
	protected void register() {
		this.serviceRegistry.register(getRegistration());
	}
}

构造方法中设置了serviceRegistryNacosServiceRegistry。当容器启动的时候会收到WebServerInitializedEvent事件,接收到事件后发送InstancePreRegisteredEvent在进行调用register方法进行注册。InstancePreRegisteredEventNacosServiceManager被监听,设置配置信息。register方法被子类NacosAutoServiceRegistration重写,这里就判断配置是否要注册到nacos中。

NacosDiscoveryClientConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class })
@AutoConfigureAfter(NacosDiscoveryAutoConfiguration.class)
public class NacosDiscoveryClientConfiguration {

	@Bean
	public DiscoveryClient nacosDiscoveryClient(
			NacosServiceDiscovery nacosServiceDiscovery) {
		return new NacosDiscoveryClient(nacosServiceDiscovery);
	}

	@Bean
	@ConditionalOnMissingBean
	@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled",
			matchIfMissing = true)
	public NacosWatch nacosWatch(NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		return new NacosWatch(nacosServiceManager, nacosDiscoveryProperties);
	}

}

NacosDiscoveryClient

public class NacosDiscoveryClient implements DiscoveryClient {

	private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);

	/**
	 * Nacos Discovery Client Description.
	 */
	public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";

	private NacosServiceDiscovery serviceDiscovery;

	public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
		this.serviceDiscovery = nacosServiceDiscovery;
	}

	@Override
	public String description() {
		return DESCRIPTION;
	}

	@Override
	public List<ServiceInstance> getInstances(String serviceId) {
		try {
			return serviceDiscovery.getInstances(serviceId);
		}
		catch (Exception e) {
			throw new RuntimeException(
					"Can not get hosts from nacos server. serviceId: " + serviceId, e);
		}
	}

	@Override
	public List<String> getServices() {
		try {
			return serviceDiscovery.getServices();
		}
		catch (Exception e) {
			log.error("get service name from nacos server fail,", e);
			return Collections.emptyList();
		}
	}

}

实现DiscoveryClient方法,通过NacosServiceDiscovery来进行获取Instance列表。

NacosWatch

public class NacosWatch implements ApplicationEventPublisherAware, SmartLifecycle, DisposableBean {

	private static final Logger log = LoggerFactory.getLogger(NacosWatch.class);

	private Map<String, EventListener> listenerMap = new ConcurrentHashMap<>(16);

	private final AtomicBoolean running = new AtomicBoolean(false);

	private final AtomicLong nacosWatchIndex = new AtomicLong(0);

	private ApplicationEventPublisher publisher;

	private ScheduledFuture<?> watchFuture;

	private NacosServiceManager nacosServiceManager;

	private final NacosDiscoveryProperties properties;

	private final ThreadPoolTaskScheduler taskScheduler;

	public NacosWatch(NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties properties) {
		this.nacosServiceManager = nacosServiceManager;
		this.properties = properties;
		this.taskScheduler = getTaskScheduler();
	}

	@Deprecated
	public NacosWatch(NacosServiceManager nacosServiceManager,
			NacosDiscoveryProperties properties,
			ObjectProvider<ThreadPoolTaskScheduler> taskScheduler) {
		this.nacosServiceManager = nacosServiceManager;
		this.properties = properties;
		this.taskScheduler = taskScheduler.stream().findAny()
				.orElseGet(NacosWatch::getTaskScheduler);
	}

	private static ThreadPoolTaskScheduler getTaskScheduler() {
		ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
		taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");
		taskScheduler.initialize();
		return taskScheduler;
	}

	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
		this.publisher = publisher;
	}

	@Override
	public boolean isAutoStartup() {
		return true;
	}

	@Override
	public void stop(Runnable callback) {
		this.stop();
		callback.run();
	}

	@Override
	public void start() {
		if (this.running.compareAndSet(false, true)) {
			EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
					event -> new EventListener() {
						@Override
						public void onEvent(Event event) {
							if (event instanceof NamingEvent) {
								List<Instance> instances = ((NamingEvent) event)
										.getInstances();
								Optional<Instance> instanceOptional = selectCurrentInstance(
										instances);
								instanceOptional.ifPresent(currentInstance -> {
									resetIfNeeded(currentInstance);
								});
							}
						}
					});

			NamingService namingService = nacosServiceManager
					.getNamingService(properties.getNacosProperties());
			try {
				namingService.subscribe(properties.getService(), properties.getGroup(),
						Arrays.asList(properties.getClusterName()), eventListener);
			}
			catch (Exception e) {
				log.error("namingService subscribe failed, properties:{}", properties, e);
			}

			this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
					this::nacosServicesWatch, this.properties.getWatchDelay());
		}
	}

	private String buildKey() {
		return String.join(":", properties.getService(), properties.getGroup());
	}

	private void resetIfNeeded(Instance instance) {
		if (!properties.getMetadata().equals(instance.getMetadata())) {
			properties.setMetadata(instance.getMetadata());
		}
	}

	private Optional<Instance> selectCurrentInstance(List<Instance> instances) {
		return instances.stream()
				.filter(instance -> properties.getIp().equals(instance.getIp())
						&& properties.getPort() == instance.getPort())
				.findFirst();
	}

	@Override
	public void stop() {
		if (this.running.compareAndSet(true, false)) {
			if (this.watchFuture != null) {
				// shutdown current user-thread,
				// then the other daemon-threads will terminate automatic.
				this.taskScheduler.shutdown();
				this.watchFuture.cancel(true);
			}

			EventListener eventListener = listenerMap.get(buildKey());
			try {
				NamingService namingService = nacosServiceManager
						.getNamingService(properties.getNacosProperties());
				namingService.unsubscribe(properties.getService(), properties.getGroup(),
						Arrays.asList(properties.getClusterName()), eventListener);
			}
			catch (Exception e) {
				log.error("namingService unsubscribe failed, properties:{}", properties,
						e);
			}
		}
	}

	@Override
	public boolean isRunning() {
		return this.running.get();
	}

	@Override
	public int getPhase() {
		return 0;
	}

	public void nacosServicesWatch() {

		// nacos doesn't support watch now , publish an event every 30 seconds.
		this.publisher.publishEvent(
				new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));

	}

	@Override
	public void destroy() {
		this.stop();
	}
}

NacosWatch实现DisposableBeanApplicationEventPublisherAware,SmartLifecycle接口,当实例进行销毁的时候调用destroy方法,nameservice取消注册。当实例启动的时候,调用start方法,建立监听器,发送心跳监测事件。

NacosReactiveDiscoveryClientConfiguration

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnReactiveDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureAfter({ NacosDiscoveryAutoConfiguration.class,
		ReactiveCompositeDiscoveryClientAutoConfiguration.class })
@AutoConfigureBefore({ ReactiveCommonsClientAutoConfiguration.class })
public class NacosReactiveDiscoveryClientConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public NacosReactiveDiscoveryClient nacosReactiveDiscoveryClient(
			NacosServiceDiscovery nacosServiceDiscovery) {
		return new NacosReactiveDiscoveryClient(nacosServiceDiscovery);
	}

}

NacosReactiveDiscoveryClient

public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {

	private static final Logger log = LoggerFactory
			.getLogger(NacosReactiveDiscoveryClient.class);

	private NacosServiceDiscovery serviceDiscovery;

	public NacosReactiveDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
		this.serviceDiscovery = nacosServiceDiscovery;
	}

	@Override
	public String description() {
		return "Spring Cloud Nacos Reactive Discovery Client";
	}

	@Override
	public Flux<ServiceInstance> getInstances(String serviceId) {

		return Mono.justOrEmpty(serviceId).flatMapMany(loadInstancesFromNacos())
				.subscribeOn(Schedulers.boundedElastic());
	}

	private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() {
		return serviceId -> {
			try {
				return Flux.fromIterable(serviceDiscovery.getInstances(serviceId));
			}
			catch (NacosException e) {
				log.error("get service instance[{}] from nacos error!", serviceId, e);
				return Flux.empty();
			}
		};
	}

	@Override
	public Flux<String> getServices() {
		return Flux.defer(() -> {
			try {
				return Flux.fromIterable(serviceDiscovery.getServices());
			}
			catch (Exception e) {
				log.error("get services from nacos server fail,", e);
				return Flux.empty();
			}
		}).subscribeOn(Schedulers.boundedElastic());
	}

}

NacosReactiveDiscoveryClient主要是作为webflux相关的配置信息。也是通过NacosServiceDiscovery来获取实例相关的信息。

NacosConfigServerAutoConfiguration

Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass({ NacosDiscoveryProperties.class, ConfigServerProperties.class })
public class NacosConfigServerAutoConfiguration {

	@Autowired(required = false)
	private NacosDiscoveryProperties properties;

	@Autowired(required = false)
	private ConfigServerProperties server;

	@PostConstruct
	public void init() {
		if (this.properties == null || this.server == null) {
			return;
		}
		String prefix = this.server.getPrefix();
		if (StringUtils.hasText(prefix) && !StringUtils
				.hasText(this.properties.getMetadata().get("configPath"))) {
			this.properties.getMetadata().put("configPath", prefix);
		}
	}

}

NacosConfigServerAutoConfiguration进行配置后,执行init方法,设置metadata相关信息。

NacosServiceAutoConfiguration

在老版本的时候这个类干的并不是这个事,新版本的时候做了变更NacosServiceAutoConfiguration变成了NacosServiceRegistryAutoConfiguration。新版本中NacosServiceAutoConfiguration主要做的事就是注册NacosServiceManager

@Configuration(proxyBeanMethods = false)
@ConditionalOnDiscoveryEnabled
@ConditionalOnNacosDiscoveryEnabled
public class NacosServiceAutoConfiguration {

	@Bean
	public NacosServiceManager nacosServiceManager() {
		return new NacosServiceManager();
	}

}

NacosServiceAutoConfiguration上面的注解限定了两个条件ConditionalOnDiscoveryEnabledConditionalOnNacosDiscoveryEnabled

@ConditionalOnDiscoveryEnabled

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@ConditionalOnProperty(value = "spring.cloud.discovery.enabled", matchIfMissing = true)
public @interface ConditionalOnDiscoveryEnabled {

}

ConditionalOnDiscoveryEnabled的标识代表着只要配置中的spring.cloud.discovery.enabled不为false,那么这个标签就认为是成立的。

@ConditionalOnNacosDiscoveryEnabled

@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.TYPE, ElementType.METHOD })
@ConditionalOnProperty(value = "spring.cloud.nacos.discovery.enabled",
		matchIfMissing = true)
public @interface ConditionalOnNacosDiscoveryEnabled {

}

ConditionalOnNacosDiscoveryEnabled的标识代表着只要配置中的spring.cloud.nacos.discovery.enabled不为false,那么这个标签就是成立的。

NacosServiceManager

public class NacosServiceManager {

    @EventListener
	public void onInstancePreRegisteredEvent(
			InstancePreRegisteredEvent instancePreRegisteredEvent) {
		Registration registration = instancePreRegisteredEvent.getRegistration();
		if (Objects.isNull(nacosDiscoveryPropertiesCache)
				&& registration instanceof NacosRegistration) {
			NacosDiscoveryProperties nacosDiscoveryProperties = ((NacosRegistration) registration)
					.getNacosDiscoveryProperties();

			nacosDiscoveryPropertiesCache = new NacosDiscoveryProperties();
			copyProperties(nacosDiscoveryProperties, nacosDiscoveryPropertiesCache);
		}
	}
}

NacosServiceManager中进行注册了InstancePreRegisteredEvent事件的监听,当收到监听的时候,进行设置nacosDiscoveryPropertiesCache信息。