Nacos配置中心——DataSource

王守钰 2021-11-02 17:11:04

NamespaceController

/**
 * REST controller mapped under {@code /v1/console/namespaces}.
 *
 * <p>The only member visible here is the Spring-injected {@link PersistService}.
 */
@RestController
@RequestMapping(value = "/v1/console/namespaces")
public class NamespaceController {

    /** Persistence service supplied by Spring through field injection. */
    @Autowired
    private PersistService persistService;

}

NamespaceController 实例化时,Spring 会为其注入 PersistService。

PersistService

PersistService 的实现类有 EmbeddedStoragePersistServiceImpl 和 ExternalStoragePersistServiceImpl。
image

/**
 * {@link PersistService} implementation that is only registered as a Spring bean when
 * {@link ConditionOnExternalStorage} matches.
 */
@Conditional(ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {
}

ExternalStoragePersistServiceImpl 类在注入的时候加入了限定条件 ConditionOnExternalStorage。

/**
 * Spring {@link Condition} that matches exactly when {@link PropertyUtil#isEmbeddedStorage()}
 * reports {@code false}.
 */
public class ConditionOnExternalStorage implements Condition {

    /**
     * Evaluates the condition.
     *
     * @param context  condition context (not consulted)
     * @param metadata metadata of the annotated element (not consulted)
     * @return the negation of {@link PropertyUtil#isEmbeddedStorage()}
     */
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        final boolean embedded = PropertyUtil.isEmbeddedStorage();
        return !embedded;
    }

}

matches 方法最终对 PropertyUtil.isEmbeddedStorage() 的返回值取反后作为判断结果。

/**
 * {@link PersistService} implementation that is only registered as a Spring bean when
 * {@link ConditionOnEmbeddedStorage} matches.
 */
@Conditional(ConditionOnEmbeddedStorage.class)
@Component
public class EmbeddedStoragePersistServiceImpl implements PersistService {
}

EmbeddedStoragePersistServiceImpl 类在注入的时候加入了限定条件 ConditionOnEmbeddedStorage。

/**
 * Spring {@link Condition} that matches exactly when {@link PropertyUtil#isEmbeddedStorage()}
 * reports {@code true}.
 */
public class ConditionOnEmbeddedStorage implements Condition {

    /**
     * Evaluates the condition.
     *
     * @param context  condition context (not consulted)
     * @param metadata metadata of the annotated element (not consulted)
     * @return the value of {@link PropertyUtil#isEmbeddedStorage()}, unchanged
     */
    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        final boolean embedded = PropertyUtil.isEmbeddedStorage();
        return embedded;
    }
}

这里的写法与 ConditionOnExternalStorage 基本相同,区别在于它直接返回 PropertyUtil.isEmbeddedStorage() 的结果,而不进行取反。

PropertyUtil

/**
 * Holder of statically stored configuration values that are populated from the environment
 * when the application context is initialized.
 *
 * <p>{@link #initialize(ConfigurableApplicationContext)} delegates to {@link #loadSetting()},
 * which reads every setting and finally decides between embedded and external storage.
 * The setters and the {@code getBoolean}/{@code getInt}/{@code getString} helpers used by
 * {@code loadSetting()} are not shown in this excerpt.
 */
