這期內(nèi)容當(dāng)中小編將會(huì)給大家?guī)?lái)有關(guān)如何用源碼分析canal的deployer模塊,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
創(chuàng)新新互聯(lián),憑借十載的成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)經(jīng)驗(yàn),本著真心·誠(chéng)心服務(wù)的企業(yè)理念服務(wù)于成都中小企業(yè)設(shè)計(jì)網(wǎng)站有千余家案例。做網(wǎng)站建設(shè),選成都創(chuàng)新互聯(lián)公司。
CanalLauncher是啟動(dòng)入口類
獲取canal.properties配置文件
如果canal.properties配置文件中屬性root.admin.manager有值,那么構(gòu)造PlainCanalConfigClient,調(diào)用PlainCanalConfigClient的findServer獲取PlainCanal,調(diào)用PlainCanal的getProperties方法獲取properties
通過(guò)properties構(gòu)造 CanalStarter并調(diào)用其start方法
CanalStarter是啟動(dòng)類
public synchronized void start() throws Throwable { //首先根據(jù)canal.serverMode構(gòu)造CanalMQProducer,如果是kafka,構(gòu)造的是CanalKafkaProducer String serverMode = CanalController.getProperty(properties, CanalConstants.CANAL_SERVER_MODE); if (serverMode.equalsIgnoreCase("kafka")) { canalMQProducer = new CanalKafkaProducer(); } else if (serverMode.equalsIgnoreCase("rocketmq")) { canalMQProducer = new CanalRocketMQProducer(); } if (canalMQProducer != null) { // disable netty System.setProperty(CanalConstants.CANAL_WITHOUT_NETTY, "true"); // 設(shè)置為raw避免ByteString->Entry的二次解析 System.setProperty("canal.instance.memory.rawEntry", "false"); } //接下來(lái)構(gòu)造CanalController并調(diào)用其start方法 logger.info("## start the canal server."); controller = new CanalController(properties); controller.start(); logger.info("## the canal server is running now ......"); ... //構(gòu)造CanalMQStarter并調(diào)用其start方法,同時(shí)設(shè)置為CanalController的屬性 if (canalMQProducer != null) { canalMQStarter = new CanalMQStarter(canalMQProducer); MQProperties mqProperties = buildMQProperties(properties); String destinations = CanalController.getProperty(properties, CanalConstants.CANAL_DESTINATIONS); canalMQStarter.start(mqProperties, destinations); controller.setCanalMQStarter(canalMQStarter); } ... running = true; }
CanalController是實(shí)例調(diào)度控制器
public CanalController(final Properties properties){ // 初始化managerClients用于請(qǐng)求admin managerClients = MigrateMap.makeComputingMap(new Function() { public PlainCanalConfigClient apply(String managerAddress) { return getManagerClient(managerAddress); } }); // 初始化全局參數(shù)設(shè)置,包含了全局mode、lazy、managerAddress、springXml,初始化instanceGenerator用于創(chuàng)建instance,其根據(jù)InstanceConfig的mode值使用PlainCanalInstanceGenerator或者SpringCanalInstanceGenerator創(chuàng)建CanalInstance globalInstanceConfig = initGlobalConfig(properties); instanceConfigs = new MapMaker().makeMap(); // 初始化instance config,包含了實(shí)例mode、lazy、managerAddress、springXml initInstanceConfig(properties); ... // 初始化CanalServerWithEmbedded,將instanceGenerator設(shè)置為CanalServerWithEmbedded的屬性 embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設(shè)置自定義的instanceGenerator int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112")); embededCanalServer.setMetricsPort(metricsPort); this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER); this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD); embededCanalServer.setUser(getProperty(properties, CanalConstants.CANAL_USER)); embededCanalServer.setPasswd(getProperty(properties, CanalConstants.CANAL_PASSWD)); ... final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS); //初始化ZkClientx用于canal集群部署,創(chuàng)建/otteradmin/canal/destinations節(jié)點(diǎn)和/otteradmin/canal/cluster節(jié)點(diǎn) if (StringUtils.isNotEmpty(zkServers)) { zkclientx = ZkClientx.getZkClient(zkServers); // 初始化系統(tǒng)目錄 zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true); zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true); } // 初始化ServerRunningMonitors的ServerRunningMonitor,用于啟動(dòng)、關(guān)閉實(shí)例 final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port); ServerRunningMonitors.setServerData(serverData); ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap(new Function () { ... })); // 初始化InstanceAction,用于啟動(dòng)和關(guān)閉實(shí)例 autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN)); if (autoScan) { defaultAction = new InstanceAction() { ... }; // 初始化instanceConfigMonitors,用于獲取所有instanceConfig并啟動(dòng)所有instance instanceConfigMonitors = MigrateMap.makeComputingMap(new Function () { public InstanceConfigMonitor apply(InstanceMode mode) { ... } }); } }
ManagerInstanceConfigMonitor是實(shí)例掃描器
public void start() { super.start(); //啟動(dòng)定時(shí)任務(wù),定時(shí)掃描所有instance executor.scheduleWithFixedDelay(new Runnable() { public void run() { try { scan(); if (isFirst) { isFirst = false; } } catch (Throwable e) { logger.error("scan failed", e); } } }, 0, scanIntervalInSecond, TimeUnit.SECONDS); } private void scan() { //緩存了所有instance的配置,如果發(fā)現(xiàn)有新的instance則啟動(dòng)或者修改了instance則重啟 String instances = configClient.findInstances(null); final Listis = Lists.newArrayList(StringUtils.split(instances, ',')); List start = Lists.newArrayList(); List stop = Lists.newArrayList(); List restart = Lists.newArrayList(); for (String instance : is) { if (!configs.containsKey(instance)) { PlainCanal newPlainCanal = configClient.findInstance(instance, null); if (newPlainCanal != null) { configs.put(instance, newPlainCanal); start.add(instance); } } else { PlainCanal plainCanal = configs.get(instance); PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5()); if (newPlainCanal != null) { // 配置有變化 restart.add(instance); configs.put(instance, newPlainCanal); } } } configs.forEach((instance, plainCanal) -> { if (!is.contains(instance)) { stop.add(instance); } }); stop.forEach(instance -> { notifyStop(instance); }); restart.forEach(instance -> { notifyReload(instance); }); start.forEach(instance -> { notifyStart(instance); }); } private void notifyStart(String destination) { try { //啟動(dòng)instance調(diào)用InstanceAction啟動(dòng)實(shí)例,最后是調(diào)用ServerRunningMonitor啟動(dòng)實(shí)例 defaultAction.start(destination); actions.put(destination, defaultAction); // 啟動(dòng)成功后記錄配置文件信息 } catch (Throwable e) { logger.error(String.format("scan add found[%s] but start failed", destination), e); } }
ServerRunningMonitor是針對(duì)server的running實(shí)例控制
public ServerRunningMonitor(){ // 創(chuàng)建父節(jié)點(diǎn) dataListener = new IZkDataListener() { public void handleDataChange(String dataPath, Object data) throws Exception { MDC.put("destination", destination); ServerRunningData runningData = JsonUtils.unmarshalFromByte((byte[]) data, ServerRunningData.class); if (!isMine(runningData.getAddress())) { mutex.set(false); } if (!runningData.isActive() && isMine(runningData.getAddress())) { // 說(shuō)明出現(xiàn)了主動(dòng)釋放的操作,并且本機(jī)之前是active releaseRunning();// 徹底釋放mainstem } activeData = (ServerRunningData) runningData; } public void handleDataDeleted(String dataPath) throws Exception { MDC.put("destination", destination); mutex.set(false); if (!release && activeData != null && isMine(activeData.getAddress())) { // 如果上一次active的狀態(tài)就是本機(jī),則即時(shí)觸發(fā)一下active搶占 initRunning(); } else { // 否則就是等待delayTime,避免因網(wǎng)絡(luò)瞬端或者zk異常,導(dǎo)致出現(xiàn)頻繁的切換操作 delayExector.schedule(new Runnable() { public void run() { initRunning(); } }, delayTime, TimeUnit.SECONDS); } } }; } public synchronized void start() { super.start(); try { //首先調(diào)用listener的processStart方法 processStart(); if (zkClient != null) { // 如果需要盡可能釋放instance資源,不需要監(jiān)聽running節(jié)點(diǎn),不然即使stop了這臺(tái)機(jī)器,另一臺(tái)機(jī)器立馬會(huì)start // 監(jiān)視/otteradmin/canal/destinations/{0}/running節(jié)點(diǎn)變化 String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter();// 沒有zk,直接啟動(dòng) } } catch (Exception e) { logger.error("start failed", e); // 沒有正常啟動(dòng),重置一下狀態(tài),避免干擾下一次start stop(); } } private void processStart() { if (listener != null) { try { //processStart方法中創(chuàng)建/otteradmin/canal/destinations/{0}/cluster/{1}節(jié)點(diǎn),0是實(shí)例名稱,1是當(dāng)前節(jié)點(diǎn)ip:port listener.processStart(); } catch (Exception e) { logger.error("processStart failed", e); } } } private void initRunning() { if (!isStart()) { return; } String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); //嘗試創(chuàng)建/otteradmin/canal/destinations/{0}/running節(jié)點(diǎn) zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; //如果成功則調(diào)用listener的processEnter方法,processEnter方法中調(diào)用CanalServerWithEmbedded的start方法啟動(dòng)實(shí)例和CanalMQStarter的start方法啟動(dòng)實(shí)例 processActiveEnter();// 觸發(fā)一下事件 mutex.set(true); release = false; } catch (ZkNodeExistsException e) { bytes = zkClient.readData(path, true); if (bytes == null) {// 如果不存在節(jié)點(diǎn),立即嘗試一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 嘗試創(chuàng)建父節(jié)點(diǎn) initRunning(); } }
canal.properties配置
canal.register.ip = canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 canal.admin.register.auto = true canal.admin.register.cluster =
上述就是小編為大家分享的如何用源碼分析canal的deployer模塊了,如果剛好有類似的疑惑,不妨參照上述分析進(jìn)行理解。如果想知道更多相關(guān)知識(shí),歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。