
上文说了Nacos配置中心客户端的源码流程,这篇介绍下Nacos配置中心服务端的源码。
服务端的启动
先来看服务启动时干了啥?
init()方法上面有@PostConstruct,该方法会在ExternalDumpService实例化后执行。
 com.alibaba.nacos.config.server.service.dump.ExternalDumpService#init
/**
 * Start-up hook: annotated with {@code @PostConstruct}, it forwards the four dump processors
 * held by this bean to {@code dumpOperate}.
 *
 * @throws Throwable anything propagated by {@code dumpOperate}
 */
@PostConstruct
@Override
protected void init() throws Throwable {
	dumpOperate(processor, dumpAllProcessor, dumpAllBetaProcessor, dumpAllTagProcessor);
}
dumpOperate()主要干了两件事:
- dumpConfigInfo(),这个方法里面也是调用的DumpAllTask
- 提交DumpAllTask的定时任务
 com.alibaba.nacos.config.server.service.dump.DumpService#dumpOperate
/**
 * Runs the initial configuration dump and registers the recurring dump-related jobs.
 *
 * <p>This excerpt is abridged (the {@code ... ...} markers). The definitions of
 * {@code dumpAllBeta}, {@code dumpAllTag} and {@code clearConfigHistory} are not visible here,
 * and {@code processor}, {@code dumpAllBetaProcessor} and {@code dumpAllTagProcessor} are not
 * referenced in the visible part.
 *
 * @param processor            not referenced in the visible part of this excerpt
 * @param dumpAllProcessor     processor handed to {@code dumpConfigInfo} for the initial dump
 * @param dumpAllBetaProcessor not referenced in the visible part of this excerpt
 * @param dumpAllTagProcessor  not referenced in the visible part of this excerpt
 * @throws NacosException with {@code NacosException.SERVER_ERROR} when the initial dump throws
 */
