What DistroConsistencyServiceImpl Does in Nacos and How It Works

This article walks through Nacos's DistroConsistencyServiceImpl, based on the nacos-1.1.3 source.

ConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java

public interface ConsistencyService {

    /**
     * Put a data related to a key to Nacos cluster
     *
     * @param key   key of data, this key should be globally unique
     * @param value value of data
     * @throws NacosException
     * @see
     */
    void put(String key, Record value) throws NacosException;

    /**
     * Remove a data from Nacos cluster
     *
     * @param key key of data
     * @throws NacosException
     */
    void remove(String key) throws NacosException;

    /**
     * Get a data from Nacos cluster
     *
     * @param key key of data
     * @return data related to the key
     * @throws NacosException
     */
    Datum get(String key) throws NacosException;

    /**
     * Listen for changes of a data
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
     */
    void listen(String key, RecordListener listener) throws NacosException;

    /**
     * Cancel listening of a data
     *
     * @param key      key of data
     * @param listener callback of data change
     * @throws NacosException
     */
    void unlisten(String key, RecordListener listener) throws NacosException;

    /**
     * Tell the status of this consistency service
     *
     * @return true if available
     */
    boolean isAvailable();
}
  • ConsistencyService defines six methods: put, remove, get, listen, unlisten, and isAvailable.
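
To make the contract concrete, here is a minimal in-memory sketch of the same six operations. It is not Nacos code: SimpleListener, SimpleConsistencyService, and InMemoryConsistencyService are hypothetical names, values are plain strings rather than Record/Datum, listeners are called synchronously, and nothing is replicated to other servers.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for RecordListener, with String values instead of Record.
interface SimpleListener {
    void onChange(String key, String value);

    void onDelete(String key);
}

// Hypothetical, simplified mirror of the six ConsistencyService operations.
interface SimpleConsistencyService {
    void put(String key, String value);

    void remove(String key);

    String get(String key);

    void listen(String key, SimpleListener listener);

    void unlisten(String key, SimpleListener listener);

    boolean isAvailable();
}

// Single-node implementation: a map for data, a map of listener lists for callbacks.
class InMemoryConsistencyService implements SimpleConsistencyService {
    private final Map<String, String> store = new ConcurrentHashMap<>();
    private final Map<String, List<SimpleListener>> listeners = new ConcurrentHashMap<>();

    @Override
    public void put(String key, String value) {
        store.put(key, value);
        for (SimpleListener l : listeners.getOrDefault(key, Collections.emptyList())) {
            l.onChange(key, value);
        }
    }

    @Override
    public void remove(String key) {
        store.remove(key);
        for (SimpleListener l : listeners.getOrDefault(key, Collections.emptyList())) {
            l.onDelete(key);
        }
    }

    @Override
    public String get(String key) {
        return store.get(key);
    }

    @Override
    public void listen(String key, SimpleListener listener) {
        List<SimpleListener> list = listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>());
        if (!list.contains(listener)) {
            list.add(listener); // registering the same listener twice is a no-op
        }
    }

    @Override
    public void unlisten(String key, SimpleListener listener) {
        List<SimpleListener> list = listeners.get(key);
        if (list != null) {
            list.remove(listener);
        }
    }

    @Override
    public boolean isAvailable() {
        return true; // nothing to initialize in this sketch
    }
}

class ConsistencyContractDemo {
    public static void main(String[] args) {
        InMemoryConsistencyService service = new InMemoryConsistencyService();
        service.listen("svc-a", new SimpleListener() {
            @Override
            public void onChange(String key, String value) {
                System.out.println("changed " + key + " -> " + value);
            }

            @Override
            public void onDelete(String key) {
                System.out.println("deleted " + key);
            }
        });
        service.put("svc-a", "10.0.0.5:8080");
        service.remove("svc-a");
    }
}
```

The point of the sketch is the shape of the API: writes go through put and remove, reads through get, and interested parties subscribe per key. How an implementation keeps several servers in agreement is left entirely to the implementing class.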

EphemeralConsistencyService

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java

public interface EphemeralConsistencyService extends ConsistencyService {
}
  • The EphemeralConsistencyService interface extends ConsistencyService and declares no methods of its own.

DistroConsistencyServiceImpl

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);

            t.setDaemon(true);
            t.setName("com.alibaba.nacos.naming.distro.notifier");

            return t;
        }
    });

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private DataSyncer dataSyncer;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;

    private boolean initialized = false;

    public volatile Notifier notifier = new Notifier();

    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

    @PostConstruct
    public void init() {
        GlobalExecutor.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    Loggers.DISTRO.error("load data failed.", e);
                }
            }
        });

        executor.submit(notifier);
    }

    public void load() throws Exception {
        if (SystemUtils.STANDALONE_MODE) {
            initialized = true;
            return;
        }
        // size = 1 means only myself in the list, we need at least one another server alive:
        while (serverListManager.getHealthyServers().size() <= 1) {
            Thread.sleep(1000L);
            Loggers.DISTRO.info("waiting server list init...");
        }

        for (Server server : serverListManager.getHealthyServers()) {
            if (NetUtils.localServer().equals(server.getKey())) {
                continue;
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("sync from " + server);
            }
            // try sync data from remote server:
            if (syncAllDataFromRemote(server)) {
                initialized = true;
                return;
            }
        }
    }

    //......

    public boolean syncAllDataFromRemote(Server server) {

        try {
            byte[] data = NamingProxy.getAllData(server.getKey());
            processData(data);
            return true;
        } catch (Exception e) {
            Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
            return false;
        }
    }

    public void processData(byte[] data) throws Exception {
        if (data.length > 0) {
            Map<String, Datum<Instances>> datumMap =
                serializer.deserializeMap(data, Instances.class);

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {
                dataStore.put(entry.getKey(), entry.getValue());

                if (!listeners.containsKey(entry.getKey())) {
                    // pretty sure the service not exist:
                    if (switchDomain.isDefaultInstanceEphemeral()) {
                        // create empty service
                        Loggers.DISTRO.info("creating service {}", entry.getKey());
                        Service service = new Service();
                        String serviceName = KeyBuilder.getServiceName(entry.getKey());
                        String namespaceId = KeyBuilder.getNamespace(entry.getKey());
                        service.setName(serviceName);
                        service.setNamespaceId(namespaceId);
                        service.setGroupName(Constants.DEFAULT_GROUP);
                        // now validate the service. if failed, exception will be thrown
                        service.setLastModifiedMillis(System.currentTimeMillis());
                        service.recalculateChecksum();
                        listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0)
                            .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service);
                    }
                }
            }

            for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) {

                if (!listeners.containsKey(entry.getKey())) {
                    // Should not happen:
                    Loggers.DISTRO.warn("listener of {} not found.", entry.getKey());
                    continue;
                }

                try {
                    for (RecordListener listener : listeners.get(entry.getKey())) {
                        listener.onChange(entry.getKey(), entry.getValue().value);
                    }
                } catch (Exception e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e);
                    continue;
                }

                // Update data store if listener executed successfully:
                dataStore.put(entry.getKey(), entry.getValue());
            }
        }
    }

    //......

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        taskDispatcher.addTask(key);
    }

    @Override
    public void remove(String key) throws NacosException {
        onRemove(key);
        listeners.remove(key);
    }

    @Override
    public Datum get(String key) throws NacosException {
        return dataStore.get(key);
    }

    //......

    @Override
    public void listen(String key, RecordListener listener) throws NacosException {
        if (!listeners.containsKey(key)) {
            listeners.put(key, new CopyOnWriteArrayList<>());
        }

        if (listeners.get(key).contains(listener)) {
            return;
        }

        listeners.get(key).add(listener);
    }

    @Override
    public void unlisten(String key, RecordListener listener) throws NacosException {
        if (!listeners.containsKey(key)) {
            return;
        }
        for (RecordListener recordListener : listeners.get(key)) {
            if (recordListener.equals(listener)) {
                listeners.get(key).remove(listener);
                break;
            }
        }
    }

    @Override
    public boolean isAvailable() {
        return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus());
    }

    //......
}
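  • DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface.

  • Its init method runs load asynchronously. Outside standalone mode, load waits until more than one healthy server is known, then calls syncAllDataFromRemote on each remote server in turn until one succeeds. syncAllDataFromRemote fetches the data through NamingProxy.getAllData and passes it to processData, which mainly invokes the listener callbacks and then stores the data in dataStore. Finally, init submits the Notifier to a single-thread executor so that it also runs asynchronously.

  • put calls onPut and then taskDispatcher.addTask(key); remove calls onRemove and then listeners.remove(key); get reads directly from dataStore; listen registers a RecordListener for a key, skipping duplicates; unlisten removes it; isAvailable returns true when the service is initialized or when the overridden server status is ServerStatus.UP.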
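
The startup path in load boils down to: try each healthy peer other than yourself, and mark the node initialized as soon as one full sync succeeds. The following is a minimal sketch under stated assumptions, not the Nacos implementation: PeerBootstrap, its fetchAll function, and the string peer addresses are hypothetical stand-ins for ServerListManager, NamingProxy.getAllData, and Server. There is no networking, the fetched bytes are not deserialized, and the wait-for-peers loop is omitted.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class PeerBootstrap {
    private final String self;
    // Hypothetical: returns a full data snapshot from a peer, or throws if the peer is unreachable.
    private final Function<String, byte[]> fetchAll;
    private volatile boolean initialized = false;
    // Stand-in for "the overridden server status is UP".
    private volatile boolean overriddenUp = false;

    PeerBootstrap(String self, Function<String, byte[]> fetchAll) {
        this.self = self;
        this.fetchAll = fetchAll;
    }

    // Tries peers in order and stops at the first successful full sync.
    void load(List<String> healthyPeers) {
        for (String peer : healthyPeers) {
            if (peer.equals(self)) {
                continue; // never sync from ourselves
            }
            if (syncAllDataFrom(peer)) {
                initialized = true;
                return;
            }
        }
    }

    // A failed peer is reported as false so the caller can move on to the next one.
    boolean syncAllDataFrom(String peer) {
        try {
            byte[] data = fetchAll.apply(peer);
            // A real implementation would deserialize and apply the snapshot here.
            return data != null;
        } catch (RuntimeException e) {
            return false;
        }
    }

    void overrideStatusUp(boolean up) {
        overriddenUp = up;
    }

    // Available once a sync has succeeded, or when an operator forces the status to UP.
    boolean isAvailable() {
        return initialized || overriddenUp;
    }
}

class PeerBootstrapDemo {
    public static void main(String[] args) {
        PeerBootstrap bootstrap = new PeerBootstrap("10.0.0.1:8848", peer -> {
            if (peer.equals("10.0.0.2:8848")) {
                throw new IllegalStateException("peer unreachable");
            }
            return new byte[0];
        });
        bootstrap.load(Arrays.asList("10.0.0.1:8848", "10.0.0.2:8848", "10.0.0.3:8848"));
        System.out.println("available: " + bootstrap.isAvailable()); // prints "available: true"
    }
}
```

Note that if every peer fails, load returns with initialized still false, which matches the listing above: the node then reports itself unavailable unless the server status has been overridden.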

Notifier

nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java

    public class Notifier implements Runnable {

        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<>(1024 * 1024);

        public void addTask(String datumKey, ApplyAction action) {

            if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
                return;
            }
            if (action == ApplyAction.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.add(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");

            while (true) {
                try {

                    Pair pair = tasks.take();

                    if (pair == null) {
                        continue;
                    }

                    String datumKey = (String) pair.getValue0();
                    ApplyAction action = (ApplyAction) pair.getValue1();

                    services.remove(datumKey);

                    int count = 0;

                    if (!listeners.containsKey(datumKey)) {
                        continue;
                    }

                    for (RecordListener listener : listeners.get(datumKey)) {

                        count++;

                        try {
                            if (action == ApplyAction.CHANGE) {
                                listener.onChange(datumKey, dataStore.get(datumKey).value);
                                continue;
                            }

                            if (action == ApplyAction.DELETE) {
                                listener.onDelete(datumKey);
                                continue;
                            }
                        } catch (Throwable e) {
                            Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                        }
                    }

                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
    }
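  • Notifier implements Runnable. Its run method loops forever, taking tasks from a LinkedBlockingQueue and invoking each registered listener's onChange or onDelete callback in turn. addTask skips a CHANGE task when one is already queued for the same key.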
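
The de-duplication in addTask is the interesting part: a key with a CHANGE already waiting in the queue does not get a second one, because the listener will read the latest value from the data store anyway. A minimal sketch of that idea follows. CoalescingNotifier and its Task type are hypothetical (the original uses a Pair of key and ApplyAction), putIfAbsent replaces the original's separate containsKey and put calls, and processNext uses the non-blocking poll instead of the blocking take so that the sketch is easy to step through.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

class CoalescingNotifier {
    enum Action { CHANGE, DELETE }

    // Hypothetical task type holding the key and what happened to it.
    static final class Task {
        final String key;
        final Action action;

        Task(String key, Action action) {
            this.key = key;
            this.action = action;
        }
    }

    // Keys that currently have a CHANGE task waiting in the queue.
    private final Map<String, Boolean> pendingChanges = new ConcurrentHashMap<>();
    private final BlockingQueue<Task> tasks = new LinkedBlockingQueue<>(1024);

    void addTask(String key, Action action) {
        if (action == Action.CHANGE && pendingChanges.putIfAbsent(key, Boolean.TRUE) != null) {
            return; // a CHANGE for this key is already queued
        }
        tasks.add(new Task(key, action)); // throws IllegalStateException if the queue is full
    }

    int getTaskSize() {
        return tasks.size();
    }

    // Handles one queued task; returns false when the queue is empty.
    boolean processNext(Consumer<Task> handler) {
        Task task = tasks.poll();
        if (task == null) {
            return false;
        }
        // Clear the marker first so that later changes to this key are queued again.
        pendingChanges.remove(task.key);
        handler.accept(task);
        return true;
    }
}

class CoalescingNotifierDemo {
    public static void main(String[] args) {
        CoalescingNotifier notifier = new CoalescingNotifier();
        notifier.addTask("svc-a", CoalescingNotifier.Action.CHANGE);
        notifier.addTask("svc-a", CoalescingNotifier.Action.CHANGE); // coalesced with the first
        notifier.addTask("svc-a", CoalescingNotifier.Action.DELETE); // DELETE is always queued
        System.out.println("queued: " + notifier.getTaskSize()); // prints "queued: 2"
        while (notifier.processNext(t -> System.out.println(t.action + " " + t.key))) {
            // keep draining until the queue is empty
        }
    }
}
```

Only CHANGE tasks are coalesced; every DELETE is queued, as in the listing above. Because the marker is cleared before the listeners run, a change that arrives while a task is being handled is queued again rather than lost.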

Summary

  • DistroConsistencyServiceImpl implements the EphemeralConsistencyService interface.

  • On startup, init asynchronously runs load, which initializes the node via syncAllDataFromRemote: the data is fetched with NamingProxy.getAllData and handed to processData, which invokes the listener callbacks and writes the data to dataStore. init then starts the Notifier on its own thread.

  • put calls onPut and taskDispatcher.addTask(key); remove calls onRemove and listeners.remove(key); get reads from dataStore; listen and unlisten add and remove a RecordListener; isAvailable is true when the service is initialized or the overridden server status is ServerStatus.UP.
