Nacos 配置中心源碼分析
本文主要和大家一起以源碼的角度來分析 Nacos 配置中心的配置信息獲取,以及配置信息動態(tài)同步的過程和原理。環(huán)境介紹和使用 環(huán)境介紹:
- Jdk 1.8
- nacos-server-1.4.2
- spring-boot-2.3.5.RELEASE
- spring-cloud-Hoxton.SR8
- spring-cloiud-alibab-2.2.5.RELEASE
如果我們需要使用 Nacos 作為配置中心,我們首先需要導(dǎo)入 Nacos Config 的依賴信息,如下所示:
- <dependency>
- <groupId>com.alibaba.cloud</groupId>
- <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
- </dependency>
然后再 bootstartp.yml 文件中配置 Nacos 服務(wù)信息。
- spring:
- cloud:
- nacos:
- config:
- server-addr: 127.0.0.1:8848
客戶端初始化
主要是通過 NacosConfigBootstrapConfiguration 類來進行初始化 NacosConfigManager 、NacosPropertySourceLocator
- @Configuration(proxyBeanMethods = false)
- @ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
- public class NacosConfigBootstrapConfiguration {
- @Bean
- @ConditionalOnMissingBean
- public NacosConfigManager nacosConfigManager(
- NacosConfigProperties nacosConfigProperties) {
- return new NacosConfigManager(nacosConfigProperties);
- }
- @Bean
- public NacosPropertySourceLocator nacosPropertySourceLocator(
- NacosConfigManager nacosConfigManager) {
- return new NacosPropertySourceLocator(nacosConfigManager);
- }
- // ...
- }
在 NacosConfigManager 的構(gòu)造方法中會調(diào)用 createConfigService 方法來創(chuàng)建 ConfigService 實例,內(nèi)部調(diào)用工廠方法 ConfigFactory#createConfigService 通過反射實例化一個com.alibaba.nacos.client.config.NacosConfigService 的實例對象。
- public static ConfigService createConfigService(Properties properties) throws NacosException {
- try {
- Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
- Constructor constructor = driverImplClass.getConstructor(Properties.class);
- ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);
- return vendorImpl;
- } catch (Throwable e) {
- throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
- }
- }
NacosPropertySourceLocator 繼承 PropertySourceLocator(PropertySourceLocator接口支持擴展自定義配置加載到 Spring Environment中)通過 locate 加載配置信息。
- @Override
- public PropertySource<?> locate(Environment env) {
- nacosConfigProperties.setEnvironment(env);
- ConfigService configService = nacosConfigManager.getConfigService();
- if (null == configService) {
- log.warn("no instance of config service found, can't load config from nacos");
- return null;
- }
- long timeout = nacosConfigProperties.getTimeout();
- nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService,
- timeout);
- String name = nacosConfigProperties.getName();
- String dataIdPrefix = nacosConfigProperties.getPrefix();
- if (StringUtils.isEmpty(dataIdPrefix)) {
- dataIdPrefix = name;
- }
- if (StringUtils.isEmpty(dataIdPrefix)) {
- dataIdPrefix = env.getProperty("spring.application.name");
- }
- CompositePropertySource composite = new CompositePropertySource(
- NACOS_PROPERTY_SOURCE_NAME);
- // 共享配置
- loadSharedConfiguration(composite);
- // 拓展配置
- loadExtConfiguration(composite);
- // 應(yīng)用配置
- loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
- return composite;
- }
配置讀取過程
配置加載有三個方法 loadSharedConfiguration、loadSharedConfiguration、 loadApplicationConfiguration 以 loadApplicationConfiguration 繼續(xù)跟進。
- private void loadApplicationConfiguration(
- CompositePropertySource compositePropertySource, String dataIdPrefix,
- NacosConfigProperties properties, Environment environment) {
- String fileExtension = properties.getFileExtension();
- String nacosGroup = properties.getGroup();
- // load directly once by default
- loadNacosDataIfPresent(compositePropertySource, dataIdPrefix, nacosGroup,
- fileExtension, true);
- // load with suffix, which have a higher priority than the default
- loadNacosDataIfPresent(compositePropertySource,
- dataIdPrefix + DOT + fileExtension, nacosGroup, fileExtension, true);
- // Loaded with profile, which have a higher priority than the suffix
- for (String profile : environment.getActiveProfiles()) {
- String dataId = dataIdPrefix + SEP1 + profile + DOT + fileExtension;
- loadNacosDataIfPresent(compositePropertySource, dataId, nacosGroup,
- fileExtension, true);
- }
- }
主要通過 loadNacosDataIfPresent 讀取配置信息, 其實我們可以通過參數(shù)看出,主要配置文件包含以下部分:dataId, group, fileExtension
- private void loadNacosDataIfPresent(final CompositePropertySource composite,
- final String dataId, final String group, String fileExtension,
- boolean isRefreshable) {
- if (null == dataId || dataId.trim().length() < 1) {
- return;
- }
- if (null == group || group.trim().length() < 1) {
- return;
- }
- NacosPropertySource propertySource = this.loadNacosPropertySource(dataId, group,
- fileExtension, isRefreshable);
- this.addFirstPropertySource(composite, propertySource, false);
- }
然后調(diào)用 loadNacosPropertySource 最后一步步的會調(diào)用到 NacosConfigService#getConfigInner
- private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
- group = null2defaultGroup(group);
- ParamUtils.checkKeyParam(dataId, group);
- ConfigResponse cr = new ConfigResponse();
- cr.setDataId(dataId);
- cr.setTenant(tenant);
- cr.setGroup(group);
- // 優(yōu)先使用本地配置
- String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
- if (content != null) {
- LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
- dataId, group, tenant, ContentUtils.truncateContent(content));
- cr.setContent(content);
- configFilterChainManager.doFilter(null, cr);
- content = cr.getContent();
- return content;
- }
- try {
- // 獲取遠程配置
- String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
- cr.setContent(ct[0]);
- configFilterChainManager.doFilter(null, cr);
- content = cr.getContent();
- return content;
- } catch (NacosException ioe) {
- if (NacosException.NO_RIGHT == ioe.getErrCode()) {
- throw ioe;
- }
- LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
- agent.getName(), dataId, group, tenant, ioe.toString());
- }
- LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
- dataId, group, tenant, ContentUtils.truncateContent(content));
- content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
- cr.setContent(content);
- configFilterChainManager.doFilter(null, cr);
- content = cr.getContent();
- return content;
- }
加載遠程配置
worker.getServerConfig 主要是獲取遠程配置, ClIentWorker 的 getServerConfig 定義如下:
- public String[] getServerConfig(String dataId, String group, String tenant, long readTimeout)
- throws NacosException {
- String[] ct = new String[2];
- if (StringUtils.isBlank(group)) {
- group = Constants.DEFAULT_GROUP;
- }
- HttpRestResult<String> result = null;
- try {
- Map<String, String> params = new HashMap<String, String>(3);
- if (StringUtils.isBlank(tenant)) {
- params.put("dataId", dataId);
- params.put("group", group);
- } else {
- params.put("dataId", dataId);
- params.put("group", group);
- params.put("tenant", tenant);
- }
- result = agent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, params, agent.getEncode(), readTimeout);
- } catch (Exception ex) {
- String message = String
- .format("[%s] [sub-server] get server config exception, dataId=%s, group=%s, tenant=%s",
- agent.getName(), dataId, group, tenant);
- LOGGER.error(message, ex);
- throw new NacosException(NacosException.SERVER_ERROR, ex);
- }
- switch (result.getCode()) {
- case HttpURLConnection.HTTP_OK:
- LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, result.getData());
- ct[0] = result.getData();
- if (result.getHeader().getValue(CONFIG_TYPE) != null) {
- ct[1] = result.getHeader().getValue(CONFIG_TYPE);
- } else {
- ct[1] = ConfigType.TEXT.getType();
- }
- return ct;
- case HttpURLConnection.HTTP_NOT_FOUND:
- LocalConfigInfoProcessor.saveSnapshot(agent.getName(), dataId, group, tenant, null);
- return ct;
- case HttpURLConnection.HTTP_CONFLICT: {
- LOGGER.error(
- "[{}] [sub-server-error] get server config being modified concurrently, dataId={}, group={}, "
- + "tenant={}", agent.getName(), dataId, group, tenant);
- throw new NacosException(NacosException.CONFLICT,
- "data being modified, dataId=" + dataId + ",group=" + group + ",tenant=" + tenant);
- }
- case HttpURLConnection.HTTP_FORBIDDEN: {
- LOGGER.error("[{}] [sub-server-error] no right, dataId={}, group={}, tenant={}", agent.getName(),
- dataId, group, tenant);
- throw new NacosException(result.getCode(), result.getMessage());
- }
- default: {
- LOGGER.error("[{}] [sub-server-error] dataId={}, group={}, tenant={}, code={}", agent.getName(),
- dataId, group, tenant, result.getCode());
- throw new NacosException(result.getCode(),
- "http error, code=" + result.getCode() + ",dataId=" + dataId + ",group=" + group + ",tenant="
- + tenant);
- }
- }
- }
agent 默認使用 MetricsHttpAgent 實現(xiàn)類
配置同步過程
Nacos 配置同步過程如下圖所示:
客戶端請求
客戶端初始請求配置完成后,會通過 WorkClient 進行長輪詢查詢配置, 它的構(gòu)造方法如下:
- public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
- final Properties properties) {
- this.agent = agent;
- this.configFilterChainManager = configFilterChainManager;
- // Initialize the timeout parameter
- init(properties);
- // 檢查線程池
- this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
- t.setDaemon(true);
- return t;
- }
- });
- // 長輪詢線程
- this.executorService = Executors
- .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
- t.setDaemon(true);
- return t;
- }
- });
- this.executor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- checkConfigInfo();
- } catch (Throwable e) {
- LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
- }
- }
- }, 1L, 10L, TimeUnit.MILLISECONDS);
- }
這里初始化了兩個線程池:
- 第一個線程池主要是用來初始化做長輪詢的;
- 第二個線程池使用來做檢查的,會每間隔 10 秒鐘執(zhí)行一次檢查方法 checkConfigInfo
checkConfigInfo
在這個方法里面主要是分配任務(wù),給每個 task 分配一個 taskId , 后面會去檢查本地配置和遠程配置,最終調(diào)用的是 LongPollingRunable 的 run 方法。
- public void checkConfigInfo() {
- // Dispatch taskes.
- int listenerSize = cacheMap.size();
- // Round up the longingTaskCount.
- int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
- if (longingTaskCount > currentLongingTaskCount) {
- for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
- // The task list is no order.So it maybe has issues when changing.
- executorService.execute(new LongPollingRunnable(i));
- }
- currentLongingTaskCount = longingTaskCount;
- }
- }
LongPollingRunnable
長輪詢線程實現(xiàn),首先第一步檢查本地配置信息,然后通過 dataId 去檢查服務(wù)端是否有變動的配置信息,如果有就更新下來然后刷新配置。
- public void run() {
- List<CacheData> cacheDatas = new ArrayList<CacheData>();
- List<String> inInitializingCacheList = new ArrayList<String>();
- try {
- // check failover config
- for (CacheData cacheData : cacheMap.values()) {
- if (cacheData.getTaskId() == taskId) {
- cacheDatas.add(cacheData);
- try {
- checkLocalConfig(cacheData);
- if (cacheData.isUseLocalConfigInfo()) {
- // 觸發(fā)回調(diào)
- cacheData.checkListenerMd5();
- }
- } catch (Exception e) {
- LOGGER.error("get local config info error", e);
- }
- }
- }
- // check server config
- List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
- if (!CollectionUtils.isEmpty(changedGroupKeys)) {
- LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
- }
- for (String groupKey : changedGroupKeys) {
- String[] key = GroupKey.parseKey(groupKey);
- String dataId = key[0];
- String group = key[1];
- String tenant = null;
- if (key.length == 3) {
- tenant = key[2];
- }
- try {
- String[] ct = getServerConfig(dataId, group, tenant, 3000L);
- CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
- cache.setContent(ct[0]);
- if (null != ct[1]) {
- cache.setType(ct[1]);
- }
- LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
- agent.getName(), dataId, group, tenant, cache.getMd5(),
- ContentUtils.truncateContent(ct[0]), ct[1]);
- } catch (NacosException ioe) {
- String message = String
- .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
- agent.getName(), dataId, group, tenant);
- LOGGER.error(message, ioe);
- }
- }
- for (CacheData cacheData : cacheDatas) {
- if (!cacheData.isInitializing() || inInitializingCacheList
- .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
- cacheData.checkListenerMd5();
- cacheData.setInitializing(false);
- }
- }
- inInitializingCacheList.clear();
- executorService.execute(this);
- } catch (Throwable e) {
- // If the rotation training task is abnormal, the next execution time of the task will be punished
- LOGGER.error("longPolling error : ", e);
- executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
- }
- }
- }
addTenantListeners
添加監(jiān)聽,這里主要是通過 dataId , group 來獲取 cache 本地緩存的配置信息,然后再將 Listener 也傳給 cache 統(tǒng)一管理。
- public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
- throws NacosException {
- group = null2defaultGroup(group);
- String tenant = agent.getTenant();
- CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
- for (Listener listener : listeners) {
- cache.addListener(listener);
- }
- }
回調(diào)觸發(fā)
如果 md5 值發(fā)生變化過后就會調(diào)用 safeNotifyListener 方法然后將配置信息發(fā)送給對應(yīng)的監(jiān)聽器
- void checkListenerMd5() {
- for (ManagerListenerWrap wrap : listeners) {
- if (!md5.equals(wrap.lastCallMd5)) {
- safeNotifyListener(dataId, group, content, type, md5, wrap);
- }
- }
- }
服務(wù)端響應(yīng)
當服務(wù)端收到請求后,會 hold 住當前請求,如果有變化就返回,如果沒有變化就等待超時之前返回無變化。
- /**
- * The client listens for configuration changes.
- */
- @PostMapping("/listener")
- @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
- public void listener(HttpServletRequest request, HttpServletResponse response)
- throws ServletException, IOException {
- request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
- String probeModify = request.getParameter("Listening-Configs");
- if (StringUtils.isBlank(probeModify)) {
- throw new IllegalArgumentException("invalid probeModify");
- }
- probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
- Map<String, String> clientMd5Map;
- try {
- clientMd5Map = MD5Util.getClientMd5Map(probeModify);
- } catch (Throwable e) {
- throw new IllegalArgumentException("invalid probeModify");
- }
- // do long-polling
- inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
- }
LongPollingService
核心處理類 LongPollingService
- /**
- * Add LongPollingClient.
- *
- * @param req HttpServletRequest.
- * @param rsp HttpServletResponse.
- * @param clientMd5Map clientMd5Map.
- * @param probeRequestSize probeRequestSize.
- */
- public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
- int probeRequestSize) {
- String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
- String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
- String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
- String tag = req.getHeader("Vipserver-Tag");
- int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
- // Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
- long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
- if (isFixedPolling()) {
- timeout = Math.max(10000, getFixedPollingInterval());
- // Do nothing but set fix polling timeout.
- } else {
- long start = System.currentTimeMillis();
- List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
- if (changedGroups.size() > 0) {
- generateResponse(req, rsp, changedGroups);
- LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "instant",
- RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
- changedGroups.size());
- return;
- } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
- LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
- RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
- changedGroups.size());
- return;
- }
- }
- String ip = RequestUtil.getRemoteIp(req);
- // Must be called by http thread, or send response.
- final AsyncContext asyncContext = req.startAsync();
- // AsyncContext.setTimeout() is incorrect, Control by oneself
- asyncContext.setTimeout(0L);
- ConfigExecutor.executeLongPolling(
- new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
- }
參考鏈接
- https://blog.csdn.net/jason_jiahongfei/article/details/108373442
- https://www.cnblogs.com/lockedsher/articles/14447700.html
本文轉(zhuǎn)載自微信公眾號「運維開發(fā)故事」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系運維開發(fā)故事公眾號。