protected void dumpOperate(DumpProcessor processor, DumpAllProcessor dumpAllProcessor,
						   DumpAllBetaProcessor dumpAllBetaProcessor, DumpAllTagProcessor dumpAllTagProcessor) throws NacosException {
	String dumpFileContext = "CONFIG_DUMP_TO_FILE";
	TimerContext.start(dumpFileContext);
	try {
		LogUtil.DEFAULT_LOG.warn("DumpService start");
		// Periodic job body: enqueue a new DumpAllTask under the shared task id.
		Runnable dumpAll = () -> dumpAllTaskMgr.addTask(DumpAllTask.TASK_ID, new DumpAllTask());
		... ...
		try {
			// Dump the configuration: the dump-all processing is executed once right here.
			dumpConfigInfo(dumpAllProcessor);
			... ...
		} catch (Exception e) {
			// A failed initial dump is logged as fatal and rethrown as a server error.
			LogUtil.FATAL_LOG
				.error("Nacos Server did not start because dumpservice bean construction failure :\n" + e
					   .toString());
			throw new NacosException(NacosException.SERVER_ERROR,
									 "Nacos Server did not start because dumpservice bean construction failure :\n" + e.getMessage(),
									 e);
		}
		// The heartbeat and the periodic full dumps are only scheduled outside standalone mode.
		if (!EnvUtil.getStandaloneMode()) {
			Runnable heartbeat = () -> {
				String heartBeatTime = TimeUtils.getCurrentTime().toString();
				// write disk
				try {
					DiskUtil.saveHeartBeatToDisk(heartBeatTime);
				} catch (IOException e) {
					LogUtil.FATAL_LOG.error("save heartbeat fail" + e.getMessage());
				}
			};
			// Heartbeat is written immediately and then every 10 seconds.
			ConfigExecutor.scheduleConfigTask(heartbeat, 0, 10, TimeUnit.SECONDS);
			// Randomised start delay (at least 10, in minutes as scheduled below).
			long initialDelay = new Random().nextInt(INITIAL_DELAY_IN_MINUTE) + 10;
			LogUtil.DEFAULT_LOG.warn("initialDelay:{}", initialDelay);
			// Run DumpAllTask every DUMP_ALL_INTERVAL_IN_MINUTE minutes
			// (the original note says every 6 hours; the constant is defined outside this excerpt).
			ConfigExecutor.scheduleConfigTask(dumpAll, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
			ConfigExecutor
				.scheduleConfigTask(dumpAllBeta, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
			ConfigExecutor
				.scheduleConfigTask(dumpAllTag, initialDelay, DUMP_ALL_INTERVAL_IN_MINUTE, TimeUnit.MINUTES);
		}
		// Scheduled in every mode: first run after 10 minutes, then every 10 minutes.
		ConfigExecutor.scheduleConfigTask(clearConfigHistory, 10, 10, TimeUnit.MINUTES);
	} finally {
		TimerContext.end(dumpFileContext, LogUtil.DUMP_LOG);
	}
}
dumpConfigInfo()里面还是执行了DumpAllTask。
 com.alibaba.nacos.config.server.service.dump.DumpService#dumpConfigInfo
/**
 * Performs the start-up dump of configuration data.
 *
 * <p>This excerpt is abridged (the {@code ... ...} markers): the code that may change
 * {@code isAllDump} or assign {@code fis} is not visible. In the visible full-dump branch the
 * local data is cleared via {@code DiskUtil.clearAll()} and the given processor handles a new
 * {@code DumpAllTask}.
 *
 * @param dumpAllProcessor processor that executes the full dump
 * @throws IOException rethrown after being logged to the fatal log
 */
private void dumpConfigInfo(DumpAllProcessor dumpAllProcessor) throws IOException {
	int timeStep = 6;
	Boolean isAllDump = true;
	// initial dump all
	FileInputStream fis = null;
	Timestamp heartheatLastStamp = null;
	try {
		... ...
		if (isAllDump) {
			LogUtil.DEFAULT_LOG.info("start clear all config-info.");
			DiskUtil.clearAll();
			// Execute the DumpAllTask synchronously through the processor.
			dumpAllProcessor.process(new DumpAllTask());
		} else {
			... ...
		}
	} catch (IOException e) {
		LogUtil.FATAL_LOG.error("dump config fail" + e.getMessage());
		throw e;
	} finally {
		// Close the stream if the elided code opened one; a close failure is only logged.
		if (null != fis) {
			try {
				fis.close();
			} catch (IOException e) {
				LogUtil.DEFAULT_LOG.warn("close file failed");
			}
		}
	}
}
process()会分页查询出数据库的所有配置,然后一个一个调用ConfigCacheService.dump()。
 com.alibaba.nacos.config.server.service.dump.processor.DumpAllProcessor#process
/**
 * Walks through all stored configurations page by page and dumps each one through
 * {@code ConfigCacheService.dump}.
 *
 * <p>Paging is driven by ids: every page is fetched starting from the highest id seen so far,
 * until that id reaches the max id read at the beginning. An empty page advances the cursor by
 * {@code PAGE_SIZE} so that gaps in the id sequence cannot stall the loop.
 *
 * <p>The per-item log line reflects the outcome of the dump: {@code [dump-all-ok]} on success,
 * {@code [dump-all-error]} when {@code ConfigCacheService.dump} returned {@code false}.
 *
 * @param task the task that triggered the dump; not inspected by this method
 * @return always {@code true}
 */
public boolean process(NacosTask task) {
	long currentMaxId = persistService.findConfigMaxId();
	long lastMaxId = 0;
	while (lastMaxId < currentMaxId) {
		// Fetch the next page of configurations after the last id that was handled.
		Page<ConfigInfoWrapper> page = persistService.findAllConfigInfoFragment(lastMaxId, PAGE_SIZE);
		if (page != null && page.getPageItems() != null && !page.getPageItems().isEmpty()) {
			for (ConfigInfoWrapper cf : page.getPageItems()) {
				lastMaxId = Math.max(lastMaxId, cf.getId());
				// Three special data ids carry server-side metadata and are loaded as well as dumped.
				if (cf.getDataId().equals(AggrWhitelist.AGGRIDS_METADATA)) {
					AggrWhitelist.load(cf.getContent());
				}
				if (cf.getDataId().equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
					ClientIpWhiteList.load(cf.getContent());
				}
				if (cf.getDataId().equals(SwitchService.SWITCH_META_DATAID)) {
					SwitchService.load(cf.getContent());
				}
				// Dump the configuration through the cache service.
				boolean result = ConfigCacheService
					.dump(cf.getDataId(), cf.getGroup(), cf.getTenant(), cf.getContent(), cf.getLastModified(),
						  cf.getType());
				final String content = cf.getContent();
				final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
				final String logKey = GroupKey2.getKey(cf.getDataId(), cf.getGroup());
				// Report what actually happened instead of always claiming success.
				if (result) {
					LogUtil.DUMP_LOG.info("[dump-all-ok] {}, {}, length={}, md5={}",
										  logKey, cf.getLastModified(), content.length(), md5);
				} else {
					LogUtil.DUMP_LOG.error("[dump-all-error] {}, {}, length={}, md5={}",
										   logKey, cf.getLastModified(), content.length(), md5);
				}
			}
			DEFAULT_LOG.info("[all-dump] {} / {}", lastMaxId, currentMaxId);
		} else {
			// Nothing in this id window: skip ahead by one page.
			lastMaxId += PAGE_SIZE;
		}
	}
	return true;
}
dump()就是将数据库的配置,保存到本地,一个配置对应一个文件,这样客户端来查询配置,直接查的本地文件,而不是查数据库。
 com.alibaba.nacos.config.server.service.ConfigCacheService#dump
/**
 * Dumps one configuration into the server-side cache.
 *
 * <p>Under the write lock of the group key: when the content md5 equals the cached md5 the
 * save is skipped; otherwise the content is saved via {@code DiskUtil.saveToDisk} unless
 * {@code PropertyUtil.isDirectRead()} is true. In both cases {@code updateMd5} is called
 * afterwards.
 *
 * <p>If saving fails with an {@code IOException} whose message matches one of the
 * no-space / disk-quota constants, the process exits via {@code System.exit(0)}.
 *
 * @param dataId         configuration data id
 * @param group          configuration group
 * @param tenant         tenant the configuration belongs to
 * @param content        configuration content
 * @param lastModifiedTs last-modified timestamp passed on to {@code updateMd5}
 * @param type           configuration type recorded on the cache item
 * @return {@code true} when the dump finished; {@code false} when the write lock could not be
 *         taken or an {@code IOException} occurred
 */
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
						   String type) {
	final String key = GroupKey2.getKey(dataId, group, tenant);
	makeSure(key).setType(type);

	final int lockState = tryWriteLock(key);
	assert (lockState != 0);
	if (lockState < 0) {
		DUMP_LOG.warn("[dump-error] write lock failed. {}", key);
		return false;
	}

	try {
		final String newMd5 = MD5Utils.md5Hex(content, Constants.ENCODE);
		final boolean unchanged = newMd5.equals(ConfigCacheService.getContentMd5(key));
		if (unchanged) {
			DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
						  + "lastModifiedNew={}", key, newMd5, ConfigCacheService.getLastModifiedTs(key),
						  lastModifiedTs);
		} else if (!PropertyUtil.isDirectRead()) {
			// Persist the new content to disk.
			DiskUtil.saveToDisk(dataId, group, tenant, content);
		}
		// Record the md5 and timestamp on the cache item.
		updateMd5(key, newMd5, lastModifiedTs);
		return true;
	} catch (IOException ioe) {
		DUMP_LOG.error("[dump-exception] save disk error. " + key + ", " + ioe.toString(), ioe);
		final String errMsg = ioe.getMessage();
		final boolean diskExhausted = errMsg != null
			&& (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg)
				|| errMsg.contains(DISK_QUATA_CN) || errMsg.contains(DISK_QUATA_EN));
		if (diskExhausted) {
			// Protect from disk full.
			FATAL_LOG.error("磁盘满自杀退出", ioe);
			System.exit(0);
		}
		return false;
	} finally {
		releaseWriteLock(key);
	}
}
服务启动过程中主要就是将数据库的配置全部保存到本地。
客户端来查询配置
客户端启动时会调用/v1/cs/configs来查询配置。
com.alibaba.nacos.config.server.controller.ConfigController#getConfig
/**
 * HTTP GET entry point for reading a configuration.
 *
 * <p>Checks the tenant, converts it with {@code NamespaceUtil.processNamespaceParameter},
 * checks the remaining parameters and then delegates to {@code inner.doGetConfig} together
 * with the caller's remote IP.
 *
 * @param request  incoming HTTP request
 * @param response HTTP response that {@code doGetConfig} writes to
 * @param dataId   configuration data id
 * @param group    configuration group
 * @param tenant   optional tenant, defaults to {@code StringUtils.EMPTY}
 * @param tag      optional tag
 */
