本篇內(nèi)容主要講解“NacosNamingService中selectInstances的原理和作用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“NacosNamingService中selectInstances的原理和作用”吧!
在泉山等地區(qū),都構(gòu)建了全面的區(qū)域性戰(zhàn)略布局,加強(qiáng)發(fā)展的系統(tǒng)性、市場前瞻性、產(chǎn)品創(chuàng)新能力,以專注、極致的服務(wù)理念,為客戶提供網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì) 網(wǎng)站設(shè)計(jì)制作按需設(shè)計(jì)網(wǎng)站,公司網(wǎng)站建設(shè),企業(yè)網(wǎng)站建設(shè),品牌網(wǎng)站設(shè)計(jì),成都營銷網(wǎng)站建設(shè),外貿(mào)網(wǎng)站制作,泉山網(wǎng)站建設(shè)費(fèi)用合理。
本文主要研究一下NacosNamingService的selectInstances
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
public class NacosNamingService implements NamingService { private static final String DEFAULT_PORT = "8080"; private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5); /** * Each Naming instance should have different namespace. */ private String namespace; private String endpoint; private String serverList; private String cacheDir; private String logName; private HostReactor hostReactor; private BeatReactor beatReactor; private EventDispatcher eventDispatcher; private NamingProxy serverProxy; //...... @Override public ListselectInstances(String serviceName, boolean healthy) throws NacosException { return selectInstances(serviceName, new ArrayList (), healthy); } @Override public List selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException { return selectInstances(serviceName, groupName, healthy, true); } @Override public List selectInstances(String serviceName, boolean healthy, boolean subscribe) throws NacosException { return selectInstances(serviceName, new ArrayList (), healthy, subscribe); } @Override public List selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException { return selectInstances(serviceName, groupName, new ArrayList (), healthy, subscribe); } @Override public List selectInstances(String serviceName, List clusters, boolean healthy) throws NacosException { return selectInstances(serviceName, clusters, healthy, true); } @Override public List selectInstances(String serviceName, String groupName, List clusters, boolean healthy) throws NacosException { return selectInstances(serviceName, groupName, clusters, healthy, true); } @Override public List selectInstances(String serviceName, List clusters, boolean healthy, boolean subscribe) throws NacosException { return selectInstances(serviceName, Constants.DEFAULT_GROUP, clusters, healthy, subscribe); } @Override public List selectInstances(String serviceName, String groupName, List clusters, boolean healthy, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; if (subscribe) { serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } return selectInstances(serviceInfo, healthy); } private List selectInstances(ServiceInfo serviceInfo, boolean healthy) { List list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList (); } Iterator iterator = list.iterator(); while (iterator.hasNext()) { Instance instance = iterator.next(); if (healthy != instance.isHealthy() || !instance.isEnabled() || instance.getWeight() <= 0) { iterator.remove(); } } return list; } //...... }
selectInstances首先從hostReactor獲取serviceInfo,然后再從serviceInfo.getHosts()剔除非healty、非enabled、weight小于等于0的instance再返回;如果subscribe為true,則執(zhí)行hostReactor.getServiceInfo獲取serviceInfo,否則執(zhí)行hostReactor.getServiceInfoDirectlyFromServer獲取serviceInfo
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java
public class HostReactor { private static final long DEFAULT_DELAY = 1000L; private static final long UPDATE_HOLD_INTERVAL = 5000L; private final Map> futureMap = new HashMap >(); private Map serviceInfoMap; private Map updatingMap; private PushReceiver pushReceiver; private EventDispatcher eventDispatcher; private NamingProxy serverProxy; private FailoverReactor failoverReactor; private String cacheDir; private ScheduledExecutorService executor; //...... public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); } private ServiceInfo getServiceInfo0(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); return serviceInfoMap.get(key); } public void updateServiceNow(String serviceName, String clusters) { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false); if (StringUtils.isNotEmpty(result)) { processServiceJSON(result); } } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } ScheduledFuture> future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters) throws NacosException { String result = serverProxy.queryList(serviceName, clusters, 0, false); if (StringUtils.isNotEmpty(result)) { return JSON.parseObject(result, ServiceInfo.class); } return null; } //...... }
getServiceInfo首先判斷failoverReactor.isFailoverSwitch(),如果是則返回failoverReactor.getService(key);接著通過getServiceInfo0從serviceInfoMap查找,如果找不到則創(chuàng)建一個(gè)新的然后放入serviceInfoMap,同時(shí)放入updatingMap,執(zhí)行updateServiceNow,再從updatingMap移除;如果從serviceInfoMap找出來的serviceObj在updatingMap中則等待UPDATE_HOLD_INTERVAL;最后執(zhí)行scheduleUpdateIfAbsent,再從serviceInfoMap取出serviceInfo
updateServiceNow則從serverProxy.queryList獲取結(jié)果,然后通過processServiceJSON解析并根據(jù)需要更新serviceInfoMap;scheduleUpdateIfAbsent方法判斷futureMap是否有該任務(wù),如果沒有則添加一個(gè)UpdateTask
getServiceInfoDirectlyFromServer方法則直接請求serverProxy.queryList獲取ServiceInfo
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/HostReactor.java
public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; private String clusters; private String serviceName; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } @Override public void run() { try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { updateServiceNow(serviceName, clusters); executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { updateServiceNow(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS); lastRefTime = serviceObj.getLastRefTime(); } catch (Throwable e) { NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } } } public void refreshOnly(String serviceName, String clusters) { try { serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false); } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } }
UpdateTask實(shí)現(xiàn)了Runnable接口,其run方法首先從serviceInfoMap獲取serviceObj,獲取不到則執(zhí)行updateServiceNow,然后再次延時(shí)調(diào)度UpdateTask;可以從serviceInfoMap獲取serviceObj的話則判斷serviceObj.getLastRefTime()是否小于等于lastRefTime,是的話則執(zhí)行updateServiceNow,否則執(zhí)行refreshOnly;最后再次延時(shí)調(diào)度UpdateTask,并更新lastRefTime
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/net/NamingProxy.java
public class NamingProxy { private static final int DEFAULT_SERVER_PORT = 8848; private int serverPort = DEFAULT_SERVER_PORT; private String namespaceId; private String endpoint; private String nacosDomain; private ListserverList; private List serversFromEndpoint = new ArrayList (); private long lastSrvRefTime = 0L; private long vipSrvRefInterMillis = TimeUnit.SECONDS.toMillis(30); private Properties properties; //...... public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map params = new HashMap (8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET); } //...... }
queryList方法會(huì)往/instance/list
接口發(fā)送GET請求查詢服務(wù)實(shí)例列表
selectInstances首先從hostReactor獲取serviceInfo,然后再從serviceInfo.getHosts()剔除非healty、非enabled、weight小于等于0的instance再返回;如果subscribe為true,則執(zhí)行hostReactor.getServiceInfo獲取serviceInfo,否則執(zhí)行hostReactor.getServiceInfoDirectlyFromServer獲取serviceInfo
到此,相信大家對“NacosNamingService中selectInstances的原理和作用”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!