新聞中心
本篇內(nèi)容介紹了“nacos ServiceManager的updateInstance有什么作用”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
成都創(chuàng)新互聯(lián)一直秉承“誠(chéng)信做人,踏實(shí)做事”的原則,不欺瞞客戶,是我們最起碼的底線! 以服務(wù)為基礎(chǔ),以質(zhì)量求生存,以技術(shù)求發(fā)展,成交一個(gè)客戶多一個(gè)朋友!為您提供成都網(wǎng)站設(shè)計(jì)、網(wǎng)站制作、外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè)、成都網(wǎng)頁(yè)設(shè)計(jì)、重慶小程序開(kāi)發(fā)、成都網(wǎng)站開(kāi)發(fā)、成都網(wǎng)站制作、成都軟件開(kāi)發(fā)、App定制開(kāi)發(fā)是成都本地專(zhuān)業(yè)的網(wǎng)站建設(shè)和網(wǎng)站設(shè)計(jì)公司,等你一起來(lái)見(jiàn)證!
序
本文主要研究一下nacos ServiceManager的updateInstance
ServiceManager
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/ServiceManager.java
@Component @DependsOn("nacosApplicationContext") public class ServiceManager implements RecordListener{ /** * Map > */ private Map > serviceMap = new ConcurrentHashMap<>(); private LinkedBlockingDeque toBeUpdatedServicesQueue = new LinkedBlockingDeque<>(1024 * 1024); private Synchronizer synchronizer = new ServiceStatusSynchronizer(); private final Lock lock = new ReentrantLock(); @Resource(name = "consistencyDelegate") private ConsistencyService consistencyService; @Autowired private SwitchDomain switchDomain; @Autowired private DistroMapper distroMapper; @Autowired private ServerListManager serverListManager; @Autowired private PushService pushService; private final Object putServiceLock = new Object(); //...... public void updateInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } if (!service.allIPs().contains(instance)) { throw new NacosException(NacosException.INVALID_PARAM, "instance not exist: " + instance); } addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); } public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); List instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } public List addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips); } public List updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); Map oldInstanceMap = new HashMap<>(16); List currentIPs = service.allIPs(ephemeral); Map map = new ConcurrentHashMap<>(currentIPs.size()); for (Instance instance : currentIPs) { map.put(instance.toIPAddr(), instance); } if (datum != null) { oldInstanceMap = setValid(((Instances) datum.value).getInstanceList(), map); } // use HashMap for deep copy: HashMap instanceMap = new HashMap<>(oldInstanceMap.size()); instanceMap.putAll(oldInstanceMap); for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJSON()); } if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JSON.toJSONString(instanceMap.values())); } return new ArrayList<>(instanceMap.values()); } //...... }
updateInstance會(huì)通過(guò)service.allIPs().contains(instance)校驗(yàn)要更新的instance是否存在,不存在則拋出NacosException,存在則執(zhí)行addInstance方法
addInstance方法它會(huì)獲取service,然后執(zhí)行addIpAddresses,最后執(zhí)行consistencyService.put;addIpAddresses調(diào)用的是updateIpAddresses方法,其action參數(shù)為UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
updateIpAddresses方法首先從consistencyService獲取datum,然后通過(guò)service.allIPs方法獲取currentIPs,之后根據(jù)datum設(shè)置oldInstanceMap,對(duì)于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE類(lèi)型執(zhí)行刪除,其余的action則將instance方法到instanceMap中
DistroConsistencyServiceImpl.put
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("com.alibaba.nacos.naming.distro.notifier"); return t; } }); @Autowired private DistroMapper distroMapper; @Autowired private DataStore dataStore; @Autowired private TaskDispatcher taskDispatcher; @Autowired private DataSyncer dataSyncer; @Autowired private Serializer serializer; @Autowired private ServerListManager serverListManager; @Autowired private SwitchDomain switchDomain; @Autowired private GlobalConfig globalConfig; private boolean initialized = false; public volatile Notifier notifier = new Notifier(); private Map> listeners = new ConcurrentHashMap<>(); private Map syncChecksumTasks = new ConcurrentHashMap<>(16); //...... public void put(String key, Record value) throws NacosException { onPut(key, value); taskDispatcher.addTask(key); } public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } notifier.addTask(key, ApplyAction.CHANGE); } //...... }
DistroConsistencyServiceImpl的put方法會(huì)先執(zhí)行onPut,然后執(zhí)行taskDispatcher.addTask(key);onPut在判斷key是ephemeralInstanceListKey時(shí)會(huì)創(chuàng)建一個(gè)Datum,遞增其timestamp,然后放到dataStore中,最后調(diào)用notifier.addTask(key, ApplyAction.CHANGE)
Notifier.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
public class Notifier implements Runnable { private ConcurrentHashMapservices = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue tasks = new LinkedBlockingQueue (1024 * 1024); public void addTask(String datumKey, ApplyAction action) { if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) { return; } if (action == ApplyAction.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.add(Pair.with(datumKey, action)); } public int getTaskSize() { return tasks.size(); } @Override public void run() { Loggers.DISTRO.info("distro notifier started"); while (true) { try { Pair pair = tasks.take(); if (pair == null) { continue; } String datumKey = (String) pair.getValue0(); ApplyAction action = (ApplyAction) pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { continue; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == ApplyAction.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == ApplyAction.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } }
Notifier的addTask方法對(duì)于action為ApplyAction.CHANGE的且不在services當(dāng)中的會(huì)放入到services當(dāng)中,最后添加到tasks;run方法會(huì)不斷從tasks取出數(shù)據(jù),執(zhí)行相應(yīng)的回調(diào)
TaskDispatcher.addTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/TaskDispatcher.java
@Component public class TaskDispatcher { @Autowired private GlobalConfig partitionConfig; @Autowired private DataSyncer dataSyncer; private ListtaskSchedulerList = new ArrayList<>(); private final int cpuCoreCount = Runtime.getRuntime().availableProcessors(); @PostConstruct public void init() { for (int i = 0; i < cpuCoreCount; i++) { TaskScheduler taskScheduler = new TaskScheduler(i); taskSchedulerList.add(taskScheduler); GlobalExecutor.submitTaskDispatch(taskScheduler); } } public void addTask(String key) { taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key); } public class TaskScheduler implements Runnable { private int index; private int dataSize = 0; private long lastDispatchTime = 0L; private BlockingQueue queue = new LinkedBlockingQueue<>(128 * 1024); public TaskScheduler(int index) { this.index = index; } public void addTask(String key) { queue.offer(key); } public int getIndex() { return index; } @Override public void run() { List keys = new ArrayList<>(); while (true) { try { String key = queue.poll(partitionConfig.getTaskDispatchPeriod(), TimeUnit.MILLISECONDS); if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) { Loggers.DISTRO.debug("got key: {}", key); } if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) { continue; } if (StringUtils.isBlank(key)) { continue; } if (dataSize == 0) { keys = new ArrayList<>(); } keys.add(key); dataSize++; if (dataSize == partitionConfig.getBatchSyncKeyCount() || (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) { for (Server member : dataSyncer.getServers()) { if (NetUtils.localServer().equals(member.getKey())) { continue; } SyncTask syncTask = new SyncTask(); syncTask.setKeys(keys); syncTask.setTargetServer(member.getKey()); if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) { Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask)); } dataSyncer.submit(syncTask, 0); } lastDispatchTime = System.currentTimeMillis(); dataSize = 0; } } catch (Exception e) { Loggers.DISTRO.error("dispatch sync task failed.", e); } } } } }
TaskDispatcher的addTask方法會(huì)從taskSchedulerList獲取指定的TaskScheduler,然后執(zhí)行其addTask方法;TaskScheduler的addTask方法會(huì)往queue中添加數(shù)據(jù),而run方法則不斷從queue取數(shù)據(jù),然后通過(guò)dataSyncer執(zhí)行syncTask
SyncTask
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/SyncTask.java
public class SyncTask { private Listkeys; private int retryCount; private long lastExecuteTime; private String targetServer; public List getKeys() { return keys; } public void setKeys(List keys) { this.keys = keys; } public int getRetryCount() { return retryCount; } public void setRetryCount(int retryCount) { this.retryCount = retryCount; } public long getLastExecuteTime() { return lastExecuteTime; } public void setLastExecuteTime(long lastExecuteTime) { this.lastExecuteTime = lastExecuteTime; } public String getTargetServer() { return targetServer; } public void setTargetServer(String targetServer) { this.targetServer = targetServer; } }
SyncTask包含了keys、targetServer屬性,其中targetServer用于告訴DataSyncer該往哪個(gè)server執(zhí)行sync操作
小結(jié)
updateInstance會(huì)通過(guò)service.allIPs().contains(instance)校驗(yàn)要更新的instance是否存在,不存在則拋出NacosException,存在則執(zhí)行addInstance方法
addInstance方法它會(huì)獲取service,然后執(zhí)行addIpAddresses,最后執(zhí)行consistencyService.put;addIpAddresses調(diào)用的是updateIpAddresses方法,其action參數(shù)為UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD
updateIpAddresses方法首先從consistencyService獲取datum,然后通過(guò)service.allIPs方法獲取currentIPs,之后根據(jù)datum設(shè)置oldInstanceMap,對(duì)于UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE類(lèi)型執(zhí)行刪除,其余的action則將instance方法到instanceMap中
“nacos ServiceManager的updateInstance有什么作用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!
文章標(biāo)題:nacosServiceManager的updateInstance有什么作用
文章源于:http://fisionsoft.com.cn/article/gsoded.html