@GetMapping
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void getConfig(HttpServletRequest request, HttpServletResponse response,
					  @RequestParam("dataId") String dataId, @RequestParam("group") String group,
					  @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
					  @RequestParam(value = "tag", required = false) String tag)
	throws IOException, ServletException, NacosException {
	// Tenant is validated in its raw form before being converted.
	ParamUtils.checkTenant(tenant);
	final String namespace = NamespaceUtil.processNamespaceParameter(tenant);

	// Remaining parameter checks.
	ParamUtils.checkParam(dataId, group, "datumId", "content");
	ParamUtils.checkParam(tag);

	inner.doGetConfig(request, response, dataId, group, namespace, tag, RequestUtil.getRemoteIp(request));
}
doGetConfig()直接找到对应的本地文件,使用JDK的零拷贝传输(FileChannel.transferTo)将文件输入流直接写入response的输出流中。
 com.alibaba.nacos.config.server.controller.ConfigServletInner#doGetConfig
/**
 * Writes the requested configuration into the HTTP response.
 *
 * <p>This excerpt is abridged (the {@code ... ...} markers): the tag branches and the handling
 * of {@code lockResult <= 0} are not visible. In the visible path the method runs under the
 * config read lock, picks either the database row ({@code PropertyUtil.isDirectRead()}) or the
 * dumped file as the source, sets the md5 / no-cache / Last-Modified headers and writes the
 * content.
 *
 * @param request  incoming HTTP request (headers and remote address are read from it)
 * @param response HTTP response receiving headers and content
 * @param dataId   configuration data id
 * @param group    configuration group
 * @param tenant   tenant of the configuration
 * @param tag      optional tag requested by the caller
 * @param clientIp caller IP, compared against the beta IP list of the cache item
 * @return {@code HttpServletResponse.SC_OK} rendered as a string
 * @throws IOException      propagated from reading the file or writing the response
 * @throws ServletException declared by the signature; not thrown by the visible code
 */
public String doGetConfig(HttpServletRequest request, HttpServletResponse response, String dataId, String group,
						  String tenant, String tag, String clientIp) throws IOException, ServletException {
	final String groupKey = GroupKey2.getKey(dataId, group, tenant);
	String autoTag = request.getHeader("Vipserver-Tag");
	String requestIpApp = RequestUtil.getAppName(request);
	int lockResult = tryConfigReadLock(groupKey);
	final String requestIp = RequestUtil.getRemoteIp(request);
	boolean isBeta = false;
	if (lockResult > 0) {
		// LockResult > 0 means cacheItem is not null and other thread can`t delete this cacheItem
		FileInputStream fis = null;
		try {
			String md5 = Constants.NULL;
			long lastModified = 0L;
			CacheItem cacheItem = ConfigCacheService.getContentCache(groupKey);
			// The beta version is served only to clients whose IP is in the beta list.
			if (cacheItem.isBeta() && cacheItem.getIps4Beta().contains(clientIp)) {
				isBeta = true;
			}
			// Fall back to the TEXT file type when the cache item carries no type.
			final String configType =
				(null != cacheItem.getType()) ? cacheItem.getType() : FileTypeEnum.TEXT.getFileType();
			response.setHeader("Config-Type", configType);
			FileTypeEnum fileTypeEnum = FileTypeEnum.getFileTypeEnumByFileExtensionOrFileType(configType);
			String contentTypeHeader = fileTypeEnum.getContentType();
			response.setHeader(HttpHeaderConsts.CONTENT_TYPE, contentTypeHeader);
			File file = null;
			ConfigInfoBase configInfoBase = null;
			PrintWriter out = null;
			if (isBeta) {
				md5 = cacheItem.getMd54Beta();
				lastModified = cacheItem.getLastModifiedTs4Beta();
				if (PropertyUtil.isDirectRead()) {
					configInfoBase = persistService.findConfigInfo4Beta(dataId, group, tenant);
				} else {
					file = DiskUtil.targetBetaFile(dataId, group, tenant);
				}
				response.setHeader("isBeta", "true");
			} else {
				if (StringUtils.isBlank(tag)) {
					if (isUseTag(cacheItem, autoTag)) {
					... ...
					} else {
						md5 = cacheItem.getMd5();
						lastModified = cacheItem.getLastModifiedTs();
						if (PropertyUtil.isDirectRead()) {
							// Direct-read mode: load the configuration from the database
							// (the original note calls this the single-node mode).
							configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
						} else {
							// Otherwise read the dumped file from disk
							// (the original note calls this the cluster mode).
							file = DiskUtil.targetFile(dataId, group, tenant);
						}
						... ...
					}
				} else {
					... ...
				}
			}
			response.setHeader(Constants.CONTENT_MD5, md5);
			// Disable cache.
			response.setHeader("Pragma", "no-cache");
			response.setDateHeader("Expires", 0);
			response.setHeader("Cache-Control", "no-cache,no-store");
			if (PropertyUtil.isDirectRead()) {
				response.setDateHeader("Last-Modified", lastModified);
			} else {
				fis = new FileInputStream(file);
				response.setDateHeader("Last-Modified", file.lastModified());
			}
			if (PropertyUtil.isDirectRead()) {
				out = response.getWriter();
				out.print(configInfoBase.getContent());
				out.flush();
				out.close();
			} else {
				// Channel-to-channel transfer of the whole file into the response stream
				// (the original note describes this as zero-copy).
				fis.getChannel()
					.transferTo(0L, fis.getChannel().size(), Channels.newChannel(response.getOutputStream()));
			}
			LogUtil.PULL_CHECK_LOG.warn("{}|{}|{}|{}", groupKey, requestIp, md5, TimeUtils.getCurrentTimeStr());
			final long delayed = System.currentTimeMillis() - lastModified;
			// TODO distinguish pull-get && push-get
			/*
                Otherwise, delayed cannot be used as the basis of push delay directly,
                because the delayed value of active get requests is very large.
                */
			ConfigTraceService.logPullEvent(dataId, group, tenant, requestIpApp, lastModified,
											ConfigTraceService.PULL_EVENT_OK, delayed, requestIp);
		} finally {
			// Always release the read lock and close the file stream, if one was opened.
			releaseConfigReadLock(groupKey);
			IoUtils.closeQuietly(fis);
		}
	} else if (lockResult == 0) {
... ...
	} else {
... ...
	}
	return HttpServletResponse.SC_OK + "";
}
客户端长轮询监听配置
客户端启动成功后,会调用Http接口/v1/cs/configs/listener长轮询来监听配置的变更。
com.alibaba.nacos.config.server.controller.ConfigController#listener
/**
 * HTTP entry point through which clients listen for configuration changes.
 *
 * <p>Reads the {@code Listening-Configs} parameter, URL-decodes it, parses it into a map via
 * {@code MD5Util.getClientMd5Map} and hands the request over to {@code inner.doPollingConfig}.
 *
 * @param request  incoming HTTP request; it is flagged as async-capable before processing
 * @param response HTTP response handed to the polling logic
 * @throws IllegalArgumentException when {@code Listening-Configs} is blank or cannot be parsed;
 *                                  in the latter case the parsing failure is attached as cause
 * @throws ServletException declared by the signature; not thrown by the visible code
 * @throws IOException      propagated from decoding or from {@code doPollingConfig}
 */