public class PropertyUtil implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    private static final Logger LOGGER = LogUtil.DEFAULT_LOG;
    
    /** Default for {@code PropertiesConstant.NOTIFY_CONNECT_TIMEOUT} (unit not shown here). */
    private static int notifyConnectTimeout = 100;
    
    /** Default for {@code PropertiesConstant.NOTIFY_SOCKET_TIMEOUT} (unit not shown here). */
    private static int notifySocketTimeout = 200;
    
    /** Default for {@code PropertiesConstant.MAX_HEALTH_CHECK_FAIL_COUNT}. */
    private static int maxHealthCheckFailCount = 12;
    
    /** Default for {@code PropertiesConstant.IS_HEALTH_CHECK}. */
    private static boolean isHealthCheck = true;
    
    /** Default for {@code PropertiesConstant.MAX_CONTENT}; presumably a size in bytes — TODO confirm. */
    private static int maxContent = 10 * 1024 * 1024;
    
    /**
     * Whether to enable capacity management.
     */
    private static boolean isManageCapacity = true;
    
    /**
     * Whether to enable the limit check function of capacity management, including the upper limit of configuration
     * number, configuration content size limit, etc.
     */
    private static boolean isCapacityLimitCheck = false;
    
    /**
     * The default cluster capacity limit.
     */
    private static int defaultClusterQuota = 100000;
    
    /**
     * the default capacity limit per Group.
     */
    private static int defaultGroupQuota = 200;
    
    /**
     * The default capacity limit per Tenant.
     */
    private static int defaultTenantQuota = 200;
    
    /**
     * The maximum size of the content in the configuration of a single, unit for bytes.
     */
    private static int defaultMaxSize = 100 * 1024;
    
    /**
     * The default Maximum number of aggregated data.
     */
    private static int defaultMaxAggrCount = 10000;
    
    /**
     * The maximum size of content in a single subconfiguration of aggregated data.
     */
    private static int defaultMaxAggrSize = 1024;
    
    /**
     * Initialize the expansion percentage of capacity has reached the limit.
     */
    private static int initialExpansionPercent = 100;
    
    /**
     * Fixed capacity information table usage (usage) time interval, the unit is in seconds.
     */
    private static int correctUsageDelay = 10 * 60;
    
    /**
     * Standalone mode uses DB.
     */
    private static boolean useExternalDB = false;
    
    /**
     * Inline storage value = ${nacos.standalone}.
     */
    private static boolean embeddedStorage = EnvUtil.getStandaloneMode();
    
    /**
     * Reads all settings from the environment into the static fields of this class.
     *
     * <p>Each value falls back to the field's current value when the property is absent.
     * The storage mode is resolved last: an external DB forces {@code embeddedStorage} to
     * {@code false}; otherwise embedded storage is used when either the static default or
     * the JVM system property named by {@code PropertiesConstant.EMBEDDED_STORAGE} is true,
     * and if neither is, the external DB is switched on instead.
     *
     * <p>Any exception is logged and then rethrown unchanged.
     */
    private void loadSetting() {
        try {
            // Notification / health-check settings: parse, store, then log the effective value.
            setNotifyConnectTimeout(Integer.parseInt(EnvUtil.getProperty(PropertiesConstant.NOTIFY_CONNECT_TIMEOUT,
                    String.valueOf(notifyConnectTimeout))));
            LOGGER.info("notifyConnectTimeout:{}", notifyConnectTimeout);
            setNotifySocketTimeout(Integer.parseInt(EnvUtil.getProperty(PropertiesConstant.NOTIFY_SOCKET_TIMEOUT,
                    String.valueOf(notifySocketTimeout))));
            LOGGER.info("notifySocketTimeout:{}", notifySocketTimeout);
            setHealthCheck(Boolean.parseBoolean(
                    EnvUtil.getProperty(PropertiesConstant.IS_HEALTH_CHECK, String.valueOf(isHealthCheck))));
            LOGGER.info("isHealthCheck:{}", isHealthCheck);
            setMaxHealthCheckFailCount(Integer.parseInt(
                    EnvUtil.getProperty(PropertiesConstant.MAX_HEALTH_CHECK_FAIL_COUNT,
                            String.valueOf(maxHealthCheckFailCount))));
            LOGGER.info("maxHealthCheckFailCount:{}", maxHealthCheckFailCount);
            setMaxContent(
                    Integer.parseInt(EnvUtil.getProperty(PropertiesConstant.MAX_CONTENT, String.valueOf(maxContent))));
            LOGGER.info("maxContent:{}", maxContent);
            // capacity management
            setManageCapacity(getBoolean(PropertiesConstant.IS_MANAGE_CAPACITY, isManageCapacity));
            setCapacityLimitCheck(getBoolean(PropertiesConstant.IS_CAPACITY_LIMIT_CHECK, isCapacityLimitCheck));
            setDefaultClusterQuota(getInt(PropertiesConstant.DEFAULT_CLUSTER_QUOTA, defaultClusterQuota));
            setDefaultGroupQuota(getInt(PropertiesConstant.DEFAULT_GROUP_QUOTA, defaultGroupQuota));
            setDefaultTenantQuota(getInt(PropertiesConstant.DEFAULT_TENANT_QUOTA, defaultTenantQuota));
            setDefaultMaxSize(getInt(PropertiesConstant.DEFAULT_MAX_SIZE, defaultMaxSize));
            setDefaultMaxAggrCount(getInt(PropertiesConstant.DEFAULT_MAX_AGGR_COUNT, defaultMaxAggrCount));
            setDefaultMaxAggrSize(getInt(PropertiesConstant.DEFAULT_MAX_AGGR_SIZE, defaultMaxAggrSize));
            setCorrectUsageDelay(getInt(PropertiesConstant.CORRECT_USAGE_DELAY, correctUsageDelay));
            setInitialExpansionPercent(getInt(PropertiesConstant.INITIAL_EXPANSION_PERCENT, initialExpansionPercent));
            // External data sources are used by default in cluster mode
            // The external DB is enabled when the configured platform equals PropertiesConstant.MYSQL,
            // ignoring case; a missing platform is treated as the empty string.
            setUseExternalDB(PropertiesConstant.MYSQL
                    .equalsIgnoreCase(getString(PropertiesConstant.SPRING_DATASOURCE_PLATFORM, "")));
            
            // must initialize after setUseExternalDB
            // This value is true in stand-alone mode and false in cluster mode
            // If this value is set to true in cluster mode, nacos's distributed storage engine is turned on
            // default value is depend on ${nacos.standalone}
            
            if (isUseExternalDB()) {
                // An explicitly configured external DB always wins over embedded storage.
                setEmbeddedStorage(false);
            } else {
                // Boolean.getBoolean reads a JVM system property, not the application properties.
                // The local variable intentionally shadows the static field of the same name.
                boolean embeddedStorage =
                        PropertyUtil.embeddedStorage || Boolean.getBoolean(PropertiesConstant.EMBEDDED_STORAGE);
                setEmbeddedStorage(embeddedStorage);
                
                // If the embedded data source storage is not turned on, it is automatically
                // upgraded to the external data source storage, as before
                if (!embeddedStorage) {
                    setUseExternalDB(true);
                }
            }
        } catch (Exception e) {
            LOGGER.error("read application.properties failed", e);
            throw e;
        }
    }
    
    /**
     * {@link ApplicationContextInitializer} callback; loads all settings via {@link #loadSetting()}.
     *
     * @param configurableApplicationContext the context being initialized (not used directly)
     */
    @Override
    public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
        loadSetting();
    }
    
    /**
     * Returns the current embedded-storage flag.
     *
     * @return {@code true} when embedded storage is selected
     */
    public static boolean isEmbeddedStorage() {
        return embeddedStorage;
    }
}

