Nacos配置中心——服务端启动

王守钰 2021-11-08 08:11:48

image
BaseRpcServer的子类有BaseGrpcServerGrpcClusterServerGrpcSdkServer三个子类。

GrpcSdkServer

@Service
public class GrpcSdkServer extends BaseGrpcServer {
    
    private static final int PORT_OFFSET = 1000;
    
    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }
    
    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}

GrpcSdkServer上被@Service注解修饰,GrpcSdkServer实例化后进行执行父类方法中的@PostConstruct修饰的start方法。

BaseRpcServer

public abstract class BaseRpcServer {
    
    @PostConstruct
    public void start() throws Exception {
        String serverName = getClass().getSimpleName();
        Loggers.REMOTE.info("Nacos {} Rpc server starting at port {}", serverName, getServicePort());
        
        startServer();
        
        Loggers.REMOTE.info("Nacos {} Rpc server started at port {}", serverName, getServicePort());
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                Loggers.REMOTE.info("Nacos {} Rpc server stopping", serverName);
                try {
                    BaseRpcServer.this.stopServer();
                    Loggers.REMOTE.info("Nacos {} Rpc server stopped successfully...", serverName);
                } catch (Exception e) {
                    Loggers.REMOTE.error("Nacos {} Rpc server stopped fail...", serverName, e);
                }
            }
        });

    }
    
    public abstract void startServer() throws Exception;
}

BaseRpcServer为基础的请求rpc服务,在实例化后start方法上加了@PostConstruct注解,所以会调用start方法。start方法在通过调用startServer方法来进行执行真正的服务启动。

BaseGrpcServer

public abstract class BaseGrpcServer extends BaseRpcServer {

    @Autowired
    private GrpcRequestAcceptor grpcCommonRequestAcceptor;
    
    @Autowired
    private GrpcBiStreamRequestAcceptor grpcBiStreamRequestAcceptor;
    
    @Autowired
    private ConnectionManager connectionManager;
    
    @Override
    public ConnectionType getConnectionType() {
        return ConnectionType.GRPC;
    }
    
    @Override
    public void startServer() throws Exception {
        final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
        
        // server interceptor to set connection id.
        // 定义拦截器
        ServerInterceptor serverInterceptor = new ServerInterceptor() {
            @Override
            public <T, S> ServerCall.Listener<T> interceptCall(ServerCall<T, S> call, Metadata headers,
                    ServerCallHandler<T, S> next) {
                Context ctx = Context.current()
                        .withValue(CONTEXT_KEY_CONN_ID, call.getAttributes().get(TRANS_KEY_CONN_ID))
                        .withValue(CONTEXT_KEY_CONN_REMOTE_IP, call.getAttributes().get(TRANS_KEY_REMOTE_IP))
                        .withValue(CONTEXT_KEY_CONN_REMOTE_PORT, call.getAttributes().get(TRANS_KEY_REMOTE_PORT))
                        .withValue(CONTEXT_KEY_CONN_LOCAL_PORT, call.getAttributes().get(TRANS_KEY_LOCAL_PORT));
                if (REQUEST_BI_STREAM_SERVICE_NAME.equals(call.getMethodDescriptor().getServiceName())) {
                    Channel internalChannel = getInternalChannel(call);
                    ctx = ctx.withValue(CONTEXT_KEY_CHANNEL, internalChannel);
                }
                return Contexts.interceptCall(ctx, call, headers, next);
            }
        };
        // grpc中添加服务
        addServices(handlerRegistry, serverInterceptor);
        // 构造grpc服务端
        server = ServerBuilder.forPort(getServicePort()).executor(getRpcExecutor())
                .maxInboundMessageSize(getInboundMessageSize()).fallbackHandlerRegistry(handlerRegistry)
                .compressorRegistry(CompressorRegistry.getDefaultInstance())
                .decompressorRegistry(DecompressorRegistry.getDefaultInstance())
                .addTransportFilter(new ServerTransportFilter() {
                    @Override
                    public Attributes transportReady(Attributes transportAttrs) {
                        InetSocketAddress remoteAddress = (InetSocketAddress) transportAttrs
                                .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                        InetSocketAddress localAddress = (InetSocketAddress) transportAttrs
                                .get(Grpc.TRANSPORT_ATTR_LOCAL_ADDR);
                        int remotePort = remoteAddress.getPort();
                        int localPort = localAddress.getPort();
                        String remoteIp = remoteAddress.getAddress().getHostAddress();
                        Attributes attrWrapper = transportAttrs.toBuilder()
                                .set(TRANS_KEY_CONN_ID, System.currentTimeMillis() + "_" + remoteIp + "_" + remotePort)
                                .set(TRANS_KEY_REMOTE_IP, remoteIp).set(TRANS_KEY_REMOTE_PORT, remotePort)
                                .set(TRANS_KEY_LOCAL_PORT, localPort).build();
                        String connectionId = attrWrapper.get(TRANS_KEY_CONN_ID);
                        Loggers.REMOTE_DIGEST.info("Connection transportReady,connectionId = {} ", connectionId);
                        return attrWrapper;
                        
                    }
                    
                    @Override
                    public void transportTerminated(Attributes transportAttrs) {
                        String connectionId = null;
                        try {
                            connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
                        } catch (Exception e) {
                            // Ignore
                        }
                        if (StringUtils.isNotBlank(connectionId)) {
                            Loggers.REMOTE_DIGEST
                                    .info("Connection transportTerminated,connectionId = {} ", connectionId);
                            connectionManager.unregister(connectionId);
                        }
                    }
                }).build();
        // 启动grpc服务
        server.start();
    }
}