@PostMapping("/listener")
@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)
public void listener(HttpServletRequest request, HttpServletResponse response)
	throws ServletException, IOException {
	// Mark the request as async-capable before the long-polling logic runs.
	request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
	String probeModify = request.getParameter("Listening-Configs");
	if (StringUtils.isBlank(probeModify)) {
		throw new IllegalArgumentException("invalid probeModify");
	}
	probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
	Map<String, String> clientMd5Map;
	try {
		clientMd5Map = MD5Util.getClientMd5Map(probeModify);
	} catch (Throwable e) {
		// Keep the original failure as the cause so the reason for rejection stays diagnosable.
		throw new IllegalArgumentException("invalid probeModify", e);
	}
	// do long-polling
	inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
doPollingConfig()会判断是否支持长轮询,依据是header是否包含Long-Pulling-Timeout属性。
 com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig
/**
 * Dispatches a listener request.
 *
 * <p>When {@code LongPollingService.isSupportLongPolling(request)} is true the request is
 * registered with the long-polling service and {@code SC_OK} is returned as a string. The
 * handling of requests without long-polling support is elided in this excerpt.
 *
 * @param request          incoming HTTP request
 * @param response         HTTP response
 * @param clientMd5Map     map parsed from the client's {@code Listening-Configs} parameter
 * @param probeRequestSize length of the decoded {@code Listening-Configs} string
 * @return {@code HttpServletResponse.SC_OK} rendered as a string on the long-polling path
 * @throws IOException declared by the signature; not thrown by the visible code
 */
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
							  Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {
	// Long polling.
	if (LongPollingService.isSupportLongPolling(request)) {
		// Long polling is supported: register the client and return immediately.
		longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
		return HttpServletResponse.SC_OK + "";
	}
... ...
}
addLongPollingClient()会将客户端保存起来,方便后面有配置变更时找到客户端并进行响应。
 com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient
/**
 * Registers a long-polling client, or returns straight away when there is nothing to wait for.
 *
 * <p>Outside fixed-polling mode the client's md5 values are compared first: if any group
 * changed, a response is generated at once; if the no-hang-up header equals {@code TRUE_STR}
 * (ignoring case), the method returns without holding the request. Otherwise the request is
 * switched to async mode and a {@code ClientLongPolling} task is submitted.
 *
 * @param req              incoming HTTP request
 * @param rsp              HTTP response
 * @param clientMd5Map     map parsed from the client's listening request
 * @param probeRequestSize size of the client's probe string, used for logging
 */
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
								 int probeRequestSize) {
	final String timeoutHeader = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
	final String noHangUp = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
	final String clientApp = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
	final String clientTag = req.getHeader("Vipserver-Tag");
	final int advanceMillis = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);

	// Add delay time for LoadBalance, and one response is returned 500 ms in advance to avoid client timeout.
	long holdMillis = Math.max(10000, Long.parseLong(timeoutHeader) - advanceMillis);

	if (isFixedPolling()) {
		// Do nothing but set fix polling timeout.
		holdMillis = Math.max(10000, getFixedPollingInterval());
	} else {
		final long startedAt = System.currentTimeMillis();
		// Compare the client's md5 values.
		final List<String> changed = MD5Util.compareMd5(req, rsp, clientMd5Map);
		if (!changed.isEmpty()) {
			// Something changed already: answer immediately.
			generateResponse(req, rsp, changed);
			LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - startedAt, "instant",
									RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
									changed.size());
			return;
		}
		if (noHangUp != null && noHangUp.equalsIgnoreCase(TRUE_STR)) {
			// The client asked not to be held (the original note calls this the initial request).
			LogUtil.CLIENT_LOG.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - startedAt, "nohangup",
									RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
									changed.size());
			return;
		}
	}

	final String remoteIp = RequestUtil.getRemoteIp(req);
	// Must be called by http thread, or send response.
	final AsyncContext heldRequest = req.startAsync();
	// AsyncContext.setTimeout() is incorrect, Control by oneself
	heldRequest.setTimeout(0L);
	// Nothing changed: hand the held request to a ClientLongPolling task.
	final ClientLongPolling poller =
		new ClientLongPolling(heldRequest, clientMd5Map, remoteIp, probeRequestSize, holdMillis, clientApp, clientTag);
	ConfigExecutor.executeLongPolling(poller);
}
ClientLongPolling.run()会启动一个延时30s执行的任务,如果30s内配置没有变更,任务就会执行,对客户端进行响应;如果30s内配置发生了变更,此任务就会被取消。
 com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#run
/**
 * Schedules the timeout handling for this held request and registers the request in
 * {@code allSubs}.
 *
 * <p>The scheduled task runs after {@code timeoutTime} milliseconds. It removes this client
 * from {@code allSubs} and, outside fixed-polling mode, answers with {@code sendResponse(null)}.
 * The fixed-polling branch is elided in this excerpt.
 */
