場(chǎng)景
本例子支持多線程異步處理消息,針對(duì)每一個(gè)鏈接請(qǐng)求,創(chuàng)建線程處理稍后的指令,CSimpleSession::SessionThreadFunc是線程函數(shù),async_read_some函數(shù)設(shè)置接收數(shù)據(jù)的回調(diào)函數(shù)ContinueRead,一般情況下,read_some函數(shù)未必能夠完整的讀取客戶端發(fā)送的數(shù)據(jù)包,當(dāng)然必須要指定明確的結(jié)束標(biāo)志,雙方必須規(guī)定好等接收完畢的時(shí)候,必須等待線程返回,因此在析構(gòu)函數(shù)調(diào)用m_thread->join函數(shù),等線程函數(shù)正常返回之后,關(guān)閉連接,如果沒有等待線程返回,就直接關(guān)閉連接,會(huì)導(dǎo)致async_read_some函數(shù)拋出異常,目前暫時(shí)沒有什么頭緒
service.h
#ifndef QPIDPUSHMESSAGESERVICE_H
#define QPIDPUSHMESSAGESERVICE_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace qpid
{
class CSimpleSession : public boost::enable_shared_from_this
{
public:
CSimpleSession(boost::asio::io_service &io_service) : m_socket(io_service)
{
m_bRunning = true;
PrepareForNextRecv();
}
~CSimpleSession()
{
m_bRunning = false;
m_thread->join();
m_socket.close();
}
void StartThread()
{
static boost::asio::ip::tcp::no_delay option(true);
m_socket.set_option(option);
m_thread.reset(new boost::thread(boost::bind(&CSimpleSession::SessionThreadFunc, this)));
}
void SessionThreadFunc()
{
while (m_bRunning)
{
if (m_bStartSetCallBackRead)
{
m_socket.async_read_some(boost::asio::buffer(m_szRecvBuffer),
boost::bind(&CSimpleSession::ContinueRead, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
m_bStartSetCallBackRead = false;
}
boost::this_thread::sleep_for(boost::chrono::milliseconds(300));
}
m_bRunning = false;
}
boost::asio::ip::tcp::socket &GetSocket()
{
return m_socket;
}
bool GetCurThreadRunningStatus()
{
return m_bRunning;
}
void PrepareForNextRecv()
{
memset(m_szRecvBuffer, 0x00, 10240);
m_strMatch = "";
m_bStartSetCallBackRead = true;
}
private:
void ContinueRead(const boost::system::error_code &error, std::size_t bytes_transferred)
{
if (error)
{
m_bRunning = false;
return;
}
m_strMatch = m_szRecvBuffer;
int nIndexOfContentLength = m_strMatch.find("Content-Length:", 0);
int indexOfEnd = m_strMatch.find("\r\n\r\n", 0);
if (nIndexOfContentLength == -1)
{
m_bRunning = false;
return;
}
std::cout << m_strMatch << std::endl;
std::string strContextLen = m_strMatch.substr(nIndexOfContentLength + 15, indexOfEnd - nIndexOfContentLength - 15);
int nContextLen = atoi(strContextLen.c_str());
if (nContextLen < m_strMatch.length())
{
//handle
m_bRunning = false;
return;
}
m_socket.async_read_some(boost::asio::buffer((m_szRecvBuffer)),
boost::bind(&CSimpleSession::ContinueRead, shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
private:
boost::asio::ip::tcp::socket m_socket;
char m_szRecvBuffer[10240];
std::string m_strMatch;
bool m_bStartSetCallBackRead;
bool m_bRunning;
boost::shared_ptr
};
typedef boost::shared_ptr
class CSimpleServer
{
public:
CSimpleServer(boost::asio::io_service &io_service, boost::asio::ip::tcp::endpoint &endpoint)
:m_ioService(io_service), m_acceptor(io_service, endpoint)
{
CPtrSession newSession(new CSimpleSession(io_service));
m_vecSession.push_back(newSession);
m_acceptor.async_accept(newSession->GetSocket(),
boost::bind(&CSimpleServer::HandleAccept,
this,
newSession,
boost::asio::placeholders::error));
}
void HandleAccept(CPtrSession newSession, const boost::system::error_code &error)
{
if (error)return;
//如果Start函數(shù)進(jìn)行了阻塞,只有處理完當(dāng)前的連接,才會(huì)進(jìn)行下一步處理連接
newSession->StartThread();
ClearHasEndConnection();
CPtrSession createNewSession(new CSimpleSession(m_ioService));
//當(dāng)前保存了會(huì)話連接,直到連接被釋放,而不是由于createNewSession跳出循環(huán),導(dǎo)致套接字異常
m_vecSession.push_back(createNewSession);
m_acceptor.async_accept(createNewSession->GetSocket(),
boost::bind(&CSimpleServer::HandleAccept,
this,
createNewSession,
boost::asio::placeholders::error));
}
//定時(shí)清除結(jié)束的連接
void ClearHasEndConnection()
{
std::vector
iter = m_vecSession.begin();
std::size_t count = m_vecSession.size();
std::cout << "session count:" << count << std::endl;
while (iter != m_vecSession.end())
{
if (!(*iter)->GetCurThreadRunningStatus())
{
iter->reset();
m_vecSession.erase(iter);
break;
}
iter++;
}
}
void run()
{
m_ioService.run();
}
private:
boost::asio::io_service &m_ioService;
std::vector
boost::asio::ip::tcp::acceptor m_acceptor;
};
void StartListenThread();
int StartListenService();
}
#endif
service.cpp
#include
#include "service.h"
void qpid::StartListenThread()
{
boost::asio::io_service ioService;
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::address::from_string("192.168.0.34"), 7003);
qpid::CSimpleServer s(ioService, endpoint);
s.run();
}
int qpid::StartListenService()
{
boost::thread serviceThread(&StartListenThread);
serviceThread.detach();
return 0;
}
說明
這里跟之前的asio 異步服務(wù)器是有很大的區(qū)別
1)套接字可以不用關(guān)閉,其次也不需要擔(dān)心線程的返回問題
2)不再需要保存請(qǐng)求處理的實(shí)例,自然也就沒有管理所有實(shí)例的必要性,至于什么時(shí)候退出,服務(wù)器的接收線程不需要考慮
錯(cuò)誤提醒:
在實(shí)際的應(yīng)用環(huán)境中,在讀數(shù)據(jù)m_socket.read_some(boost::asio::buffer(szRecvBuf), ec)的時(shí)候,會(huì)產(chǎn)生套接字錯(cuò)誤,返回10035,代表含義是在一個(gè)非套接字上嘗試了一個(gè)操作。
出現(xiàn)原因分析:
當(dāng)線程分離的時(shí)候,accept函數(shù)開始等待下一個(gè)請(qǐng)求,createNewSession由于是智能指針,跳出了函數(shù),開始調(diào)用析構(gòu)函數(shù)進(jìn)行對(duì)象的清理,這個(gè)時(shí)候m_socket已經(jīng)被清理掉了,很多類的成員變量已經(jīng)無法被使用了,m_vecThreadInstance.push_back(createNewSession);卻能夠保存對(duì)象的實(shí)例,不至于馬上調(diào)用析構(gòu)函數(shù),如果調(diào)用該函數(shù)的話,就必須自己定時(shí)清理已經(jīng)服務(wù)完畢的對(duì)象
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢(shì),專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場(chǎng)景需求。