BaseGrpcServerBaseRpcServer的子类,实现了startServer方法,通过ServerInterceptor定义出拦截器;addServices将服务进行绑定;ServerBuilder构建出服务端信息,调用start方法进行启动。

BaseGrpcServer.addServices

private void addServices(MutableHandlerRegistry handlerRegistry, ServerInterceptor... serverInterceptor) {
    
    // unary common call register.
    final MethodDescriptor<Payload, Payload> unaryPayloadMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            .setType(MethodDescriptor.MethodType.UNARY)
            .setFullMethodName(MethodDescriptor.generateFullMethodName(REQUEST_SERVICE_NAME, REQUEST_METHOD_NAME))
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
    
    final ServerCallHandler<Payload, Payload> payloadHandler = ServerCalls
            .asyncUnaryCall((request, responseObserver) -> {
                grpcCommonRequestAcceptor.request(request, responseObserver);
            });
    
    final ServerServiceDefinition serviceDefOfUnaryPayload = ServerServiceDefinition.builder(REQUEST_SERVICE_NAME)
            .addMethod(unaryPayloadMethod, payloadHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfUnaryPayload, serverInterceptor));
    
    // bi stream register.
    final ServerCallHandler<Payload, Payload> biStreamHandler = ServerCalls.asyncBidiStreamingCall(
            (responseObserver) -> grpcBiStreamRequestAcceptor.requestBiStream(responseObserver));
    
    final MethodDescriptor<Payload, Payload> biStreamMethod = MethodDescriptor.<Payload, Payload>newBuilder()
            .setType(MethodDescriptor.MethodType.BIDI_STREAMING).setFullMethodName(MethodDescriptor
                    .generateFullMethodName(REQUEST_BI_STREAM_SERVICE_NAME, REQUEST_BI_STREAM_METHOD_NAME))
            .setRequestMarshaller(ProtoUtils.marshaller(Payload.newBuilder().build()))
            .setResponseMarshaller(ProtoUtils.marshaller(Payload.getDefaultInstance())).build();
    
    final ServerServiceDefinition serviceDefOfBiStream = ServerServiceDefinition
            .builder(REQUEST_BI_STREAM_SERVICE_NAME).addMethod(biStreamMethod, biStreamHandler).build();
    handlerRegistry.addService(ServerInterceptors.intercept(serviceDefOfBiStream, serverInterceptor));
    
}

ServerCalls.asyncUnaryCall进行服务的调用,当客户端注册时,这时候就会收到调用grpcCommonRequestAcceptorrequest方法发起请求。ServerServiceDefinition进行构造服务。handlerRegistry将服务注册到grpc中。接下来在注册bi相关的服务。

GrpcRequestAcceptor

@Service
public class GrpcRequestAcceptor extends RequestGrpc.RequestImplBase {

    @Autowired
    RequestHandlerRegistry requestHandlerRegistry;
    
    @Autowired
    private ConnectionManager connectionManager;
    