public void run() {
	// Schedule the timeout task; it fires after timeoutTime milliseconds
	// (the original note says 30s).
	asyncTimeoutFuture = ConfigExecutor.scheduleLongPolling(new Runnable() {
		@Override
		public void run() {
			try {
				getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
				// Delete subscriber's relations.
				allSubs.remove(ClientLongPolling.this);
				if (isFixedPolling()) {
					... ...
				} else {
					LogUtil.CLIENT_LOG
						.info("{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - createTime), "timeout",
							  RequestUtil.getRemoteIp((HttpServletRequest) asyncContext.getRequest()),
							  "polling", clientMd5Map.size(), probeRequestSize);
					// Timed out without a change: respond with no changed groups.
					sendResponse(null);
				}
			} catch (Throwable t) {
				// NOTE(review): the cause of t is logged rather than t itself — confirm this is intended.
				LogUtil.DEFAULT_LOG.error("long polling error:" + t.getMessage(), t.getCause());
			}
		}
	}, timeoutTime, TimeUnit.MILLISECONDS);
	// Keep this client in the subscriber queue so a data change can find it.
	allSubs.add(this);
}
sendResponse()对客户端进行响应,如果配置有变更,就会取消上面创建的任务。
 com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#sendResponse
/**
 * Answers the held request: cancels the pending timeout task, if one was scheduled, and then
 * delegates to {@code generateResponse}.
 *
 * @param changedGroups group keys that changed, or {@code null} when there is nothing to report
 */
void sendResponse(List<String> changedGroups) {
	// Cancel time out task.
	if (null != asyncTimeoutFuture) {
		// Cancel without interrupting a task that is already running.
		asyncTimeoutFuture.cancel(false);
	}
	generateResponse(changedGroups);
}
generateResponse()会将发生变更的配置的dataId、group等信息返回给客户端,并不会返回具体的配置内容,具体内容由客户端再次发起查询来获取。
 com.alibaba.nacos.config.server.service.LongPollingService.ClientLongPolling#generateResponse
/**
 * Completes the held request, writing the changed group keys when there are any.
 *
 * <p>With {@code null} the async context is completed without a body. Otherwise the string
 * built by {@code MD5Util.compareMd5ResultString} is written with no-cache headers and status
 * 200; the context is completed whether or not writing succeeds.
 *
 * @param changedGroups group keys that changed, or {@code null} for an empty response
 */
void generateResponse(List<String> changedGroups) {
	if (changedGroups != null) {
		final HttpServletResponse httpResponse = (HttpServletResponse) asyncContext.getResponse();
		try {
			// Build the payload describing the changed configurations.
			final String payload = MD5Util.compareMd5ResultString(changedGroups);

			// Disable cache.
			httpResponse.setHeader("Pragma", "no-cache");
			httpResponse.setDateHeader("Expires", 0);
			httpResponse.setHeader("Cache-Control", "no-cache,no-store");
			httpResponse.setStatus(HttpServletResponse.SC_OK);
			httpResponse.getWriter().println(payload);
			asyncContext.complete();
		} catch (Exception ex) {
			PULL_LOG.error(ex.toString(), ex);
			asyncContext.complete();
		}
		return;
	}

	// Tell web container to send http response.
	asyncContext.complete();
}
配置变更通知客户端
当在Nacos管理后台修改了配置后,会调用/v1/cs/configs来更新配置。
publishConfig()会将配置保存到数据库中,并发布ConfigDataChangeEvent事件。
 com.alibaba.nacos.config.server.controller.ConfigController#publishConfig
/**
 * HTTP POST entry point for publishing (creating or updating) a configuration.
 *
 * <p>After parameter checks the configuration is persisted through {@code persistService} and
 * a {@code ConfigDataChangeEvent} is published. Three variants exist: beta publish (a
 * {@code betaIps} header is present), tag publish ({@code tag} is set) and the normal publish.
 *
 * @return always {@code true} when no exception was thrown
 * @throws NacosException with {@code NacosException.NO_RIGHT} when {@code dataId} is an
 *                        aggregation data id; parameter checks may throw as well
 */
@PostMapping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
							 @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
							 @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
							 @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
							 @RequestParam(value = "appName", required = false) String appName,
							 @RequestParam(value = "src_user", required = false) String srcUser,
							 @RequestParam(value = "config_tags", required = false) String configTags,
							 @RequestParam(value = "desc", required = false) String desc,
							 @RequestParam(value = "use", required = false) String use,
							 @RequestParam(value = "effect", required = false) String effect,
							 @RequestParam(value = "type", required = false) String type,
							 @RequestParam(value = "schema", required = false) String schema) throws NacosException {
	final String publisherIp = RequestUtil.getRemoteIp(request);
	final String publisherApp = RequestUtil.getAppName(request);
	// The user name is taken from the request, replacing the src_user parameter.
	srcUser = RequestUtil.getSrcUserName(request);

	// An invalid type falls back to the default type.
	if (!ConfigType.isValidType(type)) {
		type = ConfigType.getDefaultType().getType();
	}

	// Parameter checks.
	ParamUtils.checkTenant(tenant);
	ParamUtils.checkParam(dataId, group, "datumId", content);
	ParamUtils.checkParam(tag);

	final Map<String, Object> advanceInfo = new HashMap<>(10);
	MapUtils.putIfValNoNull(advanceInfo, "config_tags", configTags);
	MapUtils.putIfValNoNull(advanceInfo, "desc", desc);
	MapUtils.putIfValNoNull(advanceInfo, "use", use);
	MapUtils.putIfValNoNull(advanceInfo, "effect", effect);
	MapUtils.putIfValNoNull(advanceInfo, "type", type);
	MapUtils.putIfValNoNull(advanceInfo, "schema", schema);
	ParamUtils.checkParam(advanceInfo);

	if (AggrWhitelist.isAggrDataId(dataId)) {
		LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request),
					dataId, group);
		throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");
	}

	final Timestamp publishedAt = TimeUtils.getCurrentTime();
	final String betaIps = request.getHeader("betaIps");
	final ConfigInfo toPublish = new ConfigInfo(dataId, group, tenant, appName, content);
	toPublish.setType(type);

	if (!StringUtils.isBlank(betaIps)) {
		// beta publish
		persistService.insertOrUpdateBeta(toPublish, betaIps, publisherIp, srcUser, publishedAt, true);
		ConfigChangePublisher
			.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, publishedAt.getTime()));
	} else if (StringUtils.isBlank(tag)) {
		// Normal publish: write to the database, then publish ConfigDataChangeEvent.
		persistService.insertOrUpdate(publisherIp, srcUser, toPublish, publishedAt, advanceInfo, true);
		/*
		 * AsyncNotifyService listens for ConfigDataChangeEvent.
		 * @see AsyncNotifyService#AsyncNotifyService(com.alibaba.nacos.core.cluster.ServerMemberManager)
		 */
		ConfigChangePublisher
			.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, publishedAt.getTime()));
	} else {
		// Tag publish.
		persistService.insertOrUpdateTag(toPublish, tag, publisherIp, srcUser, publishedAt, true);
		ConfigChangePublisher.notifyConfigChange(
			new ConfigDataChangeEvent(false, dataId, group, tenant, tag, publishedAt.getTime()));
	}

	ConfigTraceService
		.logPersistenceEvent(dataId, group, tenant, publisherApp, publishedAt.getTime(), InetUtils.getSelfIP(),
							 ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
	return true;
}
AsyncNotifyService监听了ConfigDataChangeEvent事件,然后提交了AsyncTask任务来对Nacos集群中的节点进行通知配置的变化。
 com.alibaba.nacos.config.server.service.notify.AsyncNotifyService#AsyncNotifyService
