這篇文章主要介紹“MongoDB代理程序如何實(shí)現(xiàn)”的相關(guān)知識(shí),小編通過實(shí)際案例向大家展示操作過程,操作方法簡單快捷,實(shí)用性強(qiáng),希望這篇“Mongodb代理程序如何實(shí)現(xiàn)”文章能幫助大家解決問題。
創(chuàng)新互聯(lián)是一家集網(wǎng)站建設(shè),二道企業(yè)網(wǎng)站建設(shè),二道品牌網(wǎng)站建設(shè),網(wǎng)站定制,二道網(wǎng)站建設(shè)報(bào)價(jià),網(wǎng)絡(luò)營銷,網(wǎng)絡(luò)優(yōu)化,二道網(wǎng)站推廣為一體的創(chuàng)新建站企業(yè),幫助傳統(tǒng)企業(yè)提升企業(yè)形象加強(qiáng)企業(yè)競爭力。可充分滿足這一群體相比中小企業(yè)更為豐富、高端、多元的互聯(lián)網(wǎng)需求。同時(shí)我們時(shí)刻保持專業(yè)、時(shí)尚、前沿,時(shí)刻以成就客戶成長自我,堅(jiān)持不斷學(xué)習(xí)、思考、沉淀、凈化自己,讓我們?yōu)楦嗟钠髽I(yè)打造出實(shí)用型網(wǎng)站。
根據(jù)一貫的風(fēng)格,我們先來梳理下項(xiàng)目目錄結(jié)構(gòu),結(jié)構(gòu)如下:
|__ bin/ # 用于存放編譯后生成的二進(jìn)制文件
|__ config/ # 用于存放配置文件
|__ connection/ # 存放連接相關(guān)的文件
| |__ proxy.go # 代理組件
| |__ pool.go # 連接池組件
| |__ repl_set.go # 復(fù)制集組件
| |__ conn.go # 連接對(duì)象組件
|__ internal/ # 存放 mongo 內(nèi)部協(xié)議相關(guān)文件
| |__ auth.go # 握手鑒權(quán)組件
| |__ protocol.go # 協(xié)議解析組件
| |__ request.go # 請(qǐng)求重寫組件
| |__ response.go # 響應(yīng)重寫組件
|__ statistics/ # 存放指標(biāo)統(tǒng)計(jì)上報(bào)組件
|__ test/ # 存放各種語言驅(qū)動(dòng)測試代碼的文件夾
|__ utils/ # 工具函數(shù)文件夾
|__ glide.yaml # 依賴包配置文件
|__ main.go # 入口文件
proxy 實(shí)現(xiàn)
最簡單的 proxy 實(shí)現(xiàn)套路就像下面這樣:
// main.go
func main() {
// 傳入配置參數(shù),實(shí)例化一個(gè)代理對(duì)象
p := NewProxy(conf)
// 卡住,循環(huán)接受客戶端請(qǐng)求
p.LoopAccept()
}
接著來實(shí)現(xiàn) NewProxy、LoopAccept 方法:
// connection/proxy.go
type Proxy struct {
sync.RWMutex
listener net.Listener
writePool, readPool *pool
}
func NewProxy(conf config.UserConf) *Proxy {
// 開始監(jiān)聽本地端口
listener, err := net.Listen("tcp", ":"+conf.GetString("port"))
if err != nil {
log.Fatalln(err)
}
p := &Proxy{
listener: listener,
}
// 實(shí)例化連接池
p.readPool, p.writePool, err = newPool(p)
if err != nil {
panic(err)
}
return p
}
func (p *Proxy) LoopAccept() {
for {
client, err := p.listener.Accept()
go func(c net.Conn) {
defer c.Close()
// 一個(gè)連接在多次 messageHandler 中共用一個(gè) Reader 對(duì)象
cr := bufio.NewReader(c)
// 因?yàn)橐粋€(gè)連接可能會(huì)進(jìn)行多次讀或?qū)懖僮?/p>
for {
// 將客戶端請(qǐng)求代理給服務(wù)端,服務(wù)端響應(yīng)代理回客戶端
// 同時(shí)中間對(duì)請(qǐng)求或響應(yīng)進(jìn)行重寫操作
err := p.messageHandler(cr, c)
if err != nil {
// 只要出現(xiàn)錯(cuò)誤,就執(zhí)行到上面的 defer c.Close() 來關(guān)閉連接
return
}
}
}(client)
}
}
接著來實(shí)現(xiàn)核心邏輯 messageHandler:
// connection/proxy.go
func (p *Proxy) messageHandler(cr *bufio.Reader, c net.Conn) error {
// 對(duì)請(qǐng)求報(bào)文進(jìn)行解析操作
req, err := internal.Decode(clientReader)
if err != nil {
return errors.New("decode error")
}
// 將客戶端請(qǐng)求發(fā)送給數(shù)據(jù)庫服務(wù)器
res, err := p.clientToServer(req)
if err != nil {
return errors.New("request error")
}
// 將數(shù)據(jù)庫服務(wù)器響應(yīng)返回給客戶端
return res.WriteTo(c)
}
func (p *Proxy) clientToServer(req *internal.Message) (*internal.Message, error) {
var server net.Conn
// 如果是讀操作,就從讀池中取出連接
if req.IsReadOp() {
host := req.GetHost()
// 某些讀操作需要發(fā)送到指定的讀庫上,所以需要傳 host,來獲取指定讀庫連接
server = p.readPool.Acquire(host)
// 反之,寫操作從寫池中取出連接
} else {
// 由于寫庫只有一個(gè),所以不用傳 host 參數(shù)了
server = p.writePool.Acquire()
}
// 將客戶端請(qǐng)求發(fā)送給數(shù)據(jù)庫服務(wù)器
err := req.WriteTo(server)
if err != nil {
return nil, err
}
// 獲取解析數(shù)據(jù)庫服務(wù)器響應(yīng)
res, err := internal.Decode(bufio.NewReader(server))
return res, err
}
大致邏輯就是,客戶端通過代理把請(qǐng)求發(fā)給服務(wù)端,服務(wù)端響應(yīng)也通過代理響應(yīng)回客戶端。
------------ request ----------- request ------------
| | --------> | | --------> | |
| client | | proxy | | repl_set |
| | <-------- | | <-------- | |
------------ response ----------- response ------------
吶——,當(dāng)然還有非常多的細(xì)節(jié),由于篇幅原因不得不省略...
pool 實(shí)現(xiàn)
由 proxy 的代碼邏輯來看,我們?nèi)∽x或?qū)憥爝B接是通過讀或?qū)懗氐?Acquire 方法來取的:
// connection/pool.go
type pool struct {
sync.RWMutex
connCh chan net.Conn
newConn func(string) (net.Conn, error)
freeConn func(net.Conn) error
}
func (p *pool) Acquire(opts ...interface{}) (net.Conn, error) {
host := ""
if len(opts) > 0 {
host, _ = (opts[0]).(string)
}
chLen := len(p.connCh)
// 從 channel 中遍歷剩余數(shù)量的 conn
for i := 0; i < chLen; i++ {
select {
case conn, ok := <- ch:
if ok {
if len(host) > 0 {
if conn.RemoteAddr().String() == host {
return conn, nil
}
// 沒有找到對(duì)應(yīng) host 的 conn,則把 conn 重新放回 channel
// 你可以簡單理解為只是執(zhí)行了 p.connCh <- conn 操作
p.freeConn(conn)
} else {
return conn, nil
}
}
// 避免數(shù)量不足而導(dǎo)致 channel 阻塞等待
default:
}
}
// 若還沒有從 channel 中取到 conn,則立馬 new 一個(gè)
conn, err := p.newConn(host)
if err != nil {
return nil, err
}
return conn, nil
}
池的實(shí)現(xiàn)大致就是實(shí)現(xiàn)了一個(gè)循環(huán)隊(duì)列,連接從池中取,取出的連接在使用完后,可以放回池中。
關(guān)于“Mongodb代理程序如何實(shí)現(xiàn)”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。