    private void traceIfNecessary(Payload grpcRequest, boolean receive) {
        String clientIp = grpcRequest.getMetadata().getClientIp();
        String connectionId = CONTEXT_KEY_CONN_ID.get();
        try {
            if (connectionManager.traced(clientIp)) {
                Loggers.REMOTE_DIGEST.info("[{}]Payload {},meta={},body={}", connectionId, receive ? "receive" : "send",
                        grpcRequest.getMetadata().toByteString().toStringUtf8(),
                        grpcRequest.getBody().toByteString().toStringUtf8());
            }
        } catch (Throwable throwable) {
            Loggers.REMOTE_DIGEST.error("[{}]Monitor request error,payload={},error={}", connectionId, clientIp,
                    grpcRequest.toByteString().toStringUtf8());
        }
        
    }
    
    @Override
    public void request(Payload grpcRequest, StreamObserver<Payload> responseObserver) {
        
        traceIfNecessary(grpcRequest, true);
        String type = grpcRequest.getMetadata().getType();
        
        //server is on starting.
        // 服务是否启动,没启动的话返回错误信息,错误码300
        if (!ApplicationUtils.isStarted()) {
            Payload payloadResponse = GrpcUtils.convert(
                    buildErrorResponse(NacosException.INVALID_SERVER_STATUS, "Server is starting,please try later."));
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            
            responseObserver.onCompleted();
            return;
        }
        
        // server check.
        // 判断是否是服务校验请求
        if (ServerCheckRequest.class.getSimpleName().equals(type)) {
            Payload serverCheckResponseP = GrpcUtils.convert(new ServerCheckResponse(CONTEXT_KEY_CONN_ID.get()));
            traceIfNecessary(serverCheckResponseP, false);
            responseObserver.onNext(serverCheckResponseP);
            responseObserver.onCompleted();
            return;
        }
        
        // 获取执行器
        RequestHandler requestHandler = requestHandlerRegistry.getByRequestType(type);
        //no handler found.
        // 校验执行器是否存在,不存在返回错误,错误码302
        if (requestHandler == null) {
            Loggers.REMOTE_DIGEST.warn(String.format("[%s] No handler for request type : %s :", "grpc", type));
            Payload payloadResponse = GrpcUtils
                    .convert(buildErrorResponse(NacosException.NO_HANDLER, "RequestHandler Not Found"));
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
            return;
        }
        
        //check connection status.
        // 校验连接状态,连接失败返回错误信息,错误码301
        String connectionId = CONTEXT_KEY_CONN_ID.get();
        boolean requestValid = connectionManager.checkValid(connectionId);
        if (!requestValid) {
            Loggers.REMOTE_DIGEST
                    .warn("[{}] Invalid connection Id ,connection [{}] is un registered ,", "grpc", connectionId);
            Payload payloadResponse = GrpcUtils
                    .convert(buildErrorResponse(NacosException.UN_REGISTER, "Connection is unregistered."));
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
            return;
        }
        
        // 转换grpc的请求体,转换错误返回,错误码502
        Object parseObj = null;
        try {
            parseObj = GrpcUtils.parse(grpcRequest);
        } catch (Exception e) {
            Loggers.REMOTE_DIGEST
                    .warn("[{}] Invalid request receive from connection [{}] ,error={}", "grpc", connectionId, e);
            Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(NacosException.BAD_GATEWAY, e.getMessage()));
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
            return;
        }
        
        // 判断请求体是否为空,为空返回错误,错误码502
        if (parseObj == null) {
            Loggers.REMOTE_DIGEST.warn("[{}] Invalid request receive  ,parse request is null", connectionId);
            Payload payloadResponse = GrpcUtils
                    .convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
        }
        
        // 判断请求体是否是Request的子类,不是返回错误,错误码502
        if (!(parseObj instanceof Request)) {
            Loggers.REMOTE_DIGEST
                    .warn("[{}] Invalid request receive  ,parsed payload is not a request,parseObj={}", connectionId,
                            parseObj);
            Payload payloadResponse = GrpcUtils
                    .convert(buildErrorResponse(NacosException.BAD_GATEWAY, "Invalid request"));
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
            return;
        }
        
        // 执行请求
        Request request = (Request) parseObj;
        try {
            Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
            RequestMeta requestMeta = new RequestMeta();
            requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
            requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
            requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
            requestMeta.setLabels(connection.getMetaInfo().getLabels());
            connectionManager.refreshActiveTime(requestMeta.getConnectionId());
            // 通过执行器调用,返回Response结果
            Response response = requestHandler.handleRequest(request, requestMeta);
            Payload payloadResponse = GrpcUtils.convert(response);
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
        } catch (Throwable e) {
            // 异常返回错误,错误码500
            Loggers.REMOTE_DIGEST
                    .error("[{}] Fail to handle request from connection [{}] ,error message :{}", "grpc", connectionId,
                            e);
            Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
                    (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
                    e.getMessage()));
            traceIfNecessary(payloadResponse, false);
            responseObserver.onNext(payloadResponse);
            responseObserver.onCompleted();
        }
        
    }
}

