Nacos配置中心——Client读取配置文件

王守钰 2021-11-11 15:11:14

GAV坐标

<dependency>
    <groupId>com.alibaba.nacos</groupId>
    <artifactId>nacos-client</artifactId>
    <version>${version}</version>
</dependency>

请求示例

import java.util.Properties;
import java.util.concurrent.Executor;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;

/**
 * Config service example
 *
 * @author Nacos
 *
 */
public class ConfigExample {

	public static void main(String[] args) throws NacosException, InterruptedException {
		String serverAddr = "localhost";
		String dataId = "my_test";
		String group = "DEFAULT_GROUP";
		Properties properties = new Properties();
		properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
		ConfigService configService = NacosFactory.createConfigService(properties);
		String content = configService.getConfig(dataId, group, 5000);
		System.out.println(content);
	}
}

在官方给提供的示例代码中,进行读取配置文件获取配置信息。通过NacosConfigService中的getConfig方法获取配置。

NacosConfigService

public class NacosConfigService implements ConfigService {

    public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
        return getConfigInner(namespace, dataId, group, timeoutMs);
    }
    
    private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
        group = blank2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);
        ConfigResponse cr = new ConfigResponse();
        
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);
        
        // use local config first
        String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
        if (content != null) {
            LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}",
                    worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
            cr.setContent(content);
            String encryptedDataKey = LocalEncryptedDataKeyProcessor
                    .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
            cr.setEncryptedDataKey(encryptedDataKey);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            return content;
        }
        
        try {
            ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
            cr.setContent(response.getContent());
            cr.setEncryptedDataKey(response.getEncryptedDataKey());
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
            
            return content;
        } catch (NacosException ioe) {
            if (NacosException.NO_RIGHT == ioe.getErrCode()) {
                throw ioe;
            }
            LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
                    worker.getAgentName(), dataId, group, tenant, ioe.toString());
        }
        
        LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}",
                worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
        content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
        cr.setContent(content);
        String encryptedDataKey = LocalEncryptedDataKeyProcessor
                .getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
        cr.setEncryptedDataKey(encryptedDataKey);
        configFilterChainManager.doFilter(null, cr);
        content = cr.getContent();
        return content;
    }
}

getConfig方法调用getConfigInner方法,首先先通过LocalConfigInfoProcessorgetFailover方法获取本地配置文件。如果本地的配置文件不为空的话返回本地的内容。当本地的配置文件为空的情况下回通过worker来进行调用getServerConfig方法进行获取。

ClientWorker.getServerConfig

public ConfigResponse getServerConfig(String dataId, String group, String tenant, long readTimeout, boolean notify)
        throws NacosException {
    if (StringUtils.isBlank(group)) {
        group = Constants.DEFAULT_GROUP;
    }
    return this.agent.queryConfig(dataId, group, tenant, readTimeout, notify);
}

此时的agent也就是ConfigRpcTransportClient,通过agent调用queryConfig方法获取配置信息。

ConfigRpcTransportClient.queryConfig

public class ConfigRpcTransportClient extends ConfigTransportClient {
    