/**
 * Registers {@code ConfigDataChangeEvent} with the {@code NotifyCenter} and subscribes to it.
 *
 * <p>For every received event one {@code NotifySingleTask} per member returned by
 * {@code memberManager.allMembers()} is queued, and the queue is submitted as an
 * {@code AsyncTask} via {@code ConfigExecutor.executeAsyncNotify}.
 *
 * @param memberManager source of the members that are notified
 */
public AsyncNotifyService(ServerMemberManager memberManager) {
	this.memberManager = memberManager;

	// Register ConfigDataChangeEvent to NotifyCenter.
	NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);

	// Register A Subscriber to subscribe ConfigDataChangeEvent.
	NotifyCenter.registerSubscriber(new Subscriber() {

		@Override
		public Class<? extends Event> subscribeType() {
			return ConfigDataChangeEvent.class;
		}

		@Override
		public void onEvent(Event event) {
			// Generate ConfigDataChangeEvent concurrently
			if (!(event instanceof ConfigDataChangeEvent)) {
				return;
			}
			final ConfigDataChangeEvent change = (ConfigDataChangeEvent) event;
			final long modifiedAt = change.lastModifiedTs;
			final String changedDataId = change.dataId;
			final String changedGroup = change.group;
			final String changedTenant = change.tenant;
			final String changedTag = change.tag;
			final Collection<Member> members = memberManager.allMembers();

			// In fact, any type of queue here can be
			// One NotifySingleTask per member.
			final Queue<NotifySingleTask> pending = new LinkedList<NotifySingleTask>();
			for (Member node : members) {
				final NotifySingleTask single = new NotifySingleTask(changedDataId, changedGroup, changedTenant,
						changedTag, modifiedAt, node.getAddress(), change.isBeta);
				pending.add(single);
			}

			// Submit an AsyncTask that carries all NotifySingleTask entries (see AsyncTask#run()).
			ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, pending));
		}
	});
}
AsyncTask.run()会调用Nacos集群中的所有节点(包含自己)的Http接口/v1/cs/communication/dataChange来通知配置的变化。
 com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncTask#run
/**
 * Task entry point; delegates to {@code executeAsyncInvoke()}.
 */
@Override
public void run() {
	executeAsyncInvoke();
}
/**
 * Drains the task queue and notifies each target.
 *
 * <p>Targets unknown to {@code memberManager} are dropped. Unhealthy targets are traced and
 * passed to {@code asyncTaskExecute}. Healthy targets receive an asynchronous GET on
 * {@code task.url}, with headers carrying the last-modified time, this server's IP and,
 * for beta changes, {@code isBeta=true}.
 */
private void executeAsyncInvoke() {
	// Work through every queued NotifySingleTask.
	while (!queue.isEmpty()) {
		final NotifySingleTask next = queue.poll();
		final String memberIp = next.getTargetIP();
		if (!memberManager.hasMember(memberIp)) {
			continue;
		}

		// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
		if (memberManager.isUnHealth(memberIp)) {
			// target ip is unhealthy, then put it in the notification list
			ConfigTraceService.logNotifyEvent(next.getDataId(), next.getGroup(), next.getTenant(), null,
											  next.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
											  0, next.target);
			// get delay time and set fail count to the task
			asyncTaskExecute(next);
			continue;
		}

		final Header notifyHeader = Header.newInstance();
		notifyHeader.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(next.getLastModified()));
		notifyHeader.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
		if (next.isBeta) {
			notifyHeader.addParam("isBeta", "true");
		}
		AuthHeaderUtil.addIdentityToHeader(notifyHeader);

		// Call the data-change endpoint of the target, /v1/cs/communication/dataChange
		// (handled by CommunicationController#notifyConfigInfo).
		restTemplate.get(next.url, notifyHeader, Query.EMPTY, String.class, new AsyncNotifyCallBack(next));
	}
}
notifyConfigInfo()主要负责将变化的配置从数据库中查询出来,然后更新本地的文件。
 com.alibaba.nacos.config.server.controller.CommunicationController#notifyConfigInfo
/**
 * HTTP entry point through which a server is told that a configuration changed.
 *
 * <p>Trims {@code dataId} and {@code group}, reads the last-modified time (or {@code -1} when
 * the header is empty), the handling IP and the beta flag from the request headers, and then
 * asks {@code dumpService} to dump the configuration.
 *
 * @param request incoming HTTP request carrying the notification headers
 * @param dataId  configuration data id
 * @param group   configuration group
 * @param tenant  optional tenant, defaults to {@code StringUtils.EMPTY}
 * @param tag     optional tag; only used for the non-beta dump
 * @return always {@code true}
 */
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
								@RequestParam("group") String group,
								@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
								@RequestParam(value = "tag", required = false) String tag) {
	final String trimmedDataId = dataId.trim();
	final String trimmedGroup = group.trim();

	final String modifiedHeader = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
	long modifiedAt = -1;
	if (!StringUtils.isEmpty(modifiedHeader)) {
		modifiedAt = Long.parseLong(modifiedHeader);
	}

	final String sourceIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
	final String betaHeader = request.getHeader("isBeta");
	final boolean betaChange = StringUtils.isNotBlank(betaHeader) && trueStr.equals(betaHeader);

	if (betaChange) {
		dumpService.dump(trimmedDataId, trimmedGroup, tenant, modifiedAt, sourceIp, true);
	} else {
		// Dump the regular (or tagged) configuration.
		dumpService.dump(trimmedDataId, trimmedGroup, tenant, tag, modifiedAt, sourceIp);
	}
	return true;
}
dump()操作又提交了一个DumpTask任务。
 com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String)
