本篇內(nèi)容主要講解“Spring Cloud Eureka服務(wù)注冊(cè)與取消方法是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“Spring Cloud Eureka服務(wù)注冊(cè)與取消方法是什么”吧!
創(chuàng)新互聯(lián)主營(yíng)召陵網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營(yíng)網(wǎng)站建設(shè)方案,成都App制作,召陵h5成都微信小程序搭建,召陵網(wǎng)站營(yíng)銷推廣歡迎召陵等地區(qū)企業(yè)咨詢
開(kāi)啟/關(guān)閉服務(wù)注冊(cè)配置:eureka.client.register-with-eureka = true (默認(rèn))
應(yīng)用第一次啟動(dòng)時(shí),初始化EurekaClient時(shí),應(yīng)用狀態(tài)改變:從STARTING變?yōu)閁P會(huì)觸發(fā)這個(gè)Listener,調(diào)用instanceInfoReplicator.onDemandUpdate(); 可以推測(cè)出,實(shí)例狀態(tài)改變時(shí),也會(huì)通過(guò)注冊(cè)接口更新實(shí)例狀態(tài)信息
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } };
定時(shí)任務(wù),如果InstanceInfo發(fā)生改變,也會(huì)通過(guò)注冊(cè)接口更新信息
public void run() { try { discoveryClient.refreshInstanceInfo(); //如果實(shí)例信息發(fā)生改變,則需要調(diào)用register更新InstanceInfo Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
在定時(shí)renew時(shí),如果renew接口返回404(代表這個(gè)實(shí)例在EurekaServer上面找不到),可能是之前注冊(cè)失敗或者注冊(cè)過(guò)期導(dǎo)致的。這時(shí)需要調(diào)用register重新注冊(cè)
boolean renew() { EurekaHttpResponsehttpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); //如果renew接口返回404(代表這個(gè)實(shí)例在EurekaServer上面找不到),可能是之前注冊(cè)失敗或者注冊(cè)過(guò)期導(dǎo)致的 if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e); return false; } }
主要有兩個(gè)存儲(chǔ),一個(gè)是之前提到過(guò)的registry,還有一個(gè)最近變化隊(duì)列,后面我們會(huì)知道,這個(gè)最近變化隊(duì)列里面就是客戶端獲取增量實(shí)例信息的內(nèi)容:
# 整體注冊(cè)信息緩存 private final ConcurrentHashMap>> registry = new ConcurrentHashMap >>(); # 最近變化隊(duì)列 private ConcurrentLinkedQueue recentlyChangedQueue = new ConcurrentLinkedQueue ();
EurekaServer收到實(shí)例注冊(cè)主要分兩步:
調(diào)用父類方法注冊(cè)
同步到其他EurekaServer實(shí)例
public void register(InstanceInfo info, boolean isReplication) { int leaseDuration = 90; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } //調(diào)用父類方法注冊(cè) super.register(info, leaseDuration, isReplication); //同步到其他EurekaServer實(shí)例 this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication); }
我們先看同步到其他EurekaServer實(shí)例
其實(shí)就是,注冊(cè)到的EurekaServer再依次調(diào)用其他集群內(nèi)的EurekaServer的Register方法將實(shí)例信息同步過(guò)去
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
然后看看調(diào)用父類方法注冊(cè):
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { //register雖然看上去好像是修改,但是這里用的是讀鎖,后面會(huì)解釋 read.lock(); //從registry中查看這個(gè)app是否存在 Map> gMap = registry.get(registrant.getAppName()); //不存在就創(chuàng)建 if (gMap == null) { final ConcurrentHashMap > gNewMap = new ConcurrentHashMap >(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } //查看這個(gè)app的這個(gè)實(shí)例是否已存在 Lease existingLease = gMap.get(registrant.getId()); if (existingLease != null && (existingLease.getHolder() != null)) { //如果已存在,對(duì)比時(shí)間戳,保留比較新的實(shí)例信息...... } else { // 如果不存在,證明是一個(gè)新的實(shí)例 //更新自我保護(hù)監(jiān)控變量的值的代碼..... } Lease lease = new Lease (registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //放入registry gMap.put(registrant.getId(), lease); //加入最近修改的記錄隊(duì)列 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); //初始化狀態(tài),記錄時(shí)間等相關(guān)代碼...... //主動(dòng)讓Response緩存失效 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); } finally { read.unlock(); } }
總結(jié)起來(lái),就是主要三件事:
1.將實(shí)例注冊(cè)信息放入或者更新registry
2.將實(shí)例注冊(cè)信息加入最近修改的記錄隊(duì)列
3.主動(dòng)讓Response緩存失效
我們來(lái)類比下服務(wù)取消
protected boolean internalCancel(String appName, String id, boolean isReplication) { try { //cancel雖然看上去好像是修改,但是這里用的是讀鎖,后面會(huì)解釋 read.lock(); //從registry中剔除這個(gè)實(shí)例 Map> gMap = registry.get(appName); Lease leaseToCancel = null; if (gMap != null) { leaseToCancel = gMap.remove(id); } if (leaseToCancel == null) { logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id); return false; } else { //改變狀態(tài),記錄狀態(tài)修改時(shí)間等相關(guān)代碼...... if (instanceInfo != null) { instanceInfo.setActionType(ActionType.DELETED); //加入最近修改的記錄隊(duì)列 recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel)); } //主動(dòng)讓Response緩存失效 invalidateCache(appName, vip, svip); logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication); return true; } } finally { read.unlock(); } }
總結(jié)起來(lái),也是主要三件事:
1.從registry中剔除這個(gè)實(shí)例
2.將實(shí)例注冊(cè)信息加入最近修改的記錄隊(duì)列
3.主動(dòng)讓Response緩存失效
這里我們注意到了這個(gè)最近修改隊(duì)列,我們來(lái)詳細(xì)看看
這個(gè)最近修改隊(duì)列和消費(fèi)者定時(shí)獲取服務(wù)實(shí)例列表有著密切的關(guān)系
private TimerTask getDeltaRetentionTask() { return new TimerTask() { @Override public void run() { Iteratorit = recentlyChangedQueue.iterator(); while (it.hasNext()) { if (it.next().getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) { it.remove(); } else { break; } } } }; }
這個(gè)RetentionTimeInMSInDeltaQueue默認(rèn)是180s(配置是eureka.server.retention-time-in-m-s-in-delta-queue,默認(rèn)是180s,官網(wǎng)寫(xiě)錯(cuò)了),可以看出這個(gè)隊(duì)列是一個(gè)長(zhǎng)度為180s的滑動(dòng)窗口,保存最近180s以內(nèi)的應(yīng)用實(shí)例信息修改,后面我們會(huì)看到,客戶端調(diào)用獲取增量信息,實(shí)際上就是從這個(gè)queue中讀取,所以可能一段時(shí)間內(nèi)讀取到的信息都是一樣的。
到此,相信大家對(duì)“Spring Cloud Eureka服務(wù)注冊(cè)與取消方法是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!