一篇學會配置管理客戶端流程
引言
Nacos注冊中心的主要流程基本上擼完了,下面開始擼配置中心。本文從示例入手走查了客戶端的初始化流程,Listener的注冊邏輯和執(zhí)行邏輯。
一、內(nèi)容提要
示例
- 通過示例構(gòu)建ConfigService、注冊了Listener分析其流程
Client初始化概覽
- 支持多種獲取server地址方式(本地、endpoint)
- 支持多種namespace設(shè)置(本地、阿里云)
- 支持超時時間、重試時間等參數(shù)設(shè)置
- 支持用戶名和密碼驗證
- 長輪詢會從BlockingQueue中獲取元素,隊列有元素立即執(zhí)行executeConfigListen,隊列無元素阻塞5秒鐘執(zhí)行executeConfigListen()
Listener注冊邏輯
- client添加Listener后會在cacheMap中緩存CacheData
- cacheMap中key由「dataId+group+tenant」拼接而成
- 每個CacheData會綁定注冊的Listener列表
- 每個CacheData會綁定taskId,3000個不同的CacheData對應一個taskId
- 設(shè)置isSyncWithServer=false表示 cache md5 data不是來自server同步
- BlockingQueue中添加new Object() 供長輪詢判斷立即執(zhí)行使用
配置變更執(zhí)行邏輯
- 執(zhí)行邏輯由executeConfigListen方法實現(xiàn)
- 當CacheData從Server同步后,會校驗md5是否變更了,變更則回調(diào)注冊的Listener完成通知
- 注冊Listener后會構(gòu)建與server的RPC通道rpcClient
- 向server發(fā)起變更查詢請求configChangeListenRequest
- Server端通過比較緩存的md5值,返回client變更的key列表
- Client通過變更的key列表向server發(fā)起配置查詢請求ConfigQueryRequest
- 獲取變更內(nèi)容,并回調(diào)注冊的Listener完成通知
- 回調(diào)注冊的Listener是通過線程池異步執(zhí)行Runnble Job實現(xiàn)的
二、示例
- @Test
- public void test01() throws Exception {
- String serverAddr = "localhost:8848";
- String dataId = "test";
- String group = "DEFAULT_GROUP";
- Properties properties = new Properties();
- properties.put("serverAddr", serverAddr);
- // 構(gòu)建ConfigService
- ConfigService configService = NacosFactory.createConfigService(properties);
- configService.addListener(dataId, group, new Listener() {
- @Override
- public void receiveConfigInfo(String configInfo) {
- System.out.println("receive:" + configInfo);
- }
- @Override
- public Executor getExecutor() {
- return null;
- }
- });
- System.in.read();
- }
備注: 示例中構(gòu)建了ConfigService,注入Listener接受server配置變更通知。
二、Client初始化概覽
NacosConfigService構(gòu)造方法
- public NacosConfigService(Properties properties) throws NacosException {
- ValidatorUtils.checkInitParam(properties);
- // 注解@1
- initNamespace(properties);
- // 注解@2
- ServerListManager serverListManager = new ServerListManager(properties);
- // 注解@3
- serverListManager.start();
- // 注解@4
- this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);
- // 將被廢棄HttpAgent,先忽略
- // will be deleted in 2.0 later versions
- agent = new ServerHttpAgent(serverListManager);
- }
注解@1 設(shè)置namespace可以通過properties.setProperty(PropertyKeyConst.NAMESPACE),代碼中會兼容阿里云環(huán)境,在此忽略,默認為空。
注解@2 初始化namespace、server地址等信息
注解@3 啟動主要用于endpoint方式定時獲取server地址,當本地傳入isFixed=true
注解@4 clientWorker初始化
ClientWorker初始化
- public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
- final Properties properties) throws NacosException {
- this.configFilterChainManager = configFilterChainManager;
- // 注解@5
- init(properties);
- // 注解@6
- agent = new ConfigRpcTransportClient(properties, serverListManager);
- // 調(diào)度線程池,「處理器核數(shù)」
- ScheduledExecutorService executorService = Executors
- .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("com.alibaba.nacos.client.Worker");
- t.setDaemon(true);
- return t;
- }
- });
- agent.setExecutor(executorService);
- // 注解@7
- agent.start();
- }
注解@5 初始化超時時間、重試時間等
- private void init(Properties properties) {
- // 超時時間,默認30秒
- timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
- Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
- // 重試時間,默認2秒
- taskPenaltyTime = ConvertUtils
- .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME);
- // 開啟配置刪除同步,默認false
- this.enableRemoteSyncConfig = Boolean
- .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
- }
注解@6 gRPC config agent初始化
- public ConfigTransportClient(Properties properties, ServerListManager serverListManager) {
- // 默認編碼UTF-8
- String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
- if (StringUtils.isBlank(encodeTmp)) {
- this.encode = Constants.ENCODE;
- } else {
- this.encode = encodeTmp.trim();
- }
- // namespace租戶,默認空
- this.tenant = properties.getProperty(PropertyKeyConst.NAMESPACE);
- this.serverListManager = serverListManager;
- // 用戶名和密碼驗證
- this.securityProxy = new SecurityProxy(properties,
- ConfigHttpClientManager.getInstance().getNacosRestTemplate());
- }
注解@7 gRPC agent啟動
- public void start() throws NacosException {
- // 簡單用戶名和密碼驗證
- if (securityProxy.isEnabled()) {
- securityProxy.login(serverListManager.getServerUrls());
- this.executor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- securityProxy.login(serverListManager.getServerUrls());
- }
- }, 0, this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
- }
- startInternal();
- }
- @Override
- public void startInternal() throws NacosException {
- executor.schedule(new Runnable() {
- @Override
- public void run() {
- while (true) { // 一直運行
- try {
- // 最長等待5秒
- listenExecutebell.poll(5L, TimeUnit.SECONDS);
- executeConfigListen();
- } catch (Exception e) {
- LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
- }
- }
- }
- }, 0L, TimeUnit.MILLISECONDS);
- }
小結(jié): 線程會一直運行,從BlockingQueue中獲取元素。隊里不為空,獲取后立即執(zhí)行executeConfigListen();隊列為空等待5秒后執(zhí)行
executeConfigListen()。
三、Listener注冊邏輯
- configService.addListener(dataId, group, new Listener() {
- @Override
- public void receiveConfigInfo(String configInfo) {
- System.out.println("receive:" + configInfo);
- }
- @Override
- public Executor getExecutor() {
- return null;
- }
- });
- public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
- throws NacosException {
- // 默認DEFAULT_GROUP
- group = null2defaultGroup(group);
- // 租戶,默認空
- String tenant = agent.getTenant();
- // 注解@8
- CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
- synchronized (cache) {
- for (Listener listener : listeners) {
- cache.addListener(listener);
- }
- // cache md5 data是否來自server同步
- cache.setSyncWithServer(false);
- // BlockingQueue中添加new Object()
- agent.notifyListenConfig();
- }
- }
注解@8 構(gòu)建緩存數(shù)據(jù)CacheData并放入cacheMap中,緩存的key為 「dataId+group+tenant」例如:test+DEFAULT_GROUP。
每個CacheData會綁定對應的taskId,每3000個CacheData對應一個taskId。其實從后面的代碼中可以看出,每個taskId會對應一個gRPC Client。
- public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {
- // 從緩存中獲取
- CacheData cache = getCache(dataId, group, tenant);
- if (null != cache) {
- return cache;
- }
- // 構(gòu)造緩存key以+連接,test+DEFAULT_GROUP
- String key = GroupKey.getKeyTenant(dataId, group, tenant);
- synchronized (cacheMap) {
- CacheData cacheFromMap = getCache(dataId, group, tenant);
- // multiple listeners on the same dataid+group and race condition,so
- // double check again
- // other listener thread beat me to set to cacheMap
- if (null != cacheFromMap) { // 再檢查一遍
- cache = cacheFromMap;
- // reset so that server not hang this check
- cache.setInitializing(true); // 緩存正在初始化
- } else {
- // 構(gòu)造緩存數(shù)據(jù)對象
- cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);
- // 初始值taskId=0,注意此處每3000個CacheData共用一個taskId
- int taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();
- cache.setTaskId(taskId);
- // fix issue # 1317
- if (enableRemoteSyncConfig) { // 默認false
- String[] ct = getServerConfig(dataId, group, tenant, 3000L, false);
- cache.setContent(ct[0]);
- }
- }
- Map<String, CacheData> copy = new HashMap<String, CacheData>(this.cacheMap.get());
- // key = test+DEFAULT_GROUP
- copy.put(key, cache);
- // cacheMap = {test+DEFAULT_GROUP=CacheData [test, DEFAULT_GROUP]}
- cacheMap.set(copy);
- }
- LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);
- MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());
- return cache;
- }
屬性 | 含義 |
---|---|
name | ConfigTransportClient名稱,config_rpc_client |
configFilterChainManager | filter攔截鏈條,可以執(zhí)行一些列攔截器 |
dataId | dataId |
group | group名稱,默認為DEFAULT_GROUP |
tenant | 租戶名稱 |
listeners | 添加的Listener列表,線程安全CopyOnWriteArrayList |
content | 啟動時會從本地文件讀入,默認為null |
md5 | content的md5字符串 |
小結(jié):添加監(jiān)聽器邏輯如下:構(gòu)建CacheData,并緩存在cacheMap中,key是由「dataId+group+tenant」組成;每個CacheData會綁定了Listener列表,也綁定了taskId,3000個不同的CacheData對應一個taskId,對應一個gRPC通道實例;設(shè)置isSyncWithServer=false表示 cache md5 data不是來自server同步,BlockingQueue中添加new Object() 供前面提到的長輪詢判斷使用。
四、配置變更執(zhí)行邏輯
上文中提到一個線程一直在輪詢,輪詢執(zhí)行executeConfigListen方法,這個方法比較關(guān)鍵。
- public void executeConfigListen() {
- Map<String/*taskId*/, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);
- Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);
- long now = System.currentTimeMillis();
- // 超過5分鐘
- boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
- for (CacheData cache : cacheMap.get().values()) {
- synchronized (cache) {
- // --------注解@9開始--------
- if (cache.isSyncWithServer()) {
- cache.checkListenerMd5(); // 內(nèi)容有變更通知Listener執(zhí)行
- if (!needAllSync) { // 不超過5分鐘則不再全局校驗
- continue;
- }
- }
- // --------注解@9結(jié)束--------
- if (!CollectionUtils.isEmpty(cache.getListeners())) { // 有添加Listeners
- // get listen config 默認 false
- if (!cache.isUseLocalConfigInfo()) {
- List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
- if (cacheDatas == null) {
- cacheDatas = new LinkedList<CacheData>();
- listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
- }
- // CacheData [test, DEFAULT_GROUP]
- cacheDatas.add(cache);
- }
- } else if (CollectionUtils.isEmpty(cache.getListeners())) { // 沒有添加Listeners
- if (!cache.isUseLocalConfigInfo()) {
- List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
- if (cacheDatas == null) {
- cacheDatas = new LinkedList<CacheData>();
- removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
- }
- cacheDatas.add(cache);
- }
- }
- }
- }
- boolean hasChangedKeys = false;
- //-------------------注解@10開始---------------------------------
- if (!listenCachesMap.isEmpty()) { // 有Listeners
- for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
- String taskId = entry.getKey();
- List<CacheData> listenCaches = entry.getValue();
- ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
- configChangeListenRequest.setListen(true);
- try {
- // 注解@10.1 每個taskId構(gòu)建rpcClient
- RpcClient rpcClient = ensureRpcClient(taskId);
- // 注解@10.2
- ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
- rpcClient, configChangeListenRequest);
- if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {
- Set<String> changeKeys = new HashSet<String>();
- // handle changed keys,notify listener
- // 有變化的configContext
- if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
- hasChangedKeys = true;
- for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {
- String changeKey = GroupKey
- .getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
- changeConfig.getTenant());
- changeKeys.add(changeKey);
- boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();
- // 注解@10.3 回調(diào)Listener
- refreshContentAndCheck(changeKey, !isInitializing);
- }
- }
- //handler content configs
- for (CacheData cacheData : listenCaches) {
- String groupKey = GroupKey
- .getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
- if (!changeKeys.contains(groupKey)) { // 注解@10.4
- //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
- synchronized (cacheData) {
- if (!cacheData.getListeners().isEmpty()) {
- cacheData.setSyncWithServer(true);
- continue;
- }
- }
- }
- cacheData.setInitializing(false);
- }
- }
- } catch (Exception e) {
- LOGGER.error("Async listen config change error ", e);
- try {
- Thread.sleep(50L);
- } catch (InterruptedException interruptedException) {
- //ignore
- }
- }
- }
- }
- //-------------------注解@10結(jié)束---------------------------------
- if (!removeListenCachesMap.isEmpty()) {
- for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
- String taskId = entry.getKey();
- List<CacheData> removeListenCaches = entry.getValue();
- ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
- configChangeListenRequest.setListen(false);
- try {
- // 向server發(fā)送Listener取消訂閱請求ConfigBatchListenRequest#listen為false
- RpcClient rpcClient = ensureRpcClient(taskId);
- boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
- if (removeSuccess) {
- for (CacheData cacheData : removeListenCaches) {
- synchronized (cacheData) {
- if (cacheData.getListeners().isEmpty()) {
- // 移除本地緩存
- ClientWorker.this
- .removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
- }
- }
- }
- }
- } catch (Exception e) {
- LOGGER.error("async remove listen config change error ", e);
- }
- try {
- Thread.sleep(50L);
- } catch (InterruptedException interruptedException) {
- //ignore
- }
- }
- }
- if (needAllSync) {
- lastAllSyncTime = now;
- }
- //If has changed keys,notify re sync md5.
- if (hasChangedKeys) { // key有變化觸發(fā)下一輪輪詢
- notifyListenConfig();
- }
- }
注解@9 isSyncWithServer初始為false,在下文代碼中校驗結(jié)束后會設(shè)置為true,表示md5 cache data同步來自server。如果為true會校驗Md5.
- void checkListenerMd5() {
- for (ManagerListenerWrap wrap : listeners) {
- if (!md5.equals(wrap.lastCallMd5)) { // 注解@9.1
- safeNotifyListener(dataId, group, content, type, md5, wrap);
- }
- }
- }
注解@9.1 配置內(nèi)容有變更時,回調(diào)到我們示例中注冊的Listener中。
- private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
- final String md5, final ManagerListenerWrap listenerWrap) {
- final Listener listener = listenerWrap.listener;
- if (listenerWrap.inNotifying) {
- // ...
- return;
- }
- Runnable job = new Runnable() {
- @Override
- public void run() {
- long start = System.currentTimeMillis();
- ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader appClassLoader = listener.getClass().getClassLoader();
- try {
- if (listener instanceof AbstractSharedListener) {
- AbstractSharedListener adapter = (AbstractSharedListener) listener;
- adapter.fillContext(dataId, group);
- // ...
- }
- Thread.currentThread().setContextClassLoader(appClassLoader);
- ConfigResponse cr = new ConfigResponse();
- cr.setDataId(dataId);
- cr.setGroup(group);
- cr.setContent(content);
- // filter攔截繼續(xù)過濾
- configFilterChainManager.doFilter(null, cr);
- String contentTmp = cr.getContent();
- listenerWrap.inNotifying = true;
- // 注解@9.2
- listener.receiveConfigInfo(contentTmp);
- // compare lastContent and content
- if (listener instanceof AbstractConfigChangeListener) {
- Map data = ConfigChangeHandler.getInstance()
- .parseChangeData(listenerWrap.lastContent, content, type);
- ConfigChangeEvent event = new ConfigChangeEvent(data);
- // 回調(diào)變更事件方法
- ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
- listenerWrap.lastContent = content;
- }
- listenerWrap.lastCallMd5 = md5;
- // ..
- } catch (NacosException ex) {
- // ...
- } catch (Throwable t) {
- // ...
- } finally {
- listenerWrap.inNotifying = false;
- Thread.currentThread().setContextClassLoader(myClassLoader);
- }
- }
- };
- final long startNotify = System.currentTimeMillis();
- try {
- // 注解@9.3
- if (null != listener.getExecutor()) {
- listener.getExecutor().execute(job);
- } else {
- try {
- INTERNAL_NOTIFIER.submit(job); // 默認線程池執(zhí)行,為5個線程
- } catch (RejectedExecutionException rejectedExecutionException) {
- // ...
- job.run();
- } catch (Throwable throwable) {
- // ...
- job.run();
- }
- }
- } catch (Throwable t) {
- // ...
- }
- final long finishNotify = System.currentTimeMillis();
- // ...
- }
注解@9.2 回調(diào)注冊Listener的receiveConfigInfo方法或者receiveConfigChange邏輯
注解@9.3 優(yōu)先使用我們示例中注冊提供的線程池執(zhí)行job,如果沒有設(shè)置使用默認線程池「INTERNAL_NOTIFIER」,默認5個線程
備注: 當CacheData從server同步后,會校驗md5是否變更了,當變更時會回調(diào)到我們注冊的Listener完成通知。通知任務被封裝成Runnable任務,執(zhí)行線程池可以自定義,默認為5個線程。
注解@10.1 每個taskId構(gòu)建rpcClient,例如:taskId= config-0-c70e0314-4770-43f5-add4-f258a4083fd7;結(jié)合上下文每3000個CacheData對應一個rpcClient。
注解@10.2 向server發(fā)起configChangeListenRequest,server端由ConfigChangeBatchListenRequestHandler處理,還是比較md5
是否變更了,變更后server端返回變更的key列表。
注解@10.3 當server返回變更key列表時執(zhí)行refreshContentAndCheck方法。
- private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
- try {
- // 注解@10.3.1
- String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);
- cacheData.setContent(ct[0]);
- if (null != ct[1]) {
- cacheData.setType(ct[1]);
- }
- if (notify) { // 記錄日志
- // ...
- }
- // 注解@10.3.2
- cacheData.checkListenerMd5();
- } catch (Exception e) {
- //...
- }
- }
注解@10.3.1 向server發(fā)起ConfigQueryRequest,查詢配置內(nèi)容
注解@10.3.2 回調(diào)注冊的Listener邏輯見 注解@9
注解@10.4 key沒有變化的,內(nèi)容由server同步,設(shè)置SyncWithServer=true,下一輪邏輯會由 注解@9 部分執(zhí)行
備注: 從整個注解@10 注冊Listener后,會構(gòu)建與server的RPC通道rpcClient;向server發(fā)起變更查詢請求configChangeListenRequest,server端通過比較緩存的md5值,返回client變更的key列表;client通過變更的key列表向server發(fā)起配置查詢請求ConfigQueryRequest,獲取變更內(nèi)容,并回調(diào)我們注冊的Listener。
本文轉(zhuǎn)載自微信公眾號「瓜農(nóng)老梁」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系瓜農(nóng)老梁公眾號。