因为 PropertyUtil 实现了 ApplicationContextInitializer,所以在 Spring 容器创建之后、刷新(refresh)之前会调用它的 initialize 方法,initialize 方法又调用了 loadSetting。当 spring.datasource.platform 配置为 mysql 时,会调用 setUseExternalDB 将 useExternalDB 属性设置为 true;当 isUseExternalDB 方法返回 true 时,会调用 setEmbeddedStorage 将 embeddedStorage 属性设置为 false。

ExternalStoragePersistServiceImpl

因为我们开发调试时使用的是单机模式(nacos.standalone=true),数据库使用的是 MySQL,配置如下:

## Storage platform; the value "mysql" selects the external database (see PropertyUtil.loadSetting).
spring.datasource.platform=mysql

## Count of DB:
db.num=1

## Connect URL of DB:
db.url.0=jdbc:mysql://127.0.0.1:3306/nacos?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC
## Credentials for DB 0 (local development values; do not reuse outside a dev setup).
db.user.0=root
db.password.0=root

所以现在的PersistService的实现方式将通过ExternalStoragePersistServiceImpl来进行实现。

/**
 * {@link PersistService} implementation registered only when {@link ConditionOnExternalStorage}
 * matches.
 *
 * <p>After construction, {@link #init()} obtains the {@link DataSourceService} from
 * {@link DynamicDataSource} and derives the JDBC and transaction templates from it.
 */