/**
 * Queues a non-beta dump of one configuration; shorthand for the seven-argument overload with
 * {@code isBeta = false}.
 *
 * @param dataId       configuration data id
 * @param group        configuration group
 * @param tenant       tenant of the configuration
 * @param tag          tag of the configuration, may be {@code null}
 * @param lastModified last-modified timestamp of the change
 * @param handleIp     IP reported as the handler of the change
 */
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp) {
	dump(dataId, group, tenant, tag, lastModified, handleIp, false);
}
/**
 * Queues a {@code DumpTask} for one configuration.
 *
 * <p>The task key joins data id, group, tenant, the beta flag and the tag with {@code "+"}.
 *
 * @param dataId       configuration data id
 * @param group        configuration group
 * @param tenant       tenant of the configuration
 * @param tag          tag of the configuration, may be {@code null}
 * @param lastModified last-modified timestamp of the change
 * @param handleIp     IP reported as the handler of the change
 * @param isBeta       whether the beta version of the configuration is meant
 */
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
				 boolean isBeta) {
	final String configKey = GroupKey2.getKey(dataId, group, tenant);
	final String queueKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);

	// Queue the DumpTask (handled by DumpProcessor#process).
	final DumpTask pending = new DumpTask(configKey, tag, lastModified, handleIp, isBeta);
	dumpTaskMgr.addTask(queueKey, pending);

	DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", configKey, queueKey);
}
process()会将变化的配置从数据库中查询出来,交由DumpConfigHandler.configDump()处理。
 com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
/**
 * Handles one {@code DumpTask}: loads the affected configuration from the database and passes
 * it to {@code DumpConfigHandler.configDump}.
 *
 * <p>This excerpt is abridged (the {@code 。。。。。。} markers): the beta and tag branches are not
 * visible.
 *
 * @param task the task to handle; cast to {@code DumpTask}
 * @return in the visible branch, the result of {@code DumpConfigHandler.configDump}
 */
public boolean process(NacosTask task) {
	// Handle the DumpTask.
	final PersistService persistService = dumpService.getPersistService();
	DumpTask dumpTask = (DumpTask) task;
	// The group key is split back into data id, group and tenant.
	String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
	String dataId = pair[0];
	String group = pair[1];
	String tenant = pair[2];
	long lastModified = dumpTask.getLastModified();
	String handleIp = dumpTask.getHandleIp();
	boolean isBeta = dumpTask.isBeta();
	String tag = dumpTask.getTag();
	ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
		.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
	if (isBeta) {
。。。。。。
	} else {
		if (StringUtils.isBlank(tag)) {
			// Load the configuration from the database.
			ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
			// A missing row marks the event as a removal, with null content and type.
			build.remove(Objects.isNull(cf));
			build.content(Objects.isNull(cf) ? null : cf.getContent());
			build.type(Objects.isNull(cf) ? null : cf.getType());
			// Dump the configuration data.
			return DumpConfigHandler.configDump(build.build());
		} else {
。。。。。。
		}
	}
}
configDump()又调用了ConfigCacheService.dump(),这个方法在服务端启动时保存所有的配置文件时也使用了。
 com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
/**
 * Applies one {@code ConfigDumpEvent} to the local cache.
 *
 * <p>This excerpt is abridged (the {@code 。。。。。。} markers): the beta, removal and tag branches
 * are not visible. In the visible branch (no tag, not a removal) the configuration is dumped
 * via {@code ConfigCacheService.dump} and a dump event is traced on success.
 *
 * @param event the dump event to apply
 * @return in the visible branch, the result of {@code ConfigCacheService.dump}
 */
public static boolean configDump(ConfigDumpEvent event) {
	final String dataId = event.getDataId();
	final String group = event.getGroup();
	final String namespaceId = event.getNamespaceId();
	final String content = event.getContent();
	final String type = event.getType();
	final long lastModified = event.getLastModifiedTs();
	if (event.isBeta()) {
		。。。。。。
	}
	if (StringUtils.isBlank(event.getTag())) {
		// Three special data ids carry server-side metadata and are loaded before dumping.
		if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
			AggrWhitelist.load(content);
		}
		if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
			ClientIpWhiteList.load(content);
		}
		if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
			SwitchService.load(content);
		}
		boolean result;
		if (!event.isRemove()) {
			// Dump the data.
			result = ConfigCacheService.dump(dataId, group, namespaceId, content, lastModified, type);
			if (result) {
				ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
												ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
												content.length());
			}
		} else {
			。。。。。。
		}
		return result;
	} else {
		。。。。。。
	}
}
dump()会将新的配置写入磁盘文件,更新md5,然后发布LocalDataChangeEvent事件。
 com.alibaba.nacos.config.server.service.ConfigCacheService#dump
/**
 * Dumps one configuration into the server-side cache.
 *
 * <p>Under the write lock of the group key: when the content md5 equals the cached md5 the
 * save is skipped; otherwise the content is saved via {@code DiskUtil.saveToDisk} unless
 * {@code PropertyUtil.isDirectRead()} is true. In both cases {@code updateMd5} is called
 * afterwards.
 *
 * <p>If saving fails with an {@code IOException} whose message matches one of the
 * no-space / disk-quota constants, the process exits via {@code System.exit(0)}.
 *
 * @param dataId         configuration data id
 * @param group          configuration group
 * @param tenant         tenant the configuration belongs to
 * @param content        configuration content
 * @param lastModifiedTs last-modified timestamp passed on to {@code updateMd5}
 * @param type           configuration type recorded on the cache item
 * @return {@code true} when the dump finished; {@code false} when the write lock could not be
 *         taken or an {@code IOException} occurred
 */
