This article shows how to implement incremental indexing and how to deliver the incremental data to a message queue (Kafka).
We process incremental data using the binlog data objects. Put simply, we build incremental ad data so that the ads can later be delivered to the index service, turning incremental data into incremental indexes.
Define an interface for delivering incremental data. Its parameter is the converted binlog object we defined in the previous section:
```java
/**
 * ISender: interface for delivering incremental data.
 *
 * @author Isaac.Zhang | 若初
 */
public interface ISender {
    void sender(MysqlRowData rowData);
}
```
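Several implementations of this interface will coexist (the article later registers beans named `indexSender` and `kafkaSender`), so it may help to see the selection pattern without Spring. The following is a minimal plain-Java sketch, assuming a simplified hypothetical `RowData` class in place of `MysqlRowData` and a hypothetical `SenderRegistry` map standing in for Spring's lookup by bean name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for MysqlRowData.
class RowData {
    final String tableName;

    RowData(String tableName) {
        this.tableName = tableName;
    }
}

// Same shape as ISender: a single delivery method.
interface Sender {
    void sender(RowData rowData);
}

// Records what it receives; stands in for IndexSender or KafkaSender.
class RecordingSender implements Sender {
    final List<String> delivered = new ArrayList<>();

    @Override
    public void sender(RowData rowData) {
        delivered.add(rowData.tableName);
    }
}

// Hypothetical registry: plays the role Spring plays when injecting a bean by name.
class SenderRegistry {
    private final Map<String, Sender> senders = new HashMap<>();

    void register(String name, Sender sender) {
        senders.put(name, sender);
    }

    Sender byName(String name) {
        Sender sender = senders.get(name);
        if (sender == null) {
            throw new IllegalArgumentException("No sender named " + name);
        }
        return sender;
    }
}
```

Switching delivery targets then comes down to asking for a different name, which is what changing `@Resource(name = "indexSender")` to `"kafkaSender"` does in the Spring version.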
Create the incremental index listener:
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import com.github.shyiko.mysql.binlog.event.EventType;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * IncrementListener: listens for incremental (binlog) data.
 *
 * @author Isaac.Zhang | 若初
 * @since 2019/6/27
 */
@Slf4j
@Component
public class IncrementListener implements Ilistener {

    private final AggregationListener aggregationListener;

    // Choose the delivery implementation to inject by bean name
    @Resource(name = "indexSender")
    private ISender sender;

    @Autowired
    public IncrementListener(AggregationListener aggregationListener) {
        this.aggregationListener = aggregationListener;
    }

    /**
     * Annotated with {@link PostConstruct}: runs right after the bean has been
     * initialized at service startup and registers this listener for each table.
     */
    @Override
    @PostConstruct
    public void register() {
        log.info("IncrementListener register db and table info.");
        Constant.table2db.forEach((tb, db) -> aggregationListener.register(db, tb, this));
    }

    @Override
    public void onEvent(BinlogRowData eventData) {
        TableTemplate table = eventData.getTableTemplate();
        EventType eventType = eventData.getEventType();

        // Wrap the event into the object that will finally be delivered
        MysqlRowData rowData = new MysqlRowData();
        rowData.setTableName(table.getTableName());
        rowData.setLevel(table.getLevel());

        // Convert the binlog EventType into an OperationTypeEnum
        OperationTypeEnum operationType = OperationTypeEnum.convert(eventType);
        rowData.setOperationTypeEnum(operationType);

        // Fields the template defines for this operation
        List<String> fieldList = table.getOpTypeFieldSetMap().get(operationType);
        if (null == fieldList) {
            log.warn("{} not support for {}.", operationType, table.getTableName());
            return;
        }

        // Copy each after-image row (column name -> value) into the row data
        for (Map<String, String> afterMap : eventData.getAfter()) {
            Map<String, String> _afterMap = new HashMap<>();
            for (Map.Entry<String, String> entry : afterMap.entrySet()) {
                _afterMap.put(entry.getKey(), entry.getValue());
            }
            rowData.getFieldValueMap().add(_afterMap);
        }
        sender.sender(rowData);
    }
}
```
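The core of `onEvent` is two steps: map the binlog event type to an operation type, then copy each after-image row into the outgoing row data. The sketch below reproduces those steps with plain collections. `OpType` and its `convert` mapping are hypothetical, since the article does not show `OperationTypeEnum.convert`; the event-type names follow mysql-binlog-connector-java's `EventType` constants, passed as strings to keep the example dependency-free.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical operation types; the real OperationTypeEnum may differ.
enum OpType {
    ADD, UPDATE, DELETE, OTHER;

    // Assumed mapping from binlog event type names to operations.
    static OpType convert(String eventType) {
        switch (eventType) {
            case "WRITE_ROWS":
            case "EXT_WRITE_ROWS":
                return ADD;
            case "UPDATE_ROWS":
            case "EXT_UPDATE_ROWS":
                return UPDATE;
            case "DELETE_ROWS":
            case "EXT_DELETE_ROWS":
                return DELETE;
            default:
                return OTHER;
        }
    }
}

class RowCopy {
    // Copies every after-image row into a fresh map, like the listener's nested loop;
    // later changes to the source maps therefore do not affect the copies.
    static List<Map<String, String>> copyAfterImages(List<Map<String, String>> after) {
        List<Map<String, String>> copies = new ArrayList<>();
        for (Map<String, String> row : after) {
            copies.add(new HashMap<>(row));
        }
        return copies;
    }
}
```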
First, configure the connection settings for the database whose binlog we listen to:
```yaml
adconf:
  mysql:
    host: 127.0.0.1
    port: 3306
    username: root
    password: 12345678
    binlogName: ""
    position: -1  # start listening from the current position
```
Write the configuration class:
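```java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * BinlogConfig: settings for listening to the binlog,
 * bound from the adconf.mysql.* properties.
 *
 * @author Isaac.Zhang | 若初
 */
@Component
@ConfigurationProperties(prefix = "adconf.mysql")
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BinlogConfig {
    private String host;
    private Integer port;
    private String username;
    private String password;
    private String binlogName;
    private Long position;
}
```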
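`binlogName: ""` together with `position: -1` is meant as "start from the current position", so a client only needs to pass an explicit file and offset when both are set. This matters with mysql-binlog-connector-java: according to its `BinaryLogClient.setBinlogFilename` documentation, leaving the file name `null` resolves the latest binlog position, whereas an empty string asks the server for the oldest binlog, so `""` should not be forwarded as-is. The sketch below captures that rule as a plain function; `StartPosition` and `BinlogStart.resolve` are hypothetical names, and the rule is an assumption about how `CustomBinlogClient` uses these two properties.

```java
import java.util.Optional;

// Hypothetical value object for an explicit starting point.
final class StartPosition {
    final String binlogName;
    final long position;

    StartPosition(String binlogName, long position) {
        this.binlogName = binlogName;
        this.position = position;
    }
}

final class BinlogStart {
    // Returns an explicit start only when both the file name and the offset are set;
    // an empty Optional means "do not set them, start from the current position".
    static Optional<StartPosition> resolve(String binlogName, Long position) {
        boolean hasName = binlogName != null && !binlogName.isEmpty();
        boolean hasPosition = position != null && position != -1L;
        if (hasName && hasPosition) {
            return Optional.of(new StartPosition(binlogName, position));
        }
        return Optional.empty();
    }
}
```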
In the section on listening to the binlog, we implemented a custom client, `CustomBinlogClient`. To listen to the binlog, this client has to run as an independent thread and start listening when the application starts. To run it, we use a runner we have not used before, Spring Boot's `org.springframework.boot.CommandLineRunner`. Let's code.
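```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Connects the binlog client once the Spring Boot application has started.
 */
@Slf4j
@Component
public class BinlogRunner implements CommandLineRunner {

    @Autowired
    private CustomBinlogClient binlogClient;

    @Override
    public void run(String... args) throws Exception {
        log.info("BinlogRunner is running...");
        binlogClient.connect();
    }
}
```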
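`CommandLineRunner.run` is called on the thread that starts the application, so if `connect()` blocked there, `SpringApplication.run` would never return. That is why the listening client has to run on its own thread. The sketch below shows the pattern in plain Java: a hypothetical `BlockingClient`, whose `listen()` blocks like a long-lived binlog connection, is started on a background thread and `start()` returns immediately. It makes no assumption about how `CustomBinlogClient.connect()` is written internally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical client whose listen() blocks until stop() is called.
class BlockingClient {
    private final CountDownLatch started = new CountDownLatch(1);
    private final CountDownLatch stopSignal = new CountDownLatch(1);

    void listen() throws InterruptedException {
        started.countDown();  // now "connected"
        stopSignal.await();   // block, like a streaming connection
    }

    void stop() {
        stopSignal.countDown();
    }

    // Waits until listen() has been entered; false on timeout or interruption.
    boolean awaitStarted(long millis) {
        try {
            return started.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class ClientRunner {
    // Runs the blocking listen() on a daemon thread so the caller returns at once.
    static Thread start(BlockingClient client) {
        Thread worker = new Thread(() -> {
            try {
                client.listen();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "binlog-listener");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Stops the client and waits for its thread; true if the thread has finished.
    static boolean stopAndWait(BlockingClient client, Thread worker, long millis) {
        client.stop();
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }
}
```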
While listening to the binlog, we saw that values of types such as int and String come through correctly, but time-type values are formatted as strings such as `Fri Jun 21 15:07:53 CST 2019`:
```
--------Insert-----------
WriteRowsEventData{tableId=91, includedColumns={0, 1, 2, 3, 4, 5, 6, 7}, rows=[
    [10, 11, ad unit test binlog, 1, 0, 1236.7655, Thu Jun 27 08:00:00 CST 2019, Thu Jun 27 08:00:00 CST 2019]
--------Update-----------
UpdateRowsEventData{tableId=81, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[
    {before=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 0, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019], after=[10, Isaac Zhang, 2D3ABB6F2434109A105170FB21D00453, 1, Fri Jun 21 15:07:53 CST 2019, Fri Jun 21 15:07:53 CST 2019]}
```
There are two things to note about this time format:

1. CST: the printed time is 8 hours later than the value actually stored (CST here is China Standard Time, UTC+8:00).
2. The date string has to be parsed back into a date.
We could also change this behavior by adjusting MySQL's date format settings; here, we parse the format in code instead:
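```java
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.DateUtils;

@Slf4j
public class CommonUtils {

    /**
     * Parses a binlog date string such as "Thu Jun 27 08:00:00 CST 2019"
     * and subtracts the 8-hour CST (UTC+8) shift. Returns null if parsing fails.
     */
    public static Date parseBinlogString2Date(String dateString) {
        try {
            DateFormat dateFormat = new SimpleDateFormat(
                    "EEE MMM dd HH:mm:ss zzz yyyy",
                    Locale.US
            );
            return DateUtils.addHours(dateFormat.parse(dateString), -8);
        } catch (ParseException ex) {
            log.error("parseString2Date error:{}", dateString);
            return null;
        }
    }
}
```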
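The fixed `-8` hours in this helper assumes the value was printed by a JVM running in China Standard Time (UTC+8). Parsing with `zzz` also depends on how `SimpleDateFormat` resolves the ambiguous abbreviation "CST", which can equally mean US Central Standard Time. As an alternative sketch (a swapped-in technique, not the article's code), `java.time` can parse the wall-clock part directly and shift it back. It assumes every binlog date string carries the literal `CST`; `BinlogDates` is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;

final class BinlogDates {
    // Matches strings like "Thu Jun 27 08:00:00 CST 2019"; "CST" is matched literally.
    private static final DateTimeFormatter BINLOG_FORMAT =
            DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss 'CST' yyyy", Locale.US);

    // Parses the printed value and removes the +8h shift (assumes UTC+8 output).
    static LocalDateTime parse(String text) {
        try {
            return LocalDateTime.parse(text, BINLOG_FORMAT).minusHours(8);
        } catch (DateTimeParseException ex) {
            return null; // mirrors the original helper, which returns null on failure
        }
    }
}
```

For example, `BinlogDates.parse("Thu Jun 27 08:00:00 CST 2019")` yields `2019-06-27T00:00`.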
When we defined the indexes, we organized them by the hierarchy (Level) between tables. Coding standards forbid magic numbers, so we define an enum to express the data levels:
```java
import lombok.Getter;

/**
 * AdDataLevel: ad data levels.
 *
 * @author Isaac.Zhang | 若初
 */
@Getter
public enum AdDataLevel {
    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private final String level;
    private final String desc;

    AdDataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }
}
```
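Comparing a level string against each constant in turn grows with every new level. One possible refinement, sketched here as a self-contained copy of the enum renamed `DataLevel`, adds a hypothetical `fromLevel` lookup so callers can `switch` on the enum instead of chaining string comparisons.

```java
import java.util.Arrays;
import java.util.Optional;

enum DataLevel {
    LEVEL2("2", "level 2"),
    LEVEL3("3", "level 3"),
    LEVEL4("4", "level 4");

    private final String level;
    private final String desc;

    DataLevel(String level, String desc) {
        this.level = level;
        this.desc = desc;
    }

    String getLevel() {
        return level;
    }

    String getDesc() {
        return desc;
    }

    // Hypothetical helper: the constant whose level string matches, if any.
    static Optional<DataLevel> fromLevel(String level) {
        return Arrays.stream(values())
                .filter(l -> l.level.equals(level))
                .findFirst();
    }
}
```

A sender could then call `fromLevel(rowData.getLevel())`, `switch` on the result, and log an error when the `Optional` is empty.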
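Incremental data can be delivered to different destinations for different purposes, which is why we defined the delivery interface `com.sxzhongf.ad.sender.ISender` earlier. Next, we implement a sender that delivers the data to the index: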
```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component("indexSender")
public class IndexSender implements ISender {

    /**
     * Delivers binlog data according to the ad data level.
     */
    @Override
    public void sender(MysqlRowData rowData) {
        if (AdDataLevel.LEVEL2.getLevel().equals(rowData.getLevel())) {
            level2RowData(rowData);
        } else if (AdDataLevel.LEVEL3.getLevel().equals(rowData.getLevel())) {
            level3RowData(rowData);
        } else if (AdDataLevel.LEVEL4.getLevel().equals(rowData.getLevel())) {
            level4RowData(rowData);
        } else {
            log.error("Binlog MysqlRowData error: {}", JSON.toJSONString(rowData));
        }
    }

    private void level2RowData(MysqlRowData rowData) {
        if (rowData.getTableName().equals(Constant.AD_PLAN_TABLE_INFO.TABLE_NAME)) {
            List<AdPlanTable> planTables = new ArrayList<>();
            for (Map<String, String> fieldValueMap : rowData.getFieldValueMap()) {
                AdPlanTable planTable = new AdPlanTable();
                // Iterate the map with forEach (a second way to loop over a Map)
                fieldValueMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_ID:
                            planTable.setPlanId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_USER_ID:
                            planTable.setUserId(Long.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_PLAN_STATUS:
                            planTable.setPlanStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_START_DATE:
                            planTable.setStartDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                        case Constant.AD_PLAN_TABLE_INFO.COLUMN_END_DATE:
                            planTable.setEndDate(CommonUtils.parseBinlogString2Date(v));
                            break;
                    }
                });
                planTables.add(planTable);
            }
            // Deliver the ad plans
            planTables.forEach(p -> AdLevelDataHandler.handleLevel2Index(p, rowData.getOperationTypeEnum()));
        } else if (rowData.getTableName().equals(Constant.AD_CREATIVE_TABLE_INFO.TABLE_NAME)) {
            List<AdCreativeTable> creativeTables = new LinkedList<>();
            rowData.getFieldValueMap().forEach(afterMap -> {
                AdCreativeTable creativeTable = new AdCreativeTable();
                afterMap.forEach((k, v) -> {
                    switch (k) {
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_CREATIVE_ID:
                            creativeTable.setAdId(Long.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_TYPE:
                            creativeTable.setType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_MATERIAL_TYPE:
                            creativeTable.setMaterialType(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_HEIGHT:
                            creativeTable.setHeight(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_WIDTH:
                            creativeTable.setWidth(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_AUDIT_STATUS:
                            creativeTable.setAuditStatus(Integer.valueOf(v));
                            break;
                        case Constant.AD_CREATIVE_TABLE_INFO.COLUMN_URL:
                            creativeTable.setAdUrl(v);
                            break;
                    }
                });
                creativeTables.add(creativeTable);
            });
            // Deliver the ad creatives
            creativeTables.forEach(c -> AdLevelDataHandler.handleLevel2Index(c, rowData.getOperationTypeEnum()));
        }
    }

    private void level3RowData(MysqlRowData rowData) {
        // ...
    }

    /**
     * Handles level-4 ad data.
     */
    private void level4RowData(MysqlRowData rowData) {
        // ...
    }
}
```
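Each `switch` in `level2RowData` turns one binlog row (column name to string value) into a table object. The sketch below isolates that step for a plan row so it can be checked without Spring or a database. `PlanRow` and the column names `id`, `user_id` and `plan_status` are hypothetical stand-ins for `AdPlanTable` and the `Constant.AD_PLAN_TABLE_INFO` values, which the article does not list.

```java
import java.util.Map;

// Hypothetical stand-in for AdPlanTable.
class PlanRow {
    Long planId;
    Long userId;
    Integer planStatus;
}

final class PlanRowMapper {
    // Hypothetical column names; the real ones live in Constant.AD_PLAN_TABLE_INFO.
    static final String COLUMN_PLAN_ID = "id";
    static final String COLUMN_USER_ID = "user_id";
    static final String COLUMN_PLAN_STATUS = "plan_status";

    // Converts one binlog row into a PlanRow; unknown columns are ignored,
    // as they are by the switch statements in IndexSender.
    static PlanRow fromColumns(Map<String, String> columns) {
        PlanRow row = new PlanRow();
        columns.forEach((column, value) -> {
            switch (column) {
                case COLUMN_PLAN_ID:
                    row.planId = Long.valueOf(value);
                    break;
                case COLUMN_USER_ID:
                    row.userId = Long.valueOf(value);
                    break;
                case COLUMN_PLAN_STATUS:
                    row.planStatus = Integer.valueOf(value);
                    break;
                default:
                    break;
            }
        });
        return row;
    }
}
```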
To make delivery more flexible and to serve systems such as data statistics and analytics, we also implement a sender that publishes to a message queue. Other services can then subscribe to the MQ topic to receive the data.
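Configure the topic in the configuration file:

```yaml
adconf:
  kafka:
    topic: ad-search-mysql-data
```

Then implement the Kafka sender:

```java
import java.util.Optional;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * KafkaSender: delivers incremental binlog data to a Kafka topic.
 *
 * @author Isaac.Zhang | 若初
 * @since 2019/7/1
 */
@Component(value = "kafkaSender")
public class KafkaSender implements ISender {

    @Value("${adconf.kafka.topic}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends the row data to the Kafka topic as a JSON string.
     */
    @Override
    public void sender(MysqlRowData rowData) {
        kafkaTemplate.send(topic, JSON.toJSONString(rowData));
    }

    /**
     * Test consumer for the Kafka messages.
     */
    @KafkaListener(topics = {"${adconf.kafka.topic}"}, groupId = "ad-search")
    public void processMysqlRowData(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMsg = Optional.ofNullable(record.value());
        if (kafkaMsg.isPresent()) {
            Object message = kafkaMsg.get();
            MysqlRowData rowData = JSON.parseObject(message.toString(), MysqlRowData.class);
            System.out.println("kafka process MysqlRowData: " + JSON.toJSONString(rowData));
            //sender.sender();
        }
    }
}
```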
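Publishing to a topic decouples the producer from its consumers: the sender does not know who reads the data, and each subscribing service (for Kafka, each consumer group) receives its own copy of every message. The sketch below imitates only that fan-out behavior with a hypothetical in-memory `InMemoryTopic`; it is a teaching stand-in, not a replacement for Kafka, and ignores partitions, offsets and persistence. The group name `ad-stats` is likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory topic: each subscribed group gets its own queue,
// so every group sees every published message (a simplification of consumer groups).
final class InMemoryTopic {
    private final Map<String, BlockingQueue<String>> groups = new ConcurrentHashMap<>();

    void subscribe(String groupId) {
        groups.putIfAbsent(groupId, new LinkedBlockingQueue<>());
    }

    // Producer side: like KafkaSender.sender, it knows the topic, not the readers.
    void publish(String message) {
        for (BlockingQueue<String> queue : groups.values()) {
            queue.offer(message);
        }
    }

    // Drains whatever is currently queued for one group.
    List<String> poll(String groupId) {
        List<String> drained = new ArrayList<>();
        BlockingQueue<String> queue = groups.get(groupId);
        if (queue != null) {
            queue.drainTo(drained);
        }
        return drained;
    }
}
```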