@Conditional(ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {

    /**
     * constant variables.
     */
    public static final String SPOT = ".";

    // ---- SQL fragments -------------------------------------------------------------------

    private static final String SQL_FIND_ALL_CONFIG_INFO = "SELECT id,data_id,group_id,tenant_id,app_name,content,type,md5,gmt_create,gmt_modified,src_user,src_ip,c_desc,c_use,effect,c_schema FROM config_info";

    private static final String SQL_TENANT_INFO_COUNT_BY_TENANT_ID = "SELECT count(*) FROM tenant_info WHERE tenant_id = ?";

    private static final String SQL_FIND_CONFIG_INFO_BY_IDS = "SELECT ID,data_id,group_id,tenant_id,app_name,content,md5 FROM config_info WHERE ";

    private static final String SQL_DELETE_CONFIG_INFO_BY_IDS = "DELETE FROM config_info WHERE ";

    // ---- Miscellaneous constants ---------------------------------------------------------

    private static final String PATTERN_STR = "*";

    private static final int QUERY_LIMIT_SIZE = 50;

    // ---- State populated by init() -------------------------------------------------------

    private DataSourceService dataSourceService;

    protected JdbcTemplate jt;

    protected TransactionTemplate tjt;

    /**
     * Resolves the data source service and the templates once the bean has been constructed.
     *
     * <p>The data source service is assigned first; the two template getters (not shown in
     * this excerpt) are called afterwards, in the same order as before.
     */
    @PostConstruct
    public void init() {
        final DynamicDataSource dynamicDataSource = DynamicDataSource.getInstance();
        dataSourceService = dynamicDataSource.getDataSource();

        jt = getJdbcTemplate();
        tjt = getTransactionTemplate();
    }
}

init 方法上加了 @PostConstruct 注解,在 Bean 实例创建并完成依赖注入之后会执行该方法,依次初始化 dataSourceService、jt 和 tjt。

/**
 * Lazily creates and caches the {@link DataSourceService} that matches the current storage mode.
 *
 * <p>The class is annotated as a Spring component and additionally exposes a statically held
 * instance through {@link #getInstance()}; {@code getInstance()} always returns the static one.
 */
@Component
public class DynamicDataSource {
    
    /** Cached service for embedded storage; {@code null} until first successfully initialized. */
    private DataSourceService localDataSourceService = null;
    
    /** Cached service for external storage; {@code null} until first successfully initialized. */
    private DataSourceService basicDataSourceService = null;
    
    private static final DynamicDataSource INSTANCE = new DynamicDataSource();
    
    /**
     * Returns the statically held instance.
     *
     * @return the shared {@code DynamicDataSource}
     */
    public static DynamicDataSource getInstance() {
        return INSTANCE;
    }
    
    /**
     * Returns the data source service for the active storage mode, creating it on first use.
     *
     * <p>A newly created service is cached only after its {@code init()} has completed, so a
     * failed initialization is never handed out by a later call; the next call retries instead.
     *
     * @return the embedded service when {@link PropertyUtil#isEmbeddedStorage()} is {@code true},
     *         otherwise the external one
     * @throws RuntimeException wrapping any exception raised while creating or initializing
     *                          the service
     */
    public synchronized DataSourceService getDataSource() {
        try {
            
            // Embedded storage is used by default in stand-alone mode
            // In cluster mode, external databases are used by default
            
            if (PropertyUtil.isEmbeddedStorage()) {
                if (localDataSourceService == null) {
                    // Publish to the field only once init() has succeeded.
                    DataSourceService created = new LocalDataSourceServiceImpl();
                    created.init();
                    localDataSourceService = created;
                }
                return localDataSourceService;
            } else {
                if (basicDataSourceService == null) {
                    // Publish to the field only once init() has succeeded.
                    DataSourceService created = new ExternalDataSourceServiceImpl();
                    created.init();
                    basicDataSourceService = created;
                }
                return basicDataSourceService;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
}

获取 INSTANCE 后调用 getDataSource 方法。由于此时 embeddedStorage 为 false,basicDataSourceService 最终被创建为 ExternalDataSourceServiceImpl,然后执行它的 init 方法。

/**
 * {@link DataSourceService} backed by one or more external databases, each served by a
 * {@link HikariDataSource} pool built by {@link ExternalDataSourceProperties}.
 *
 * <p>{@code SelectMasterTask}, {@code CheckDbHealthTask} and {@code FATAL_LOG} are referenced
 * here but defined outside this excerpt.
 */
public class ExternalDataSourceServiceImpl implements DataSourceService {
    
    /**
     * JDBC execute timeout value, unit:second.
     */
    private int queryTimeout = 3;
    
    private static final int TRANSACTION_QUERY_TIMEOUT = 5;
    
    private static final int DB_MASTER_SELECT_THRESHOLD = 1;

    private static final String DB_LOAD_ERROR_MSG = "[db-load-error]load jdbc.properties error";
    
    private List<HikariDataSource> dataSourceList = new ArrayList<>();
    
    private JdbcTemplate jt;
    
    private DataSourceTransactionManager tm;
    
    private TransactionTemplate tjt;
    
    private JdbcTemplate testMasterJT;
    
    private JdbcTemplate testMasterWritableJT;
    
    /** One template per pool in {@link #dataSourceList}, in the same order. */
    private volatile List<JdbcTemplate> testJtList;
    
    /** One health flag per pool in {@link #dataSourceList}, in the same order. */
    private volatile List<Boolean> isHealthList;
    
    private volatile int masterIndex;
    
    /**
     * Creates the JDBC/transaction templates and, when the external DB is enabled, loads the
     * pools and schedules the periodic master-selection and health-check tasks.
     *
     * @throws RuntimeException if the pools cannot be loaded; the underlying
     *                          {@link IOException} is attached as the cause
     */
    @Override
    public void init() {
        queryTimeout = ConvertUtils.toInt(System.getProperty("QUERYTIMEOUT"), 3);
        jt = new JdbcTemplate();
        // Set the maximum number of records to prevent memory expansion
        jt.setMaxRows(50000);
        jt.setQueryTimeout(queryTimeout);
        
        testMasterJT = new JdbcTemplate();
        testMasterJT.setQueryTimeout(queryTimeout);
        
        testMasterWritableJT = new JdbcTemplate();
        // Prevent the login interface from being too long because the main library is not available
        testMasterWritableJT.setQueryTimeout(1);
        
        //  Database health check
        
        testJtList = new ArrayList<>();
        isHealthList = new ArrayList<>();
        
        tm = new DataSourceTransactionManager();
        tjt = new TransactionTemplate(tm);
        
        // Transaction timeout needs to be distinguished from ordinary operations.
        tjt.setTimeout(TRANSACTION_QUERY_TIMEOUT);
        if (PropertyUtil.isUseExternalDB()) {
            try {
                reload();
            } catch (IOException e) {
                FATAL_LOG.error("[ExternalDataSourceService] dats source reload error", e);
                // Keep the original failure as the cause so the root error is not lost.
                throw new RuntimeException(DB_LOAD_ERROR_MSG, e);
            }

            // Master selection is only scheduled when more than one pool is configured.
            if (this.dataSourceList.size() > DB_MASTER_SELECT_THRESHOLD) {
                ConfigExecutor.scheduleConfigTask(new SelectMasterTask(), 10, 10, TimeUnit.SECONDS);
            }
            ConfigExecutor.scheduleConfigTask(new CheckDbHealthTask(), 10, 10, TimeUnit.SECONDS);
        }
    }
    
    /**
     * Rebuilds all connection pools from the current environment and replaces the previous ones.
     *
     * <p>The new pools, test templates and health flags are collected in fresh lists and only
     * swapped in after every pool has been built. Repeated reloads therefore do not accumulate
     * stale entries, and a failed build leaves the previous state untouched. The replaced pools
     * are closed once the master-selection and health-check tasks have run against the new ones.
     *
     * @throws IOException wrapping any {@link RuntimeException} raised while building or
     *                     switching the pools
     */
    @Override
    public synchronized void reload() throws IOException {
        try {
            final List<JdbcTemplate> freshTestJtList = new ArrayList<>();
            final List<Boolean> freshHealthList = new ArrayList<>();
            final List<HikariDataSource> freshDataSourceList = new ExternalDataSourceProperties()
                    .build(EnvUtil.getEnvironment(), (dataSource) -> {
                        JdbcTemplate jdbcTemplate = new JdbcTemplate();
                        jdbcTemplate.setQueryTimeout(queryTimeout);
                        jdbcTemplate.setDataSource(dataSource);
                        freshTestJtList.add(jdbcTemplate);
                        freshHealthList.add(Boolean.TRUE);
                    });
            
            final List<HikariDataSource> staleDataSourceList = dataSourceList;
            dataSourceList = freshDataSourceList;
            testJtList = freshTestJtList;
            isHealthList = freshHealthList;
            
            new SelectMasterTask().run();
            new CheckDbHealthTask().run();
            
            // NOTE(review): assumes SelectMasterTask has re-pointed jt/tm at a pool from the new
            // list by now, so nothing still depends on the replaced pools — confirm.
            if (staleDataSourceList != null) {
                for (HikariDataSource stale : staleDataSourceList) {
                    stale.close();
                }
            }
        } catch (RuntimeException e) {
            FATAL_LOG.error(DB_LOAD_ERROR_MSG, e);
            throw new IOException(e);
        }
    }

    
}

init 方法中先创建出 JdbcTemplate,再调用 reload 方法;reload 通过 ExternalDataSourceProperties 的 build 方法创建出一个或多个连接池(数量由 db.num 决定)。

/**
 * Bindable holder of the {@code db.*} settings ({@code num}, {@code url}, {@code user},
 * {@code password}) that turns them into {@link HikariDataSource} pools.
 */
public class ExternalDataSourceProperties {
    
    private static final String JDBC_DRIVER_NAME = "com.mysql.cj.jdbc.Driver";
    
    private static final String TEST_QUERY = "SELECT 1";
    
    private Integer num;
    
    private List<String> url = new ArrayList<>();
    
    private List<String> user = new ArrayList<>();
    
    private List<String> password = new ArrayList<>();
    
    public void setNum(Integer num) {
        this.num = num;
    }
    
    public void setUrl(List<String> url) {
        this.url = url;
    }
    
    public void setUser(List<String> user) {
        this.user = user;
    }
    
    public void setPassword(List<String> password) {
        this.password = password;
    }
    
    /**
     * Binds the {@code db} properties onto this instance and builds one pool per configured index.
     *
     * <p>Each pool is appended to the result and then handed to {@code callback}, in index order.
     *
     * @param environment {@link Environment} the {@code db.*} properties are bound from
     * @param callback    invoked once for every data source right after it has been created
     * @return the created {@link HikariDataSource} instances, in index order
     * @throws IllegalArgumentException if {@code db.num}, the users or the passwords are
     *                                  missing, a URL is missing for some index, or no data
     *                                  source was created
     */
    List<HikariDataSource> build(Environment environment, Callback<HikariDataSource> callback) {
        Binder.get(environment).bind("db", Bindable.ofInstance(this));
        Preconditions.checkArgument(Objects.nonNull(num), "db.num is null");
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(user), "db.user or db.user.[index] is null");
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(password), "db.password or db.password.[index] is null");
        
        final int count = num;
        final List<HikariDataSource> pools = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Preconditions.checkArgument(i < url.size(), "db.url.%s is null", i);
            final HikariDataSource pool = createDataSource(environment, i);
            pools.add(pool);
            callback.accept(pool);
        }
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(pools), "no datasource available");
        return pools;
    }
    
    /**
     * Creates the pool for a single index; user and password fall back to the first entry
     * through {@code getOrDefault}.
     */
    private HikariDataSource createDataSource(Environment environment, int position) {
        final DataSourcePoolProperties settings = DataSourcePoolProperties.build(environment);
        settings.setDriverClassName(JDBC_DRIVER_NAME);
        settings.setJdbcUrl(url.get(position).trim());
        settings.setUsername(getOrDefault(user, position, user.get(0)).trim());
        settings.setPassword(getOrDefault(password, position, password.get(0)).trim());
        
        final HikariDataSource pool = settings.getDataSource();
        pool.setConnectionTestQuery(TEST_QUERY);
        pool.setIdleTimeout(TimeUnit.MINUTES.toMillis(10L));
        pool.setConnectionTimeout(TimeUnit.SECONDS.toMillis(3L));
        return pool;
    }
    
    interface Callback<D> {
        
        /**
         * Perform custom logic.
         *
         * @param datasource dataSource.
         */
        void accept(D datasource);
    }
}