Nacos Registry: Server-Side Execution Flow

王守钰 2021-10-11 09:10:01

Simulated request

curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?port=8848&healthy=true&ip=11.11.11.11&weight=1.0&serviceName=nacosTest'

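The request URL shows that the call is routed to the instance controller. Before looking at that controller, look at the filters first. During the first breakpoint session, the serviceName read from the request inside the controller had changed: the request sent nacosTest, but the value seen in the controller was DEFAULT_GROUP@@nacosTest. It turns out that the request passes through several filters before the endpoint is invoked.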

ServiceNameFilter


private static final String UTL_PATTERNS = "/v1/ns/*";
private static final String SERVICE_NAME_FILTER = "serviceNameFilter";
@Bean
public FilterRegistrationBean serviceNameFilterRegistration() {
    FilterRegistrationBean<ServiceNameFilter> registration = new FilterRegistrationBean<>();
    registration.setFilter(serviceNameFilter());
    registration.addUrlPatterns(UTL_PATTERNS);
    registration.setName(SERVICE_NAME_FILTER);
    registration.setOrder(5);
    return registration;
}

The path of a service registration request matches /v1/ns/*, so the request passes through ServiceNameFilter.

public class ServiceNameFilter implements Filter {
    
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
            throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) servletRequest;
        HttpServletResponse resp = (HttpServletResponse) servletResponse;
        try {
            // Read the serviceName parameter
            String serviceName = request.getParameter(CommonParams.SERVICE_NAME);
            
            if (StringUtils.isNotBlank(serviceName)) {
                serviceName = serviceName.trim();
            }
            // Read the groupName parameter
            String groupName = request.getParameter(CommonParams.GROUP_NAME);
            if (StringUtils.isBlank(groupName)) {
                // Fall back to DEFAULT_GROUP when groupName is blank
                groupName = Constants.DEFAULT_GROUP;
            }
            
            // use groupName@@serviceName as new service name:
            String groupedServiceName = serviceName;
            if (StringUtils.isNotBlank(serviceName) && !serviceName.contains(Constants.SERVICE_INFO_SPLITER)) {
                groupedServiceName = groupName + Constants.SERVICE_INFO_SPLITER + serviceName;
            }
            // Override the serviceName parameter of the original request
            OverrideParameterRequestWrapper requestWrapper = OverrideParameterRequestWrapper.buildRequest(request);
            requestWrapper.addParameter(CommonParams.SERVICE_NAME, groupedServiceName);
            filterChain.doFilter(requestWrapper, servletResponse);
        } catch (Exception e) {
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    "Service name filter error," + ExceptionUtil.getAllExceptionMsg(e));
        }
    }
}
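
The renaming rule applied by the filter can be tried out in isolation. The following is a minimal sketch, not Nacos code: the class GroupedNameSketch and the method toGroupedName are hypothetical names, and the two constants are inlined with the values observed above (DEFAULT_GROUP and @@).

```java
final class GroupedNameSketch {

    // Inlined for illustration; the filter reads these from Constants.
    static final String DEFAULT_GROUP = "DEFAULT_GROUP";
    static final String SPLITTER = "@@";

    /** Hypothetical helper that mirrors the filter's renaming rule. */
    static String toGroupedName(String serviceName, String groupName) {
        if (serviceName == null || serviceName.trim().isEmpty()) {
            // A blank name is passed through unchanged, as in the filter.
            return serviceName;
        }
        String name = serviceName.trim();
        String group = (groupName == null || groupName.trim().isEmpty()) ? DEFAULT_GROUP : groupName;
        // A name that already carries a group prefix is left alone.
        return name.contains(SPLITTER) ? name : group + SPLITTER + name;
    }

    public static void main(String[] args) {
        System.out.println(toGroupedName("nacosTest", null));         // DEFAULT_GROUP@@nacosTest
        System.out.println(toGroupedName("nacosTest", "orders"));     // orders@@nacosTest
        System.out.println(toGroupedName("orders@@nacosTest", null)); // orders@@nacosTest
    }
}
```

The first call corresponds to the curl request above, which sends no groupName.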

InstanceController.register

The nacos-naming module is mainly responsible for service registration. The request mapping can be found in InstanceController.

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT)
public class InstanceController {
    ……
}

In the code above, UtilsAndCommons.NACOS_NAMING_CONTEXT is /v1/ns and UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT is /instance.

/**
 * Register new instance.
 *
 * @param request http request
 * @return 'ok' if success
 * @throws Exception any error during register
 */
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    
    final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    final Instance instance = HttpRequestInstanceBuilder.newBuilder()
            .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
    
    getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
public HttpRequestInstanceBuilder setRequest(HttpServletRequest request) throws NacosException {
    for (InstanceExtensionHandler each : handlers) {
        each.configExtensionInfoFromRequest(request);
    }
    setAttributesToBuilder(request);
    return this;
}
private void setAttributesToBuilder(HttpServletRequest request) throws NacosException {
    actualBuilder.setServiceName(WebUtils.required(request, CommonParams.SERVICE_NAME));
    actualBuilder.setIp(WebUtils.required(request, "ip"));
    actualBuilder.setPort(Integer.parseInt(WebUtils.required(request, "port")));
    actualBuilder.setHealthy(ConvertUtils.toBoolean(WebUtils.optional(request, "healthy", "true")));
    actualBuilder.setEphemeral(ConvertUtils
            .toBoolean(WebUtils.optional(request, "ephemeral", String.valueOf(defaultInstanceEphemeral))));
    setWeight(request);
    setCluster(request);
    setEnabled(request);
    setMetadata(request);
}

The Instance is created by HttpRequestInstanceBuilder through its build method. During setRequest, the builder reads serviceName, ip, port, healthy, ephemeral, weight, cluster, enabled, and metadata from the request object. Next, step into the registerInstance method.
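
The split between required and optional parameters can be illustrated with a plain map in place of HttpServletRequest. This is a simplified sketch under stated assumptions: RequestParamSketch and its two helpers are hypothetical, and they only approximate WebUtils.required and WebUtils.optional by treating a missing or empty value as absent.

```java
import java.util.HashMap;
import java.util.Map;

final class RequestParamSketch {

    /** Returns the value or throws when it is missing (assumed behaviour of a required lookup). */
    static String required(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Param '" + key + "' is required.");
        }
        return value;
    }

    /** Returns the value, or the default when it is missing (assumed behaviour of an optional lookup). */
    static String optional(Map<String, String> params, String key, String defaultValue) {
        String value = params.get(key);
        return (value == null || value.isEmpty()) ? defaultValue : value;
    }

    public static void main(String[] args) {
        // Parameters taken from the curl request; ephemeral is deliberately not sent.
        Map<String, String> params = new HashMap<>();
        params.put("ip", "11.11.11.11");
        params.put("port", "8848");
        params.put("healthy", "true");

        int port = Integer.parseInt(required(params, "port"));
        boolean ephemeral = Boolean.parseBoolean(optional(params, "ephemeral", "true"));
        System.out.println(required(params, "ip") + ":" + port + " ephemeral=" + ephemeral);
    }
}
```

Because the curl request carries no ephemeral parameter, the default supplied by the caller decides whether the instance is ephemeral.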

InstanceOperatorClientImpl.registerInstance

@org.springframework.stereotype.Service
public class InstanceOperatorClientImpl implements InstanceOperator {

    private final ClientManager clientManager;
    
    private final ClientOperationService clientOperationService;
    
    private final ServiceStorage serviceStorage;
    
    private final NamingMetadataOperateService metadataOperateService;
    
    private final NamingMetadataManager metadataManager;
    
    private final SwitchDomain switchDomain;
    
    private final UdpPushService pushService;
    public InstanceOperatorClientImpl(ClientManagerDelegate clientManager,
            ClientOperationServiceProxy clientOperationService, ServiceStorage serviceStorage,
            NamingMetadataOperateService metadataOperateService, NamingMetadataManager metadataManager,
            SwitchDomain switchDomain, UdpPushService pushService) {
        this.clientManager = clientManager;
        this.clientOperationService = clientOperationService;
        this.serviceStorage = serviceStorage;
        this.metadataOperateService = metadataOperateService;
        this.metadataManager = metadataManager;
        this.switchDomain = switchDomain;
        this.pushService = pushService;
    }
        
    public void registerInstance(String namespaceId, String serviceName, Instance instance) {
        boolean ephemeral = instance.isEphemeral();
        String clientId = IpPortBasedClient.getClientId(instance.toInetAddr(), ephemeral);
        createIpPortClientIfAbsent(clientId);
        Service service = getService(namespaceId, serviceName, ephemeral);
        clientOperationService.registerInstance(service, instance, clientId);
    }
    public String toInetAddr() {
        return ip + ":" + port;
    }
    public static final String ID_DELIMITER = "#";
    public static String getClientId(String address, boolean ephemeral) {
        return address + ID_DELIMITER + ephemeral;
    }
    private void createIpPortClientIfAbsent(String clientId) {
        if (!clientManager.contains(clientId)) {
            clientManager.clientConnected(clientId, new ClientAttributes());
        }
    }
}

The clientId is generated as ip:port#ephemeral, for example 10.10.10.10:8848#true.
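
As a quick check of that format, the two helper methods can be combined in a standalone sketch. ClientIdSketch is a hypothetical class; the method bodies follow toInetAddr and getClientId as quoted above.

```java
final class ClientIdSketch {

    static final String ID_DELIMITER = "#";

    static String toInetAddr(String ip, int port) {
        return ip + ":" + port;
    }

    static String getClientId(String address, boolean ephemeral) {
        // String concatenation renders the boolean as "true" or "false".
        return address + ID_DELIMITER + ephemeral;
    }

    public static void main(String[] args) {
        String address = toInetAddr("11.11.11.11", 8848);
        System.out.println(getClientId(address, true));  // 11.11.11.11:8848#true
        System.out.println(getClientId(address, false)); // 11.11.11.11:8848#false
    }
}
```

The same ip and port therefore yield two distinct clientIds, one per ephemeral value.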

InstanceOperatorClientImpl.createIpPortClientIfAbsent

The createIpPortClientIfAbsent method uses clientManager to handle the state associated with the clientId. When InstanceOperatorClientImpl is instantiated, the ClientManager injected into it is a ClientManagerDelegate.
ClientManagerDelegate can be regarded as a proxy for ClientManager.

@Component("clientManager")
public class ClientManagerDelegate implements ClientManager {
    
    private final ConnectionBasedClientManager connectionBasedClientManager;
    
    private final EphemeralIpPortClientManager ephemeralIpPortClientManager;
    
    private final PersistentIpPortClientManager persistentIpPortClientManager;
    
    private static final String SUFFIX = "false";
    
    public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager,
            EphemeralIpPortClientManager ephemeralIpPortClientManager,
            PersistentIpPortClientManager persistentIpPortClientManager) {
        this.connectionBasedClientManager = connectionBasedClientManager;
        this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
        this.persistentIpPortClientManager = persistentIpPortClientManager;
    }
    public boolean contains(String clientId) {
        return connectionBasedClientManager.contains(clientId) || ephemeralIpPortClientManager.contains(clientId)
                || persistentIpPortClientManager.contains(clientId);
    }
    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        return getClientManagerById(clientId).clientConnected(clientId, attributes);
    }
    private ClientManager getClientManagerById(String clientId) {
        if (isConnectionBasedClient(clientId)) {
            return connectionBasedClientManager;
        }
        return clientId.endsWith(SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
    }
    private boolean isConnectionBasedClient(String clientId) {
        // ID_DELIMITER = "#";
        return !clientId.contains(IpPortBasedClient.ID_DELIMITER);
    }
}

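When a service registers for the first time, contains necessarily returns false, so clientConnected is called next. getClientManagerById first checks whether the clientId contains #. If it does not, connectionBasedClientManager is returned. If it does, the method checks whether the clientId ends with false: if so it returns persistentIpPortClientManager, otherwise ephemeralIpPortClientManager. In standalone mode the clientId here is ip:port#true, so ephemeralIpPortClientManager is returned and execution continues in EphemeralIpPortClientManager.clientConnected.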
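
The routing decision can be condensed into a small standalone sketch. ManagerChoiceSketch and its enum are hypothetical stand-ins for the three manager beans, and the connection-style id used in main is a made-up value that simply contains no #.

```java
final class ManagerChoiceSketch {

    /** Stand-ins for the three ClientManager implementations. */
    enum ManagerKind { CONNECTION_BASED, EPHEMERAL_IP_PORT, PERSISTENT_IP_PORT }

    static final String ID_DELIMITER = "#";
    static final String SUFFIX = "false";

    static ManagerKind choose(String clientId) {
        if (!clientId.contains(ID_DELIMITER)) {
            // No delimiter: the id was not built from ip:port#ephemeral.
            return ManagerKind.CONNECTION_BASED;
        }
        return clientId.endsWith(SUFFIX) ? ManagerKind.PERSISTENT_IP_PORT : ManagerKind.EPHEMERAL_IP_PORT;
    }

    public static void main(String[] args) {
        System.out.println(choose("11.11.11.11:8848#true"));  // EPHEMERAL_IP_PORT
        System.out.println(choose("11.11.11.11:8848#false")); // PERSISTENT_IP_PORT
        System.out.println(choose("made-up-connection-id"));  // CONNECTION_BASED
    }
}
```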

@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {
    
    private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
    
    private final DistroMapper distroMapper;
    
    private final ClientFactory<IpPortBasedClient> clientFactory;
    
    public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {
        this.distroMapper = distroMapper;
        GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0,
                Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
        clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);
    }

    public boolean clientConnected(String clientId, ClientAttributes attributes) {
        return clientConnected(clientFactory.newClient(clientId, attributes));
    }
    public boolean clientConnected(final Client client) {
        clients.computeIfAbsent(client.getClientId(), s -> {
            Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
            IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
            ipPortBasedClient.init();
            return ipPortBasedClient;
        });
        return true;
    }
}

The clientConnected method of EphemeralIpPortClientManager puts an entry into the clients map, with the clientId as the key and the client, as an IpPortBasedClient, as the value.

A later article on service cleanup will explain the ipPortBasedClient.init() method in detail.
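
One consequence of using computeIfAbsent is that the initialisation lambda runs only for the first client registered under a given clientId. The sketch below illustrates this with hypothetical types: FakeClient stands in for IpPortBasedClient, and a counter stands in for the init() call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ClientRegistrySketch {

    /** Hypothetical stand-in for IpPortBasedClient. */
    static final class FakeClient {
        final String clientId;

        FakeClient(String clientId) {
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, FakeClient> clients = new ConcurrentHashMap<>();
    private final AtomicInteger initCount = new AtomicInteger();

    boolean clientConnected(FakeClient client) {
        clients.computeIfAbsent(client.clientId, id -> {
            initCount.incrementAndGet(); // stands in for ipPortBasedClient.init()
            return client;
        });
        return true;
    }

    FakeClient get(String clientId) {
        return clients.get(clientId);
    }

    int initCount() {
        return initCount.get();
    }

    public static void main(String[] args) {
        ClientRegistrySketch registry = new ClientRegistrySketch();
        FakeClient first = new FakeClient("11.11.11.11:8848#true");
        registry.clientConnected(first);
        registry.clientConnected(new FakeClient("11.11.11.11:8848#true"));
        System.out.println(registry.initCount());                             // 1
        System.out.println(registry.get("11.11.11.11:8848#true") == first);   // true
    }
}
```

The second client object is discarded: the map keeps the first one and its initialisation is not repeated.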

InstanceOperatorClientImpl.clientOperationService.registerInstance

The ClientOperationService interface has three implementations: ClientOperationServiceProxy, EphemeralClientOperationServiceImpl, and PersistentClientOperationServiceImpl. InstanceOperatorClientImpl takes a ClientOperationServiceProxy as its clientOperationService, so calls go through the proxy.

@Component
public class ClientOperationServiceProxy implements ClientOperationService {
    
    private final ClientOperationService ephemeralClientOperationService;
    
    private final ClientOperationService persistentClientOperationService;
    
    public ClientOperationServiceProxy(EphemeralClientOperationServiceImpl ephemeralClientOperationService,
            PersistentClientOperationServiceImpl persistentClientOperationService) {
        this.ephemeralClientOperationService = ephemeralClientOperationService;
        this.persistentClientOperationService = persistentClientOperationService;
    }
    public void registerInstance(Service service, Instance instance, String clientId) {
        final ClientOperationService operationService = chooseClientOperationService(instance);
        operationService.registerInstance(service, instance, clientId);
    }
    private ClientOperationService chooseClientOperationService(final Instance instance) {
        return instance.isEphemeral() ? ephemeralClientOperationService : persistentClientOperationService;
    }
}

In standalone mode the ephemeral flag of the instance is true, so ephemeralClientOperationService is chosen to call registerInstance.
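
The delegation performed by the proxy can be reduced to a few lines. This is a simplified sketch: OperationProxySketch and its Operation interface are hypothetical, and each delegate only records which branch was taken.

```java
import java.util.ArrayList;
import java.util.List;

final class OperationProxySketch {

    /** Hypothetical, reduced form of ClientOperationService. */
    interface Operation {
        void register(String clientId);
    }

    final List<String> log = new ArrayList<>();

    private final Operation ephemeralOperation = id -> log.add("ephemeral:" + id);
    private final Operation persistentOperation = id -> log.add("persistent:" + id);

    void register(String clientId, boolean ephemeral) {
        // Same decision as chooseClientOperationService: the instance flag picks the delegate.
        Operation chosen = ephemeral ? ephemeralOperation : persistentOperation;
        chosen.register(clientId);
    }

    public static void main(String[] args) {
        OperationProxySketch proxy = new OperationProxySketch();
        proxy.register("11.11.11.11:8848#true", true);
        proxy.register("11.11.11.11:8848#false", false);
        System.out.println(proxy.log);
    }
}
```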

@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {
    
    private final ClientManager clientManager;
    
    public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
        this.clientManager = clientManager;
    }
    public void registerInstance(Service service, Instance instance, String clientId) {
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        InstancePublishInfo instanceInfo = getPublishInfo(instance);
        client.addServiceInstance(singleton, instanceInfo);
        client.setLastUpdatedTime();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
        NotifyCenter
                .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
    }
    
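    // Note: as the default keyword indicates, this method is declared on the ClientOperationService interface.
    default InstancePublishInfo getPublishInfo(Instance instance) {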
        InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
        if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
            result.getExtendDatum().putAll(instance.getMetadata());
        }
        if (StringUtils.isNotEmpty(instance.getInstanceId())) {
            result.getExtendDatum().put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
        }
        if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
            result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
        }
        if (!instance.isEnabled()) {
            result.getExtendDatum().put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
        }
        String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME
                : instance.getClusterName();
        result.setHealthy(instance.isHealthy());
        result.setCluster(clusterName);
        return result;
    }
}

The method first gets the Service from ServiceManager and the Client from ClientManager, then converts the Instance into an InstancePublishInfo and adds it to the Client. Inside addServiceInstance, a ClientEvent.ClientChangedEvent is published. registerInstance itself then publishes two more events: ClientOperationEvent.ClientRegisterServiceEvent and MetadataEvent.InstanceMetadataEvent.
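
A detail of getPublishInfo that is easy to overlook is that only non-default values are copied into the extend datum. The sketch below shows that idea with hypothetical names: the keys "weight" and "enabled" are placeholders for the Constants used in the real code, and the default weight of 1.0 is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

final class PublishInfoSketch {

    static final double DEFAULT_WEIGHT = 1.0; // assumed default, for illustration only

    /** Builds a sparse attribute map: defaults are omitted rather than stored. */
    static Map<String, Object> extendDatum(Map<String, String> metadata, double weight, boolean enabled) {
        Map<String, Object> result = new HashMap<>();
        if (metadata != null && !metadata.isEmpty()) {
            result.putAll(metadata);
        }
        if (DEFAULT_WEIGHT != weight) {
            result.put("weight", weight);   // placeholder key
        }
        if (!enabled) {
            result.put("enabled", false);   // placeholder key
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(extendDatum(null, 1.0, true).isEmpty()); // true
        System.out.println(extendDatum(null, 2.0, false).size());   // 2
    }
}
```

Under these assumptions, the curl request above (weight=1.0, no metadata) would produce an empty extend datum.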

ServiceManager.getSingleton

public class ServiceManager {
    
    private static final ServiceManager INSTANCE = new ServiceManager();
    
    private final ConcurrentHashMap<Service, Service> singletonRepository;
    
    private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    
    private ServiceManager() {
        singletonRepository = new ConcurrentHashMap<>(1 << 10);
        namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2);
    }
    
    public static ServiceManager getInstance() {
        return INSTANCE;
    }
    
    public Service getSingleton(Service service) {
        singletonRepository.putIfAbsent(service, service);
        Service result = singletonRepository.get(service);
        namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
        namespaceSingletonMaps.get(result.getNamespace()).add(result);
        return result;
    }
}

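ServiceManager exposes its singleton through getInstance. In getSingleton, the service is put into singletonRepository if it is not already there, and the stored Service is then read back. If namespaceSingletonMaps has no entry for the namespace of that service, an empty set is created for it, and the service is then added to the set for that namespace.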
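
The effect of getSingleton is that equal Service objects collapse into one canonical instance. A minimal sketch of that pattern follows, assuming a hypothetical ServiceKey class whose equality is based on namespace and name; ConcurrentHashMap.newKeySet() is used here in place of ConcurrentHashSet.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class SingletonRepoSketch {

    /** Simplified stand-in for Service; the equality fields are an assumption. */
    static final class ServiceKey {
        final String namespace;
        final String name;

        ServiceKey(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ServiceKey)) {
                return false;
            }
            ServiceKey other = (ServiceKey) o;
            return namespace.equals(other.namespace) && name.equals(other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(namespace, name);
        }
    }

    private final ConcurrentHashMap<ServiceKey, ServiceKey> singletonRepository = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Set<ServiceKey>> namespaceSingletonMaps = new ConcurrentHashMap<>();

    ServiceKey getSingleton(ServiceKey service) {
        // The first equal key wins; later equal keys resolve to that same object.
        singletonRepository.putIfAbsent(service, service);
        ServiceKey result = singletonRepository.get(service);
        namespaceSingletonMaps
                .computeIfAbsent(result.namespace, ns -> ConcurrentHashMap.newKeySet())
                .add(result);
        return result;
    }

    int countIn(String namespace) {
        Set<ServiceKey> services = namespaceSingletonMaps.get(namespace);
        return services == null ? 0 : services.size();
    }

    public static void main(String[] args) {
        SingletonRepoSketch repo = new SingletonRepoSketch();
        ServiceKey a = new ServiceKey("public", "DEFAULT_GROUP@@nacosTest");
        ServiceKey b = new ServiceKey("public", "DEFAULT_GROUP@@nacosTest");
        System.out.println(repo.getSingleton(a) == a); // true
        System.out.println(repo.getSingleton(b) == a); // true
        System.out.println(repo.countIn("public"));    // 1
    }
}
```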

client.addServiceInstance

The client taken from ClientManager is of type IpPortBasedClient.

public class IpPortBasedClient extends AbstractClient {
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));
    }
}

IpPortBasedClient extends AbstractClient, so calling addServiceInstance ends up in the parent method in AbstractClient.

public abstract class AbstractClient implements Client {

    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
    
    protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);
    
    protected volatile long lastUpdatedTime;
    
    public AbstractClient() {
        lastUpdatedTime = System.currentTimeMillis();
    }
    
    @Override
    public void setLastUpdatedTime() {
        this.lastUpdatedTime = System.currentTimeMillis();
    }
    
    @Override
    public long getLastUpdatedTime() {
        return lastUpdatedTime;
    }
    
    @Override
    public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
        NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
        return true;
    }
}
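
The null check around publishers.put relies on Map.put returning the previous value, so the instance counter moves only when a client publishes to a service for the first time. A minimal sketch with hypothetical names, using strings in place of Service and InstancePublishInfo and a local counter in place of MetricsMonitor:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class PublisherCountSketch {

    private final ConcurrentHashMap<String, String> publishers = new ConcurrentHashMap<>();
    private final AtomicInteger instanceCount = new AtomicInteger(); // stands in for MetricsMonitor

    void addServiceInstance(String service, String instanceInfo) {
        // put returns null only when the service had no previous entry.
        if (publishers.put(service, instanceInfo) == null) {
            instanceCount.incrementAndGet();
        }
    }

    int instanceCount() {
        return instanceCount.get();
    }

    String published(String service) {
        return publishers.get(service);
    }

    public static void main(String[] args) {
        PublisherCountSketch client = new PublisherCountSketch();
        client.addServiceInstance("DEFAULT_GROUP@@nacosTest", "11.11.11.11:8848 healthy");
        client.addServiceInstance("DEFAULT_GROUP@@nacosTest", "11.11.11.11:8848 unhealthy");
        System.out.println(client.instanceCount());                        // 1
        System.out.println(client.published("DEFAULT_GROUP@@nacosTest"));  // 11.11.11.11:8848 unhealthy
    }
}
```

Registering again for the same service replaces the stored info without counting a new instance.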

Extension [SPI]: EphemeralIpPortClientManager.clientFactory

public class ClientFactoryHolder {
    
    private static final ClientFactoryHolder INSTANCE = new ClientFactoryHolder();
    
    private final HashMap<String, ClientFactory> clientFactories;
    
    private ClientFactoryHolder() {
        clientFactories = new HashMap<>(4);
        Collection<ClientFactory> clientFactories = NacosServiceLoader.load(ClientFactory.class);
        for (ClientFactory each : clientFactories) {
            if (this.clientFactories.containsKey(each.getType())) {
                Loggers.SRV_LOG.warn("Client type {} found multiple factory, use {} default", each.getType(),
                        each.getClass().getCanonicalName());
            }
            this.clientFactories.put(each.getType(), each);
        }
    }
    
    public static ClientFactoryHolder getInstance() {
        return INSTANCE;
    }
    
    /**
     * Find target type {@link ClientFactory}.
     *
     * @param type target type
     * @return target type {@link ClientFactory}, if not fount, return 'default' client factory.
     */
    public ClientFactory findClientFactory(String type) {
        if (StringUtils.isEmpty(type) || !clientFactories.containsKey(type)) {
            return clientFactories.get(ClientConstants.DEFAULT_FACTORY);
        }
        return clientFactories.get(type);
    }
}

ClientFactoryHolder provides ClientFactory lookup. When its INSTANCE is created, it loads the ClientFactory implementations through NacosServiceLoader.
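
The lookup in findClientFactory falls back to a default entry when the requested type is empty or unknown. The sketch below shows that fallback with hypothetical names: factories are represented by plain strings, and the type keys are placeholders rather than the real ClientConstants values.

```java
import java.util.HashMap;
import java.util.Map;

final class FactoryLookupSketch {

    static final String DEFAULT_FACTORY = "default"; // placeholder key

    private final Map<String, String> factories = new HashMap<>();

    void register(String type, String factory) {
        factories.put(type, factory);
    }

    String find(String type) {
        if (type == null || type.isEmpty() || !factories.containsKey(type)) {
            // Unknown or empty type: use whatever is registered as the default.
            return factories.get(DEFAULT_FACTORY);
        }
        return factories.get(type);
    }

    public static void main(String[] args) {
        FactoryLookupSketch holder = new FactoryLookupSketch();
        holder.register(DEFAULT_FACTORY, "defaultFactory");
        holder.register("ephemeralIpPort", "ephemeralFactory");
        System.out.println(holder.find("ephemeralIpPort")); // ephemeralFactory
        System.out.println(holder.find("unknownType"));     // defaultFactory
    }
}
```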

public class NacosServiceLoader {
    
    private static final Map<Class<?>, Collection<Class<?>>> SERVICES = new ConcurrentHashMap<Class<?>, Collection<Class<?>>>();
    
    /**
     * Load service.
     *
     * <p>Load service by SPI and cache the classes for reducing cost when load second time.
     *
     * @param service service class
     * @param <T> type of service
     * @return service instances
     */
    public static <T> Collection<T> load(final Class<T> service) {
        if (SERVICES.containsKey(service)) {
            return newServiceInstances(service);
        }
        Collection<T> result = new LinkedHashSet<T>();
        for (T each : ServiceLoader.load(service)) {
            result.add(each);
            cacheServiceClass(service, each);
        }
        return result;
    }
    
    private static <T> void cacheServiceClass(final Class<T> service, final T instance) {
        if (!SERVICES.containsKey(service)) {
            SERVICES.put(service, new LinkedHashSet<Class<?>>());
        }
        SERVICES.get(service).add(instance.getClass());
    }
    
    /**
     * New service instances.
     *
     * @param service service class
     * @param <T> type of service
     * @return service instances
     */
    public static <T> Collection<T> newServiceInstances(final Class<T> service) {
        return SERVICES.containsKey(service) ? newServiceInstancesFromCache(service) : Collections.<T>emptyList();
    }
    
    @SuppressWarnings("unchecked")
    private static <T> Collection<T> newServiceInstancesFromCache(Class<T> service) {
        Collection<T> result = new LinkedHashSet<T>();
        for (Class<?> each : SERVICES.get(service)) {
            result.add((T) newServiceInstance(each));
        }
        return result;
    }
    
    private static Object newServiceInstance(final Class<?> clazz) {
        try {
            return clazz.newInstance();
        } catch (IllegalAccessException | InstantiationException e) {
            throw new ServiceLoaderException(clazz, e);
        }
    }
}

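The load method of NacosServiceLoader is essentially a wrapper around ServiceLoader, that is, the classes are loaded through SPI. The classes to load are listed in the file resources/META-INF/services/com.alibaba.nacos.naming.core.v2.client.factory.ClientFactory.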

com.alibaba.nacos.naming.core.v2.client.factory.impl.ConnectionBasedClientFactory
com.alibaba.nacos.naming.core.v2.client.factory.impl.EphemeralIpPortClientFactory
com.alibaba.nacos.naming.core.v2.client.factory.impl.PersistentIpPortClientFactory
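
For readers unfamiliar with SPI, the loading side of java.util.ServiceLoader looks roughly as follows. This is a minimal sketch with a hypothetical Plugin interface; it ships no provider file, so it only demonstrates the iteration that NacosServiceLoader.load wraps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class SpiSketch {

    /**
     * Hypothetical service interface. Providers would be listed, one class name
     * per line, in a META-INF/services file named after this interface.
     */
    public interface Plugin {
        String type();
    }

    static List<String> loadTypes() {
        List<String> types = new ArrayList<>();
        // ServiceLoader instantiates each listed provider lazily during iteration.
        for (Plugin each : ServiceLoader.load(Plugin.class)) {
            types.add(each.type());
        }
        return types;
    }

    public static void main(String[] args) {
        // With no provider file on the classpath the list is empty.
        System.out.println(loadTypes());
    }
}
```

In Nacos the provider file is the one shown above, so the same loop would yield the three ClientFactory implementations.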

The call clientFactory.newClient(clientId, attributes) in EphemeralIpPortClientManager is therefore implemented by EphemeralIpPortClientFactory.

public IpPortBasedClient newClient(String clientId, ClientAttributes attributes) {
    return new IpPortBasedClient(clientId, true);
}
public IpPortBasedClient(String clientId, boolean ephemeral) {
    this.ephemeral = ephemeral;
    this.clientId = clientId;
    this.responsibleId = getResponsibleTagFromId();
}

private String getResponsibleTagFromId() {
    int index = clientId.indexOf(IpPortBasedClient.ID_DELIMITER);
    return clientId.substring(0, index);
}
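
The responsible tag is simply the part of the clientId before the # delimiter, that is, the ip:port address. A small standalone sketch follows; ResponsibleTagSketch is a hypothetical class, and the explicit check for a missing delimiter is an addition for illustration (the original substring call would throw a StringIndexOutOfBoundsException in that case).

```java
final class ResponsibleTagSketch {

    static final String ID_DELIMITER = "#";

    static String responsibleTag(String clientId) {
        int index = clientId.indexOf(ID_DELIMITER);
        if (index < 0) {
            // Added guard: fail with a descriptive message instead of an index error.
            throw new IllegalArgumentException("clientId has no '" + ID_DELIMITER + "': " + clientId);
        }
        return clientId.substring(0, index);
    }

    public static void main(String[] args) {
        System.out.println(responsibleTag("11.11.11.11:8848#true")); // 11.11.11.11:8848
    }
}
```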