public static boolean dump(String dataId, String group, String tenant, String content, long lastModifiedTs,
						   String type) {
	final String key = GroupKey2.getKey(dataId, group, tenant);
	makeSure(key).setType(type);

	final int lockState = tryWriteLock(key);
	assert (lockState != 0);
	if (lockState < 0) {
		DUMP_LOG.warn("[dump-error] write lock failed. {}", key);
		return false;
	}

	try {
		final String newMd5 = MD5Utils.md5Hex(content, Constants.ENCODE);
		final boolean unchanged = newMd5.equals(ConfigCacheService.getContentMd5(key));
		if (unchanged) {
			DUMP_LOG.warn("[dump-ignore] ignore to save cache file. groupKey={}, md5={}, lastModifiedOld={}, "
						  + "lastModifiedNew={}", key, newMd5, ConfigCacheService.getLastModifiedTs(key),
						  lastModifiedTs);
		} else if (!PropertyUtil.isDirectRead()) {
			// Persist the new content to disk.
			DiskUtil.saveToDisk(dataId, group, tenant, content);
		}
		// Record the md5 and timestamp on the cache item.
		updateMd5(key, newMd5, lastModifiedTs);
		return true;
	} catch (IOException ioe) {
		DUMP_LOG.error("[dump-exception] save disk error. " + key + ", " + ioe.toString(), ioe);
		final String errMsg = ioe.getMessage();
		final boolean diskExhausted = errMsg != null
			&& (NO_SPACE_CN.equals(errMsg) || NO_SPACE_EN.equals(errMsg)
				|| errMsg.contains(DISK_QUATA_CN) || errMsg.contains(DISK_QUATA_EN));
		if (diskExhausted) {
			// Protect from disk full.
			FATAL_LOG.error("磁盘满自杀退出", ioe);
			System.exit(0);
		}
		return false;
	} finally {
		releaseWriteLock(key);
	}
}
updateMd5()会更新md5,然后发布LocalDataChangeEvent事件。
 com.alibaba.nacos.config.server.service.ConfigCacheService#updateMd5
/**
 * Stores a new md5 and timestamp on the cache item and announces the change.
 *
 * <p>Nothing happens when the cache item already holds an md5 equal to the given one.
 * Otherwise md5 and last-modified timestamp are overwritten and a
 * {@code LocalDataChangeEvent} for the group key is published.
 *
 * @param groupKey       key of the cache item
 * @param md5            md5 of the current content
 * @param lastModifiedTs last-modified timestamp to record
 */
public static void updateMd5(String groupKey, String md5, long lastModifiedTs) {
	final CacheItem item = makeSure(groupKey);
	final boolean sameMd5 = item.md5 != null && item.md5.equals(md5);
	if (sameMd5) {
		return;
	}
	item.md5 = md5;
	item.lastModifiedTs = lastModifiedTs;
	// Publish LocalDataChangeEvent; LongPollingService subscribes to it
	// (see LongPollingService#LongPollingService()).
	NotifyCenter.publishEvent(new LocalDataChangeEvent(groupKey));
}
LongPollingService会监听LocalDataChangeEvent事件,然后提交DataChangeTask。
 com.alibaba.nacos.config.server.service.LongPollingService#LongPollingService
/**
 * Creates the subscriber queue, schedules the periodic {@code StatTask} and subscribes to
 * {@code LocalDataChangeEvent}.
 *
 * <p>Outside fixed-polling mode every received {@code LocalDataChangeEvent} is turned into a
 * {@code DataChangeTask} that is submitted via {@code ConfigExecutor.executeLongPolling}.
 */
public LongPollingService() {
	allSubs = new ConcurrentLinkedQueue<>();

	ConfigExecutor.scheduleLongPolling(new StatTask(), 0L, 10L, TimeUnit.SECONDS);

	// Register LocalDataChangeEvent to NotifyCenter.
	NotifyCenter.registerToPublisher(LocalDataChangeEvent.class, NotifyCenter.ringBufferSize);

	// Register A Subscriber to subscribe LocalDataChangeEvent.
	NotifyCenter.registerSubscriber(new Subscriber() {

		@Override
		public Class<? extends Event> subscribeType() {
			return LocalDataChangeEvent.class;
		}

		@Override
		public void onEvent(Event event) {
			// Events are ignored in fixed-polling mode; other event types are ignored as well.
			if (isFixedPolling() || !(event instanceof LocalDataChangeEvent)) {
				return;
			}
			final LocalDataChangeEvent change = (LocalDataChangeEvent) event;
			// Submit a DataChangeTask for the changed group key (see DataChangeTask#run()).
			final DataChangeTask notifyTask = new DataChangeTask(change.groupKey, change.isBeta, change.betaIps);
			ConfigExecutor.executeLongPolling(notifyTask);
		}
	});
}
DataChangeTask会找到监听这个配置的客户端,然后进行通知。
 com.alibaba.nacos.config.server.service.LongPollingService.DataChangeTask#run
/**
 * Answers every held client that listens on the changed group key.
 *
 * <p>A client is skipped when the change is a beta change and the client's IP is not in the
 * beta list, or when the task carries a tag that differs from the client's tag. Each matching
 * client is removed from {@code allSubs} and answered with the group key. Any throwable is
 * logged and swallowed.
 */
public void run() {
	try {
		ConfigCacheService.getContentBetaMd5(groupKey);
		final Iterator<ClientLongPolling> subscribers = allSubs.iterator();
		while (subscribers.hasNext()) {
			final ClientLongPolling waiting = subscribers.next();
			// Only clients listening on this group key are of interest.
			if (!waiting.clientMd5Map.containsKey(groupKey)) {
				continue;
			}
			// If published tag is not in the beta list, then it skipped.
			final boolean outsideBeta = isBeta && !CollectionUtils.contains(betaIps, waiting.ip);
			if (outsideBeta) {
				continue;
			}
			// If published tag is not in the tag list, then it skipped.
			final boolean otherTag = StringUtils.isNotBlank(tag) && !tag.equals(waiting.tag);
			if (otherTag) {
				continue;
			}

			getRetainIps().put(waiting.ip, System.currentTimeMillis());
			// Delete subscribers' relationships.
			subscribers.remove();
			final String remoteIp = RequestUtil
				.getRemoteIp((HttpServletRequest) waiting.asyncContext.getRequest());
			LogUtil.CLIENT_LOG
				.info("{}|{}|{}|{}|{}|{}|{}", (System.currentTimeMillis() - changeTime), "in-advance",
					  remoteIp, "polling", waiting.clientMd5Map.size(), waiting.probeRequestSize, groupKey);
			// Tell the client that this configuration changed.
			waiting.sendResponse(Arrays.asList(groupKey));
		}
	} catch (Throwable t) {
		LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));
	}
}



















