這期內(nèi)容當中小編將會給大家?guī)碛嘘P(guān)PostgreSQL中怎么實時干預(yù)搜索排序,文章內(nèi)容豐富且以專業(yè)的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
創(chuàng)新互聯(lián)公司是專業(yè)的江南網(wǎng)站建設(shè)公司,江南接單;提供網(wǎng)站建設(shè)、網(wǎng)站設(shè)計,網(wǎng)頁設(shè)計,網(wǎng)站設(shè)計,建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進行江南網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團隊,希望更多企業(yè)前來合作!PostgreSQL是一個歷史悠久的數(shù)據(jù)庫,最早由加州大學伯克利分校的Michael Stonebraker教授領(lǐng)導設(shè)計,具備與Oracle類似的功能、性能、架構(gòu)以及穩(wěn)定性。
阿里云HybridDB for PostgreSQL,提供大規(guī)模并行處理(MPP)數(shù)據(jù)倉庫服務(wù), 支持多核并行計算、向量計算、圖計算、JSON,JSONB全文檢索。
PostgreSQL高效的并行處理能力,基于JSON格式數(shù)據(jù)合并能力以及Notify實時消息能力,給我們提供了具體實現(xiàn)思路。因此基于上文提到業(yè)務(wù)挑戰(zhàn),我們梳理了相關(guān)實現(xiàn)方案。
仔細分析整體方案,歸納起來涉及的方面有:
異構(gòu)數(shù)據(jù)源接入
歸一化服務(wù)
數(shù)據(jù)合并策略
實時分數(shù)重算
閑魚商品相關(guān)的數(shù)據(jù)非常豐富,有各種異構(gòu)數(shù)據(jù)源,如全量的離線商品數(shù)據(jù),實時商品變更數(shù)據(jù),各種算法維度數(shù)據(jù)等,在實現(xiàn)上可通過阿里云大數(shù)據(jù)平臺,binlog監(jiān)聽工具等進行統(tǒng)一處理。
如上圖所示,所有異構(gòu)數(shù)據(jù)源都按照統(tǒng)一格式,通過異步消息,輸入到歸一化服務(wù),該方案的優(yōu)點是不管全量數(shù)據(jù)還是增量數(shù)據(jù)都統(tǒng)一走消息服務(wù),簡化接入流程,同時通過消息中間層進行解耦,提高穩(wěn)定性。
歸一化服務(wù)接收上游異構(gòu)數(shù)據(jù)源消息,通過數(shù)據(jù)校驗?zāi)K、數(shù)據(jù)補全模塊、標準格式轉(zhuǎn)換模塊、數(shù)據(jù)監(jiān)控模塊為下游輸送正確的數(shù)據(jù)。
數(shù)據(jù)校驗?zāi)?如下圖所示,數(shù)據(jù)源結(jié)合元數(shù)據(jù)中心進行字段級別的校驗,如字段名稱,數(shù)據(jù)類型,數(shù)據(jù)范圍、默認值等,引入元數(shù)據(jù)中心大優(yōu)勢是可以細粒度的控制數(shù)據(jù)源,防止臟數(shù)據(jù)、不需要的數(shù)據(jù)污染下游。
數(shù)據(jù)補全模塊 數(shù)據(jù)源通常需要實時補全一些數(shù)據(jù)干預(yù)指標,如用戶編輯商品,需實時分析打標是否有黃圖,商品價格預(yù)測等,整個干預(yù)流程要以pipeline的形式,暴露擴展點,允許插入干預(yù)能力。
標準格式轉(zhuǎn)換模塊 標準格式轉(zhuǎn)換模塊將數(shù)據(jù)源統(tǒng)一按標準的格式轉(zhuǎn)換成JSON結(jié)構(gòu),便于下游統(tǒng)一數(shù)據(jù)合并。
數(shù)據(jù)監(jiān)控模塊 數(shù)據(jù)監(jiān)控模塊記錄數(shù)據(jù)源的每一條數(shù)據(jù)以及異常數(shù)據(jù)記錄,并將數(shù)據(jù)投遞到監(jiān)控系統(tǒng),監(jiān)控每個異構(gòu)數(shù)據(jù)源異常數(shù)據(jù),流量異常情況,第一時間發(fā)現(xiàn)并恢復(fù)問題。
數(shù)據(jù)合并策略主要包括基于時間戳的數(shù)據(jù)合開和數(shù)據(jù)變更通知兩個先后處理流程,在數(shù)據(jù)合并流程會遇到一個核心問題,即如何快速有效的解決每個字段的沖突合并,基于時間戳統(tǒng)一merge。這里首先會涉及到數(shù)據(jù)存儲結(jié)構(gòu),參考如下表設(shè)計結(jié)構(gòu):
create table Test (id int8 primary key, -- 商品IDatt jsonb -- 商品屬性);
屬性設(shè)計為JSON,JSON里面是K-V的屬性對,如下屬性結(jié)構(gòu)示例,V里面是數(shù)組,包含K的值以及這對屬性的最后更新時間,更新時間用于merge update,當屬性發(fā)生變化時才更新,沒有發(fā)生變化時,不更新。這種設(shè)計優(yōu)點:
字段級別細粒度merge,保證最小集數(shù)據(jù)實時性
高擴展性,表不需要增減字段
屬性結(jié)構(gòu)示例
{"count": [100, "2017-01-01 10:10:00"], "price": [8880, "2018-01-04 10:10:12"], "newatt": [120, "2017-01-01 12:22:00"]}
定義完存儲結(jié)構(gòu), 接下來利用PostgreSQL的JSON處理能力進行數(shù)據(jù)merge,參考如下merge udf 偽代碼:
create or replace function merge_json(jsonb, jsonb) returns jsonb as $$ select jsonb_object_agg(key,value) from ( select coalesce(a.key, b.key) as key, case when coalesce(jsonb_array_element(a.value,1)::text::timestamp, '1970-01-01'::timestamp) > coalesce(jsonb_array_element(b.value,1)::text::timestamp, '1970-01-01'::timestamp) then a.value else b.value end from jsonb_each($1) a full outer join jsonb_each($2) b using (key) ) t; $$ language sql strict ;
定義完merge方法后,我們在數(shù)據(jù)源有數(shù)據(jù)變更時直接調(diào)用。
insert into a values (1, '{"price":[1000, "2019-01-01 10:10:12"], "newatt": ["hello", "2018-01-01"]}') on conflict (id) do update set att = merge_json(a.att, excluded.att) where a.att <> merge_json(a.att, excluded.att);
從上面可以看出當商品ID出現(xiàn)沖突時,會調(diào)用merge_json 進行數(shù)據(jù)合并,至此數(shù)據(jù)合并流程完成,接下來需要將合并結(jié)果實時通知下游,可以利用PostgreSQL的觸發(fā)品和Notify機制來處理。
觸發(fā)器設(shè)計
//觸發(fā)器要執(zhí)行的udf CREATE OR REPLACE FUNCTION notify1() returns trigger AS $function$ declare begin perform pg_notify( 'a', -- 異步消息通道名字 format('CLASS:notify, ID:%s, ATT:%s', NEW.id, NEW.att) -- 消息內(nèi)容 ); return null; end $function$ language plpgsql strict; //創(chuàng)建觸發(fā)器 create trigger tg1 after insert or update on Test for each row execute procedure notify1();
可以看出當數(shù)據(jù)插入或更新會觸發(fā)trigger 執(zhí)行nofity1 函數(shù)創(chuàng)建異步nofity消息,并向指定的通道發(fā)送通知,下游應(yīng)用可通過jdbc監(jiān)聽相應(yīng)的通道,接收消息,進行后續(xù)實時打分流程,參考如下偽代碼:
this.pgconn = conn.unwrap(org.postgresql.PGConnection.class); Statement stmt = conn.createStatement(); stmt.execute("LISTEN a"); stmt.close(); org.postgresql.PGNotification notifications[] = pgconn.getNotifications(); if (notifications != null) { for (int i=0; i < notifications.length; i++) { System.out.println("Got notification: " + notifications[i].getName()); } }
另外PostgreSQL并發(fā)處理性能非常高效,綁定觸發(fā)器后會增加PostgreSQL的數(shù)據(jù)寫入時長,但是壓測結(jié)果來看,依然能夠滿足我們的業(yè)務(wù)寫入性能要求,
以1000萬數(shù)據(jù)測試結(jié)果為例:
數(shù)據(jù)實時打分干預(yù)搜索
服務(wù)層在監(jiān)聽到Notify消息,解析消息數(shù)據(jù),通過規(guī)則引擎對各指標權(quán)重進行分數(shù)重算,計算綜合分數(shù),打到搜索tag表,搜索引擎實時監(jiān)測tag表,將綜合分數(shù)dump到搜索引擎,實時干擾排序結(jié)果。
上述就是小編為大家分享的PostgreSQL中怎么實時干預(yù)搜索排序了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)-成都網(wǎng)站建設(shè)公司行業(yè)資訊頻道。