    public ConfigResponse queryConfig(String dataId, String group, String tenant, long readTimeouts, boolean notify)
            throws NacosException {
        ConfigQueryRequest request = ConfigQueryRequest.build(dataId, group, tenant);
        request.putHeader(NOTIFY_HEADER, String.valueOf(notify));
        RpcClient rpcClient = getOneRunningClient();
        if (notify) {
            CacheData cacheData = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
            if (cacheData != null) {
                rpcClient = ensureRpcClient(String.valueOf(cacheData.getTaskId()));
            }
        }
        ConfigQueryResponse response = (ConfigQueryResponse) requestProxy(rpcClient, request, readTimeouts);
        
        ConfigResponse configResponse = new ConfigResponse();
        if (response.isSuccess()) {
            LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, response.getContent());
            configResponse.setContent(response.getContent());
            String configType;
            if (StringUtils.isNotBlank(response.getContentType())) {
                configType = response.getContentType();
            } else {
                configType = ConfigType.TEXT.getType();
            }
            configResponse.setConfigType(configType);
            String encryptedDataKey = response.getEncryptedDataKey();
            LocalEncryptedDataKeyProcessor
                    .saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, encryptedDataKey);
            configResponse.setEncryptedDataKey(encryptedDataKey);
            return configResponse;
        } else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_NOT_FOUND) {
            LocalConfigInfoProcessor.saveSnapshot(this.getName(), dataId, group, tenant, null);
            LocalEncryptedDataKeyProcessor.saveEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant, null);
            return configResponse;
        } else if (response.getErrorCode() == ConfigQueryResponse.CONFIG_QUERY_CONFLICT) {
            LOGGER.error(
                    "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
                            + "tenant={}", this.getName(), dataId, group, tenant);
            throw new NacosException(NacosException.CONFLICT,
                    "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
        } else {
            LOGGER.error("[{}] [sub-server-error]  dataId={}, group={}, tenant={}, code={}", this.getName(), dataId,
                    group, tenant, response);
            throw new NacosException(response.getErrorCode(),
                    "http error, code=" + response.getErrorCode() + ",dataId=" + dataId + ",group=" + group
                            + ",tenant=" + tenant);
            
        }
    }
    
    private Response requestProxy(RpcClient rpcClientInner, Request request, long timeoutMills)
            throws NacosException {
        try {
            request.putAllHeader(super.getSecurityHeaders());
            request.putAllHeader(super.getSpasHeaders());
            request.putAllHeader(super.getCommonHeader());
        } catch (Exception e) {
            throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
        }
        
        Map<String, String> signHeaders = SpasAdapter.getSignHeaders(resourceBuild(request), secretKey);
        if (signHeaders != null && !signHeaders.isEmpty()) {
            request.putAllHeader(signHeaders);
        }
        JsonObject asJsonObjectTemp = new Gson().toJsonTree(request).getAsJsonObject();
        asJsonObjectTemp.remove("headers");
        asJsonObjectTemp.remove("requestId");
        boolean limit = Limiter.isLimit(request.getClass() + asJsonObjectTemp.toString());
        if (limit) {
            throw new NacosException(NacosException.CLIENT_OVER_THRESHOLD,
                    "More than client-side current limit threshold");
        }
        return rpcClientInner.request(request, timeoutMills);
    }
}

ConfigRpcTransportClient中的queryConfig方法进行获取一个rpcClient再去调用requestProxy方法执行请求。rpcClient最终会调用request方法来进行执行请求。这里面的request对象信息是ConfigQueryRequest

RpcClient.request

public abstract class RpcClient implements Closeable {
    
    public Response request(Request request, long timeoutMills) throws NacosException {
        int retryTimes = 0;
        Response response = null;
        Exception exceptionThrow = null;
        long start = System.currentTimeMillis();
        while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
            boolean waitReconnect = false;
            try {
                if (this.currentConnection == null || !isRunning()) {
                    waitReconnect = true;
                    throw new NacosException(NacosException.CLIENT_DISCONNECT,
                            "Client not connected,current status:" + rpcClientStatus.get());
                }
                response = this.currentConnection.request(request, timeoutMills);
                if (response == null) {
                    throw new NacosException(SERVER_ERROR, "Unknown Exception.");
                }
                if (response instanceof ErrorResponse) {
                    if (response.getErrorCode() == NacosException.UN_REGISTER) {
                        synchronized (this) {
                            waitReconnect = true;
                            if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
                                LoggerUtils.printIfErrorEnabled(LOGGER,
                                        "Connection is unregistered, switch server,connectionId={},request={}",
                                        currentConnection.getConnectionId(), request.getClass().getSimpleName());
                                switchServerAsync();
                            }
                        }
                        
                    }
                    throw new NacosException(response.getErrorCode(), response.getMessage());
                }
                // return response.
                lastActiveTimeStamp = System.currentTimeMillis();
                return response;
                
            } catch (Exception e) {
                if (waitReconnect) {
                    try {
                        //wait client to re connect.
                        Thread.sleep(Math.min(100, timeoutMills / 3));
                    } catch (Exception exception) {
                        //Do nothing.
                    }
                }
                
                LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request={}, retryTimes={},errorMessage={}",
                        request, retryTimes, e.getMessage());
                
                exceptionThrow = e;
                
            }
            retryTimes++;
            
        }
        
        if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
            switchServerAsyncOnRequestFail();
        }
        
        if (exceptionThrow != null) {
            throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow
                    : new NacosException(SERVER_ERROR, exceptionThrow);
        } else {
            throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
        }
    }
}

request方法进行处理时间逻辑的判断,在获取connection信息发起request请求。这里的connectionGrpcConnection

GrpcConnection.request

public class GrpcConnection extends Connection {

    public Response request(Request request, long timeouts) throws NacosException {
        Payload grpcRequest = GrpcUtils.convert(request);
        ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
        Payload grpcResponse;
        try {
            grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new NacosException(NacosException.SERVER_ERROR, e);
        }
    
        return (Response) GrpcUtils.parse(grpcResponse);
    }
}

GrpcConnection中的request方法进行请求的时由grpcFutureServiceStub发出,这时也就是真正发出grpc请求来进行获取数据。