真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何用源碼分析canal的deployer模塊

這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)如何用源碼分析canal的deployer模塊,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

創(chuàng)新新互聯(lián),憑借十載的成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)經(jīng)驗(yàn),本著真心·誠(chéng)心服務(wù)的企業(yè)理念服務(wù)于成都中小企業(yè)設(shè)計(jì)網(wǎng)站有千余家案例。做網(wǎng)站建設(shè),選成都創(chuàng)新互聯(lián)公司。

  • CanalLauncher是啟動(dòng)入口類

  1. 獲取canal.properties配置文件

  2. 如果canal.properties配置文件中屬性root.admin.manager有值,那么構(gòu)造PlainCanalConfigClient,調(diào)用PlainCanalConfigClient的findServer獲取PlainCanal,調(diào)用PlainCanal的getProperties方法獲取properties

  3. 通過(guò)properties構(gòu)造 CanalStarter并調(diào)用其start方法

CanalStarter是啟動(dòng)類

public synchronized void start() throws Throwable {
        //首先根據(jù)canal.serverMode構(gòu)造CanalMQProducer,如果是kafka,構(gòu)造的是CanalKafkaProducer
        String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE);
        if (serverMode.equalsIgnoreCase("kafka")) {
            canalMQProducer = new CanalKafkaProducer();
        } else if (serverMode.equalsIgnoreCase("rocketmq")) {
            canalMQProducer = new CanalRocketMQProducer();
        }

        if (canalMQProducer != null) {
            // disable netty
            System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true");
            // 設(shè)置為raw避免ByteString->Entry的二次解析
            System.setProperty("canal.instance.memory.rawEntry", "false");
        }
        //接下來(lái)構(gòu)造CanalController并調(diào)用其start方法
        logger.info("## start the canal server.");
        controller = new CanalController(properties);
        controller.start();
        logger.info("## the canal server is running now ......");
        ...
        //構(gòu)造CanalMQStarter并調(diào)用其start方法,同時(shí)設(shè)置為CanalController的屬性
        if (canalMQProducer != null) {
            canalMQStarter = new CanalMQStarter(canalMQProducer);
            MQProperties mqProperties = buildMQProperties(properties);
            String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
            canalMQStarter.start(mqProperties, destinations);
            controller.setCanalMQStarter(canalMQStarter);
        }
        ...
        running = true;
    }
  • CanalController是實(shí)例調(diào)度控制器

public CanalController(final Properties properties){
        // 初始化managerClients用于請(qǐng)求admin
        managerClients = MigrateMap.makeComputingMap(new Function() {

            public PlainCanalConfigClient apply(String managerAddress) {
                return getManagerClient(managerAddress);
            }
        });

        // 初始化全局參數(shù)設(shè)置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于創(chuàng)建instance,其根據(jù)InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator創(chuàng)建CanalInstance
        globalInstanceConfig = initGlobalConfig(properties);
        instanceConfigs = new MapMaker().makeMap();
        // 初始化instance config,包含了實(shí)例mode、lazy、managerAddress、springXml
        initInstanceConfig(properties);

        ...
        // 初始化CanalServerWithEmbedded,將instanceGenerator設(shè)置為CanalServerWithEmbedded的屬性
        embededCanalServer = CanalServerWithEmbedded.instance();
        embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設(shè)置自定義的instanceGenerator
        int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
        embededCanalServer.setMetricsPort(metricsPort);

        this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
        this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
        embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER));
        embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD));

        ...
        final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
        //初始化ZkClientx用于canal集群部署,創(chuàng)建/otteradmin/canal/destinations節(jié)點(diǎn)和/otteradmin/canal/cluster節(jié)點(diǎn)
        if (StringUtils.isNotEmpty(zkServers)) {
            zkclientx = ZkClientx.getZkClient(zkServers);
            // 初始化系統(tǒng)目錄
            zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
            zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
        }
        // 初始化ServerRunningMonitors的ServerRunningMonitor,用于啟動(dòng)、關(guān)閉實(shí)例
        final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);
        ServerRunningMonitors.setServerData(serverData);
        ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function() {
            ...
        }));

        // 初始化InstanceAction,用于啟動(dòng)和關(guān)閉實(shí)例
        autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
        if (autoScan) {
            defaultAction = new InstanceAction() {
                ...
            };
            // 初始化instanceConfigMonitors,用于獲取所有instanceConfig并啟動(dòng)所有instance
            instanceConfigMonitors = MigrateMap.makeComputingMap(new Function() {

                public InstanceConfigMonitor apply(InstanceMode mode) {
                    ...
                }
            });
        }
    }
  • ManagerInstanceConfigMonitor是實(shí)例掃描器

public void start() {
        super.start();
        //啟動(dòng)定時(shí)任務(wù),定時(shí)掃描所有instance
        executor.scheduleWithFixedDelay(new Runnable() {

            public void run() {
                try {
                    scan();
                    if (isFirst) {
                        isFirst = false;
                    }
                } catch (Throwable e) {
                    logger.error("scan failed", e);
                }
            }

        }, 0, scanIntervalInSecond, TimeUnit.SECONDS);
    }