GrpcRequestAcceptor类中最主要的方法就是request,request方法先去校验服务是否启动;再判断是否为服务校验请求;通过requestHandlerRegistry来进行获取服务执行器并校验;校验连接状态;转换Request请求信息并校验;检验后进行通过requestHandler发起执行并响应结果,如果错误返回错误信息。

RequestHandlerRegistry

@Service
public class RequestHandlerRegistry implements ApplicationListener<ContextRefreshedEvent> {

    Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
    
    @Autowired
    private TpsMonitorManager tpsMonitorManager;
    
    /**
     * Get Request Handler By request Type.
     *
     * @param requestType see definitions  of sub constants classes of RequestTypeConstants
     * @return request handler.
     */
    public RequestHandler getByRequestType(String requestType) {
        return registryHandlers.get(requestType);
    }
    
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // 查询RequestHandler实例列表
        Map<String, RequestHandler> beansOfType = event.getApplicationContext().getBeansOfType(RequestHandler.class);
        Collection<RequestHandler> values = beansOfType.values();
        for (RequestHandler requestHandler : values) {
            
            Class<?> clazz = requestHandler.getClass();
            boolean skip = false;
            while (!clazz.getSuperclass().equals(RequestHandler.class)) {
                if (clazz.getSuperclass().equals(Object.class)) {
                    skip = true;
                    break;
                }
                clazz = clazz.getSuperclass();
            }
            if (skip) {
                continue;
            }
            
            try {
                // 获取handle方法
                Method method = clazz.getMethod("handle", Request.class, RequestMeta.class);
                // 注册TPS流量控制
                if (method.isAnnotationPresent(TpsControl.class) && TpsControlConfig.isTpsControlEnabled()) {
                    TpsControl tpsControl = method.getAnnotation(TpsControl.class);
                    String pointName = tpsControl.pointName();
                    TpsMonitorPoint tpsMonitorPoint = new TpsMonitorPoint(pointName);
                    tpsMonitorManager.registerTpsControlPoint(tpsMonitorPoint);
                }
            } catch (Exception e) {
                //ignore.
            }
            // 添加执行器
            // 获取父类,获取泛型第一个参数
            Class tClass = (Class) ((ParameterizedType)
            clazz.getGenericSuperclass()).getActualTypeArguments()[0];
            registryHandlers.putIfAbsent(tClass.getSimpleName(), requestHandler);
        }
    }
}

RequestHandlerRegistry实现了ApplicationListener监听器,并且添加了@Service注解,当容器启动时触发ContextRefreshedEvent事件。监听器处理RequestHandler执行器子类,将执行器添加到registryHandlers中。registryHandlers中数据信息。

ConfigChangeClusterSyncRequest=com.alibaba.nacos.config.server.remote.ConfigChangeClusterSyncRequestHandler

ConfigPublishRequest=com.alibaba.nacos.config.server.remote.ConfigPublishRequestHandler

ConfigBatchListenRequest=com.alibaba.nacos.config.server.remote.ConfigChangeBatchListenRequestHandle

ServiceListRequest=com.alibaba.nacos.naming.remote.rpc.handler.ServiceListRequestHandler

ConfigQueryRequest=com.alibaba.nacos.config.server.remote.ConfigQueryRequestHandler

DistroDataRequest=com.alibaba.nacos.naming.remote.rpc.handler.DistroDataRequestHandler

SubscribeServiceRequest=com.alibaba.nacos.naming.remote.rpc.handler.SubscribeServiceRequestHandler

ConfigRemoveRequest=com.alibaba.nacos.config.server.remote.ConfigRemoveRequestHandler

ServiceQueryRequest=com.alibaba.nacos.naming.remote.rpc.handler.ServiceQueryRequestHandler

HealthCheckRequest=com.alibaba.nacos.core.remote.HealthCheckRequestHandler

ServerLoaderInfoRequest=com.alibaba.nacos.core.remote.core.ServerLoaderInfoRequestHandler

ServerReloadRequest=com.alibaba.nacos.core.remote.core.ServerReloaderRequestHandler

InstanceRequest=com.alibaba.nacos.naming.remote.rpc.handler.InstanceRequestHandler

image
image