Nacos Config Center: ExternalDumpService

王守钰 2021-11-08 08:11:58

ExternalDumpService

@Conditional(ConditionOnExternalStorage.class)
@Component
@DependsOn({"rpcConfigChangeNotifier"})
public class ExternalDumpService extends DumpService {

    /**
     * Here you inject the dependent objects constructively, ensuring that some of the dependent functionality is
     * initialized ahead of time.
     *
     * @param persistService {@link PersistService}
     * @param memberManager  {@link ServerMemberManager}
     */
    public ExternalDumpService(PersistService persistService, ServerMemberManager memberManager) {
        super(persistService, memberManager);
    }
    
    @PostConstruct
    @Override
    protected void init() throws Throwable {
        dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
    }
    
    @Override
    protected boolean canExecute() {
        return memberManager.isFirstIp();
    }
}

ExternalDumpService is annotated with @Conditional(ConditionOnExternalStorage.class); since the database in use here is MySQL (external storage), the condition holds and the bean is created. ExternalDumpService extends DumpService. After the bean is instantiated, the @PostConstruct init method runs and delegates to dumpOperate in the parent class.
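
For context, ConditionOnExternalStorage is an ordinary Spring Condition; it essentially reports whether an external database is configured (a simplified sketch, assuming it delegates to PropertyUtil.isUseExternalDB(), with any extra details omitted):

import com.alibaba.nacos.config.server.utils.PropertyUtil;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.type.AnnotatedTypeMetadata;

public class ConditionOnExternalStorage implements Condition {

    @Override
    public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
        // true when the datasource is an external DB such as MySQL
        return PropertyUtil.isUseExternalDB();
    }
}

With the embedded (Derby) storage this condition fails, and the ConditionOnEmbeddedStorage counterpart selects EmbeddedDumpService instead.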

DumpService

public abstract class DumpService {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(DumpService.class);
    
    protected DumpProcessor processor;
    
    protected DumpAllProcessor dumpAllProcessor;
    
    protected DumpAllBetaProcessor dumpAllBetaProcessor;
    
    protected DumpAllTagProcessor dumpAllTagProcessor;
    
    protected final PersistService persistService;
    
    protected final ServerMemberManager memberManager;
    
    /**
     * full dump interval.
     */
    static final int DUMP_ALL_INTERVAL_IN_MINUTE = 6 * 60;
    
    /**
     * full dump delay.
     */
    static final int INITIAL_DELAY_IN_MINUTE = 6 * 60;
    
    private TaskManager dumpTaskMgr;
    
    private TaskManager dumpAllTaskMgr;
    
    static final AtomicInteger FINISHED = new AtomicInteger();
    
    static final int INIT_THREAD_COUNT = 10;
    
    int total = 0;
    
    private static final String TRUE_STR = "true";
    
    private static final String BETA_TABLE_NAME = "config_info_beta";
    
    private static final String TAG_TABLE_NAME = "config_info_tag";
    
    Boolean isQuickStart = false;
    
    private int retentionDays = 30;
    
    /**
     * Here you inject the dependent objects constructively, ensuring that some of the dependent functionality is
     * initialized ahead of time.
     *
     * @param persistService {@link PersistService}
     * @param memberManager  {@link ServerMemberManager}
     */
    public DumpService(PersistService persistService, ServerMemberManager memberManager) {
        this.persistService = persistService;
        this.memberManager = memberManager;
        this.processor = new DumpProcessor(this);
        this.dumpAllProcessor = new DumpAllProcessor(this);
        this.dumpAllBetaProcessor = new DumpAllBetaProcessor(this);
        this.dumpAllTagProcessor = new DumpAllTagProcessor(this);
        this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");
        this.dumpTaskMgr.setDefaultTaskProcessor(processor);
        
        this.dumpAllTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpAllTaskManager");
        this.dumpAllTaskMgr.setDefaultTaskProcessor(dumpAllProcessor);
        
        this.dumpAllTaskMgr.addProcessor(DumpAllTask.TASK_ID, dumpAllProcessor);
        this.dumpAllTaskMgr.addProcessor(DumpAllBetaTask.TASK_ID, dumpAllBetaProcessor);
        this.dumpAllTaskMgr.addProcessor(DumpAllTagTask.TASK_ID, dumpAllTagProcessor);
        
        DynamicDataSource.getInstance().getDataSource();
    }
    
    public PersistService getPersistService() {
        return persistService;
    }
    
    public ServerMemberManager getMemberManager() {
        return memberManager;
    }
    
    /**
     * initialize.
     *
     * @throws Throwable throws Exception when actually operate.
     */
    protected abstract void init() throws Throwable;
    
    protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
            DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
        String dumpFileContext = "CONFIG_DUMP_TO_FILE";
        TimerContext.start(dumpFileContext);
        try {
            LogUtil.DEFAULT_LOG.warn("DumpService start");
            
            Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
            
            Runnable dumpAllBeta = () -> dumpAllTaskMgr.addTask(DumpAllBetaTask.TASK_ID, new DumpAllBetaTask());
            
            Runnable dumpAllTag = () -> dumpAllTaskMgr.addTask(DumpAllTagTask.TASK_ID, new DumpAllTagTask());
            
            Runnable clearConfigHistory = () -> {
                LOGGER.warn("clearConfigHistory start");
                if (canExecute()) {
                    try {
                        Timestamp startTime = getBeforeStamp(TimeUtils.getCurrentTime(), 24 * getRetentionDays());
                        int totalCount = persistService.findConfigHistoryCountByTime(startTime);
                        if (totalCount > 0) {
                            int pageSize = 1000;
                            int removeTime = (totalCount + pageSize - 1) / pageSize;
                            LOGGER.warn(
                                    "clearConfigHistory, getBeforeStamp:{}, totalCount:{}, pageSize:{}, removeTime:{}",
                                    startTime, totalCount, pageSize, removeTime);
                            while (removeTime > 0) {
                                // delete paging to avoid reporting errors in batches
                                persistService.removeConfigHistory(startTime, pageSize);
                                removeTime--;
                            }
                        }
                    } catch (Throwable e) {
                        LOGGER.error("clearConfigHistory error : {}", e.toString());
                    }
                }
            };
            
            try {
                dumpConfigInfo(dumpAllProcessor);
                
                // update Beta cache
                LogUtil.DEFAULT_LOG.info("start clear all config-info-beta.");
                DiskUtil.clearAllBeta();
                if (persistService.isExistTable(BETA_TABLE_NAME)) {
                    dumpAllBetaProcessor.process(new DumpAllBetaTask());
                }
                // update Tag cache
                LogUtil.DEFAULT_LOG.info("start clear all config-info-tag.");
                DiskUtil.clearAllTag();
                if (persistService.isExistTable(TAG_TABLE_NAME)) {
                    dumpAllTagProcessor.process(new DumpAllTagTask());
                }
                
                // add to dump aggr
                List<ConfigInfoChanged> configList = persistService.findAllAggrGroup();
                if (configList != null && !configList.isEmpty()) {
                    total = configList.size();
                    List<List<ConfigInfoChanged>> splitList = splitList(configList, INIT_THREAD_COUNT);
                    for (List<ConfigInfoChanged> list : splitList) {
                        MergeAllDataWorker work = new MergeAllDataWorker(list);
                        work.start();
                    }
                    LOGGER.info("server start, schedule merge end.");
                }
            } catch (Exception e) {
                LogUtil.FATAL_LOG
                        .error("Nacos Server did not start because dumpservice bean construction failure :\n" + e
                                .toString());
                throw new NacosException(NacosException.SERVER_ERROR,
                        "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
                        e);
            }
            if (!EnvUtil.getStandaloneMode()) {
                Runnable heartbeat = () -> {
                    String heartBeatTime = TimeUtils.getCurrentTime().toString();
                    // write disk
                    try {
                        DiskUtil.saveHeartBeatToDisk(heartBeatTime);
                    } catch (IOException e) {
                        LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
                    }
                };
                
                ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
                
                long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
                LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
                
                ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
                
                ConfigExecutor
                        .scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
                
                ConfigExecutor
                        .scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
            }
            
            ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
        } finally {
            TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
        }
        
    }
    
    private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
        int timeStep = 6;
        boolean isAllDump = true;
        // initial dump all
        FileInputStream fis = null;
        Timestamp heartheatLastStamp = null;
        try {
            if (isQuickStart()) {
                File heartbeatFile = DiskUtil.heartBeatFile();
                if (heartbeatFile.exists()) {
                    fis = new FileInputStream(heartbeatFile);
                    String heartheatTempLast = IoUtils.toString(fis, Constants.ENCODE);
                    heartheatLastStamp = Timestamp.valueOf(heartheatTempLast);
                    if (TimeUtils.getCurrentTime().getTime() - heartheatLastStamp.getTime()
                            < timeStep * 60 * 60 * 1000) {
                        isAllDump = false;
                    }
                }
            }
            if (isAllDump) {
                LogUtil.DEFAULT_LOG.info("start clear all config-info.");
                DiskUtil.clearAll();
                dumpAllProcessor.process(new DumpAllTask());
            } else {
                Timestamp beforeTimeStamp = getBeforeStamp(heartheatLastStamp, timeStep);
                DumpChangeProcessor dumpChangeProcessor = new DumpChangeProcessor(this, beforeTimeStamp,
                        TimeUtils.getCurrentTime());
                dumpChangeProcessor.process(new DumpChangeTask());
                Runnable checkMd5Task = () -> {
                    LogUtil.DEFAULT_LOG.error("start checkMd5Task");
                    List<String> diffList = ConfigCacheService.checkMd5();
                    for (String groupKey : diffList) {
                        String[] dg = GroupKey.parseKey(groupKey);
                        String dataId = dg[0];
                        String group = dg[1];
                        String tenant = dg[2];
                        ConfigInfoWrapper configInfo = persistService.queryConfigInfo(dataId, group, tenant);
                        ConfigCacheService.dumpChange(dataId, group, tenant, configInfo.getContent(),
                                configInfo.getLastModified());
                    }
                    LogUtil.DEFAULT_LOG.error("end checkMd5Task");
                };
                ConfigExecutor.scheduleConfigTask(checkMd5Task, 0, 12, TimeUnit.HOURS);
            }
        } catch (IOException e) {
            LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
            throw e;
        } finally {
            if (null != fis) {
                try {
                    fis.close();
                } catch (IOException e) {
                    LogUtil.DEFAULT_LOG.warn("close file failed");
                }
            }
        }
    }
}

When DumpService is constructed, it creates the task processors (DumpProcessor, DumpAllProcessor, DumpAllBetaProcessor, DumpAllTagProcessor) and two TaskManagers, registers the processors, and initializes the data source. dumpOperate then builds Runnables for the full dumps (dumpAll, dumpAllBeta, dumpAllTag) and for clearConfigHistory, calls dumpConfigInfo to dump the configuration, and schedules the dump, heartbeat (cluster mode only), and clearConfigHistory tasks on the config executor. dumpConfigInfo inspects the heartbeat file: with quick start enabled and a heartbeat newer than 6 hours, only the configs changed since that timestamp are dumped incrementally (plus a periodic MD5 check every 12 hours); otherwise the local cache is cleared and all config info is dumped again.
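
To make the full-dump versus incremental decision concrete, here is a small self-contained sketch of the same 6-hour check (needFullDump and its parameters are made up for illustration and are not Nacos APIs):

import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

public class QuickStartDecisionDemo {

    // Same threshold as DumpService#dumpConfigInfo: 6 hours.
    static final long TIME_STEP_HOURS = 6;

    // A full dump is skipped only when quick start is enabled AND the last heartbeat is newer than 6 hours.
    static boolean needFullDump(boolean quickStart, Timestamp lastHeartbeat, Timestamp now) {
        if (!quickStart || lastHeartbeat == null) {
            return true;
        }
        long elapsedMs = now.getTime() - lastHeartbeat.getTime();
        return elapsedMs >= TimeUnit.HOURS.toMillis(TIME_STEP_HOURS);
    }

    public static void main(String[] args) {
        Timestamp now = new Timestamp(System.currentTimeMillis());
        Timestamp twoHoursAgo = new Timestamp(now.getTime() - TimeUnit.HOURS.toMillis(2));
        Timestamp eightHoursAgo = new Timestamp(now.getTime() - TimeUnit.HOURS.toMillis(8));

        System.out.println(needFullDump(true, twoHoursAgo, now));   // false -> incremental dump
        System.out.println(needFullDump(true, eightHoursAgo, now)); // true  -> full dump
        System.out.println(needFullDump(false, twoHoursAgo, now));  // true  -> quick start disabled
    }
}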

DumpAllProcessor

public class DumpAllProcessor implements NacosTaskProcessor {

    public DumpAllProcessor(DumpService dumpService) {
        this.dumpService = dumpService;
        this.persistService = dumpService.getPersistService();
    }
    
    @Override
    public boolean process(NacosTask task) {
        long currentMaxId = persistService.findConfigMaxId();
        long lastMaxId = 0;
        while (lastMaxId < currentMaxId) {
            Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
            if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
                for (ConfigInfoWrapper cf : page.getPageItems()) {
                    long id = cf.getId();
                    lastMaxId = Math.max(id, lastMaxId);
                    if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
                        AggrWhitelist.load(cf.getContent());
                    }
                    
                    if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
                        ClientIpWhiteList.load(cf.getContent());
                    }
                    
                    if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
                        SwitchService.load(cf.getContent());
                    }
                    
                    boolean result = ConfigCacheService
                            .dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),
                                    cf.getType());
                    
                    final String content = cf.getContent();
                    final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
                    LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
                            GroupKey2.getKey(cf.getDataId(), cf.getGroup()), cf.getLastModified(), content.length(),
                            md5);
                }
                DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
            } else {
                lastMaxId += PAGE_SIZE;
            }
        }
        return true;
    }
    
    static final int PAGE_SIZE = 1000;
    
    final DumpService dumpService;
    
    final PersistService persistService;
}

The process method performs the full dump: persistService reads the config info from the database page by page, and each entry is written to the local cache and disk via ConfigCacheService.dump. Entries with special dataIds (aggregation whitelist, client IP whitelist, switch metadata) are additionally loaded into their in-memory services.
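
Note that the paging is cursor-based on the config id rather than OFFSET-based, so each query only scans forward from lastMaxId, which stays cheap even with a very large config_info table. A stripped-down sketch of that pattern (ConfigStore is a hypothetical stand-in for PersistService):

import java.util.List;

public class IdCursorPagingDemo {

    static final int PAGE_SIZE = 1000;

    /** Hypothetical stand-in for PersistService, reduced to the two calls the dump loop needs. */
    interface ConfigStore {
        long findConfigMaxId();
        /** Ids greater than lastMaxId, ordered ascending, at most pageSize of them. */
        List<Long> findIdsAfter(long lastMaxId, int pageSize);
    }

    static void dumpAll(ConfigStore store) {
        long currentMaxId = store.findConfigMaxId();
        long lastMaxId = 0;
        while (lastMaxId < currentMaxId) {
            List<Long> page = store.findIdsAfter(lastMaxId, PAGE_SIZE);
            if (page != null && !page.isEmpty()) {
                for (long id : page) {
                    lastMaxId = Math.max(id, lastMaxId); // advance the cursor to the largest id seen
                    // ... dump this config entry to the local cache here ...
                }
            } else {
                lastMaxId += PAGE_SIZE; // skip over a gap in ids so the loop still terminates
            }
        }
    }
}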

ConfigCacheService

public class ConfigCacheService {

    /**
     * groupKey -> cacheItem.
     */
    private static final ConcurrentHashMap<String, CacheItem> CACHE = new ConcurrentHashMap<String, CacheItem>();

    public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
            String type) {
        String groupKey = GroupKey2.getKey(dataId, group, tenant);
        // make sure the CacheItem exists (create it if absent)
        CacheItem ci = makeSure(groupKey);
        ci.setType(type);
        final int lockResult = tryWriteLock(groupKey);
        assert (lockResult != 0);
        
        if (lockResult < 0) {
            DUMP_LOG.warn("[dump-error] write lock failed. {}", groupKey);
            return false;
        }
        
        try {
            final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
            
            if (md5.equals(ConfigCacheService.getContentMd5(groupKey))) {
                DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
                                + "lastModifiedNew={}", groupKey, md5, ConfigCacheService.getLastModifiedTs(groupKey),
                        lastModifiedTs);
            } else if (!PropertyUtil.isDirectRead()) {
                DiskUtil.saveToDisk(dataId, group, tenant, content);
            }
            updateMd5(groupKey, md5, lastModifiedTs);
            return true;
        } catch (IOException ioe) {
            DUMP_LOG.error("[dump-exception] save disk error. " + groupKey + ", " + ioe.toString(), ioe);
            if (ioe.getMessage() != null) {
                String errMsg = ioe.getMessage();
                if (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg) || errMsg.contains(DISK_QUATA_CN) || errMsg
                        .contains(DISK_QUATA_EN)) {
                    // Protect from disk full.
                    FATAL_LOG.error("磁盘满自杀退出", ioe);
                    System.exit(0);
                }
            }
            return false;
        } finally {
            releaseWriteLock(groupKey);
        }
    }
    
    static CacheItem makeSure(final String groupKey) {
        CacheItem item = CACHE.get(groupKey);
        if (null != item) {
            return item;
        }
        CacheItem tmp = new CacheItem(groupKey);
        item = CACHE.putIfAbsent(groupKey, tmp);
        return (null == item) ? tmp : item;
    }
}

Here, when the content's MD5 differs from the cached value, the config content is written to a local file via DiskUtil.saveToDisk.
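
The MD5 comparison is what keeps repeated dumps cheap: when the hash of the incoming content equals the cached one, the disk write is skipped. A minimal self-contained sketch of that idea (not the Nacos implementation):

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class Md5SkipWriteDemo {

    // groupKey -> last known md5, mirroring the role of the md5 kept in CacheItem (simplified).
    private static final Map<String, String> MD5_CACHE = new ConcurrentHashMap<>();

    static String md5Hex(String content) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5").digest(content.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    /** Returns true when the content changed and a disk write would be needed. */
    static boolean needWrite(String groupKey, String content) throws Exception {
        String md5 = md5Hex(content);
        if (md5.equals(MD5_CACHE.get(groupKey))) {
            return false; // unchanged, skip the disk write
        }
        MD5_CACHE.put(groupKey, md5);
        return true;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(needWrite("app+DEFAULT_GROUP", "a=1")); // true, first dump
        System.out.println(needWrite("app+DEFAULT_GROUP", "a=1")); // false, md5 unchanged
        System.out.println(needWrite("app+DEFAULT_GROUP", "a=2")); // true, content changed
    }
}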

DiskUtil

public class DiskUtil {

    static final String BASE_DIR = File.separator + "data" + File.separator + "config-data";

    // Tenant-scoped configs go under a separate base directory (constant included here so the excerpt is self-contained).
    static final String TENANT_BASE_DIR = File.separator + "data" + File.separator + "tenant-config-data";

    public static void saveToDisk(String dataId, String group, String tenant, String content) throws IOException {
        File targetFile = targetFile(dataId, group, tenant);
        FileUtils.writeStringToFile(targetFile, content, Constants.ENCODE);
    }
    
    public static File targetFile(String dataId, String group, String tenant) {
        File file;
        if (StringUtils.isBlank(tenant)) {
            file = new File(EnvUtil.getNacosHome(), BASE_DIR);
        } else {
            file = new File(EnvUtil.getNacosHome(), TENANT_BASE_DIR);
            file = new File(file, tenant);
        }
        file = new File(file, group);
        file = new File(file, dataId);
        return file;
    }
}

DiskUtil thus creates the file ${nacos.home}/data/config-data/${group}/${dataId} (or under the tenant base directory when a tenant is set), and FileUtils.writeStringToFile writes the config content into that file.
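
As a quick sanity check, the path composition for a config without a tenant looks like this (the nacos.home value is just an example):

import java.io.File;

public class TargetFileDemo {

    public static void main(String[] args) {
        String nacosHome = "/home/nacos";            // example value of ${nacos.home}
        String baseDir = File.separator + "data" + File.separator + "config-data";

        File file = new File(nacosHome, baseDir);        // ${nacos.home}/data/config-data
        file = new File(file, "DEFAULT_GROUP");          // .../DEFAULT_GROUP
        file = new File(file, "application.properties"); // .../application.properties

        // Prints /home/nacos/data/config-data/DEFAULT_GROUP/application.properties on Linux.
        System.out.println(file.getPath());
    }
}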