private void scan() {
        //緩存了所有instance的配置,如果發(fā)現(xiàn)有新的instance則啟動(dòng)或者修改了instance則重啟
        String instances = configClient.findInstances(null);
        final List is = Lists.newArrayList(StringUtils.split(instances, ','));
        List start = Lists.newArrayList();
        List stop = Lists.newArrayList();
        List restart = Lists.newArrayList();
        for (String instance : is) {
            if (!configs.containsKey(instance)) {
                PlainCanal newPlainCanal = configClient.findInstance(instance, null);
                if (newPlainCanal != null) {
                    configs.put(instance, newPlainCanal);
                    start.add(instance);
                }
            } else {
                PlainCanal plainCanal = configs.get(instance);
                PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());
                if (newPlainCanal != null) {
                    // 配置有變化
                    restart.add(instance);
                    configs.put(instance, newPlainCanal);
                }
            }
        }

        configs.forEach((instance, plainCanal) -> {
            if (!is.contains(instance)) {
                stop.add(instance);
            }
        });

        stop.forEach(instance -> {
            notifyStop(instance);
        });

        restart.forEach(instance -> {
            notifyReload(instance);
        });

        start.forEach(instance -> {
            notifyStart(instance);
        });

    }

private void notifyStart(String destination) {
        try {
            //啟動(dòng)instance調(diào)用InstanceAction啟動(dòng)實(shí)例,最后是調(diào)用ServerRunningMonitor啟動(dòng)實(shí)例
            defaultAction.start(destination);
            actions.put(destination, defaultAction);
            // 啟動(dòng)成功后記錄配置文件信息
        } catch (Throwable e) {
            logger.error(String.format("scan add found[%s] but start failed", destination), e);
        }
    }
  • ServerRunningMonitor是針對(duì)server的running實(shí)例控制

public ServerRunningMonitor(){
        // 創(chuàng)建父節(jié)點(diǎn)
        dataListener = new IZkDataListener() {

            public void handleDataChange(String dataPath, Object data) throws Exception {
                MDC.put("destination", destination);
                ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class);
                if (!isMine(runningData.getAddress())) {
                    mutex.set(false);
                }

                if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說(shuō)明出現(xiàn)了主動(dòng)釋放的操作,并且本機(jī)之前是active
                    releaseRunning();// 徹底釋放mainstem
                }

                activeData = (ServerRunningData) runningData;
            }

            public void handleDataDeleted(String dataPath) throws Exception {
                MDC.put("destination", destination);
                mutex.set(false);
                if (!release && activeData != null && isMine(activeData.getAddress())) {
                    // 如果上一次active的狀態(tài)就是本機(jī),則即時(shí)觸發(fā)一下active搶占
                    initRunning();
                } else {
                    // 否則就是等待delayTime,避免因網(wǎng)絡(luò)瞬端或者zk異常,導(dǎo)致出現(xiàn)頻繁的切換操作
                    delayExector.schedule(new Runnable() {

                        public void run() {
                            initRunning();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                }
            }

        };

    }

public synchronized void start() {
        super.start();
        try {
            //首先調(diào)用listener的processStart方法
            processStart();
            if (zkClient != null) {
                // 如果需要盡可能釋放instance資源,不需要監(jiān)聽running節(jié)點(diǎn),不然即使stop了這臺(tái)機(jī)器,另一臺(tái)機(jī)器立馬會(huì)start
                // 監(jiān)視/otteradmin/canal/destinations/{0}/running節(jié)點(diǎn)變化
                String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
                zkClient.subscribeDataChanges(path, dataListener);
                
                initRunning();
            } else {
                processActiveEnter();// 沒有zk,直接啟動(dòng)
            }
        } catch (Exception e) {
            logger.error("start failed", e);
            // 沒有正常啟動(dòng),重置一下狀態(tài),避免干擾下一次start
            stop();
        }

    }
private void processStart() {
        if (listener != null) {
            try {
                //processStart方法中創(chuàng)建/otteradmin/canal/destinations/{0}/cluster/{1}節(jié)點(diǎn),0是實(shí)例名稱,1是當(dāng)前節(jié)點(diǎn)ip:port
                listener.processStart();
            } catch (Exception e) {
                logger.error("processStart failed", e);
            }
        }
    }

private void initRunning() {
        if (!isStart()) {
            return;
        }

        String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
        // 序列化
        byte[] bytes = JsonUtils.marshalToByte(serverData);
        try {
            mutex.set(false);
            //嘗試創(chuàng)建/otteradmin/canal/destinations/{0}/running節(jié)點(diǎn)
            zkClient.create(path, bytes, CreateMode.EPHEMERAL);
            activeData = serverData;
            //如果成功則調(diào)用listener的processEnter方法,processEnter方法中調(diào)用CanalServerWithEmbedded的start方法啟動(dòng)實(shí)例和CanalMQStarter的start方法啟動(dòng)實(shí)例
            processActiveEnter();// 觸發(fā)一下事件
            mutex.set(true);
            release = false;
        } catch (ZkNodeExistsException e) {
            bytes = zkClient.readData(path, true);
            if (bytes == null) {// 如果不存在節(jié)點(diǎn),立即嘗試一次
                initRunning();
            } else {
                activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
            }
        } catch (ZkNoNodeException e) {
            zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 嘗試創(chuàng)建父節(jié)點(diǎn)
            initRunning();
        }
    }
  • canal.properties配置

canal.register.ip =
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.admin.register.auto = true
canal.admin.register.cluster =

上述就是小編為大家分享的如何用源碼分析canal的deployer模塊了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。


本文題目:如何用源碼分析canal的deployer模塊
文章轉(zhuǎn)載:http://weahome.cn/article/posoih.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部