WIP: rewrite the network stack to use libevent
This commit is contained in:
@ -11,27 +11,20 @@ include_directories(${PROJECT_BINARY_DIR}/src/)
|
||||
|
||||
include_directories(src/)
|
||||
|
||||
# Find dependencies
|
||||
find_package(libmysql REQUIRED)
|
||||
find_package(Boost REQUIRED)
|
||||
|
||||
add_executable(${PROJECT_NAME} ${sources})
|
||||
|
||||
# Link dependencies if found
|
||||
if (libmysql_FOUND)
|
||||
target_link_libraries (${PROJECT_NAME} ${MYSQL_LIBRARIES})
|
||||
endif (libmysql_FOUND)
|
||||
# Find dependencies
|
||||
find_package(Boost REQUIRED)
|
||||
include_directories(${Boost_INCLUDE_DIRS})
|
||||
target_link_libraries (${PROJECT_NAME} PRIVATE ${Boost_LIBRARIES} ${Boost_SYSTEM_LIBRARY})
|
||||
|
||||
if (Boost_FOUND)
|
||||
include_directories(${Boost_INCLUDE_DIRS})
|
||||
target_link_libraries (${PROJECT_NAME} ${Boost_LIBRARIES} ${Boost_SYSTEM_LIBRARY})
|
||||
endif (Boost_FOUND)
|
||||
# Pthreads
|
||||
set(THREADS_PREFER_PTHREAD_FLAG ON)
|
||||
find_package(Threads REQUIRED)
|
||||
target_link_libraries(${PROJECT_NAME} PRIVATE Threads::Threads)
|
||||
|
||||
if (${CMAKE_SYSTEM_NAME} STREQUAL "FreeBSD")
|
||||
# Pthreads
|
||||
set(THREADS_PREFER_PTHREAD_FLAG ON)
|
||||
find_package (Threads REQUIRED)
|
||||
target_link_libraries (${PROJECT_NAME} Threads::Threads)
|
||||
endif (${CMAKE_SYSTEM_NAME} STREQUAL "FreeBSD")
|
||||
# Libevent
|
||||
find_package(Libevent CONFIG REQUIRED)
|
||||
target_link_libraries(${PROJECT_NAME} PRIVATE libevent::core libevent::extra libevent::pthreads)
|
||||
|
||||
target_link_libraries(${PROJECT_NAME} libpoly libsql libthecore)
|
||||
target_link_libraries(${PROJECT_NAME} PRIVATE libpoly libsql libthecore)
|
||||
|
@ -1,4 +1,3 @@
|
||||
|
||||
#include "stdafx.h"
|
||||
|
||||
#include <common/billing.h>
|
||||
@ -41,6 +40,71 @@ CPacketInfo g_item_info;
|
||||
int g_item_count = 0;
|
||||
int g_query_count[2];
|
||||
|
||||
static void AcceptConnection(
|
||||
evconnlistener* listener,
|
||||
evutil_socket_t fd,
|
||||
sockaddr* address,
|
||||
int socklen,
|
||||
void* ctx)
|
||||
{
|
||||
// We got a new connection! We have to create a new peer.
|
||||
auto* clientManager = (CClientManager*) ctx;
|
||||
|
||||
// Create a new buffer for the peer
|
||||
event_base *base = evconnlistener_get_base(listener);
|
||||
bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
|
||||
|
||||
// Create a new peer
|
||||
CPeer* peer = clientManager->AddPeer(bev, address);
|
||||
|
||||
// Set the event handlers for this peer
|
||||
bufferevent_setcb(bev, ReadHandler, WriteHandler, EventHandler, peer);
|
||||
|
||||
// Enable the events
|
||||
bufferevent_enable(bev, EV_READ|EV_WRITE);
|
||||
}
|
||||
|
||||
static void AcceptError(evconnlistener *listener, void *ctx) {
|
||||
struct event_base *base = evconnlistener_get_base(listener);
|
||||
int err = EVUTIL_SOCKET_ERROR();
|
||||
fprintf(stderr, "Got an error %d (%s) on the listener. "
|
||||
"Shutting down.\n", err, evutil_socket_error_to_string(err));
|
||||
|
||||
event_base_loopexit(base, NULL);
|
||||
}
|
||||
|
||||
static void ReadHandler(bufferevent *bev, void *ctx) {
|
||||
auto* peer = (CPeer*) ctx;
|
||||
|
||||
if (peer == CClientManager::Instance().GetAuthPeer())
|
||||
if (g_log)
|
||||
sys_log(0, "AUTH_PEER_READ: size %d", peer->GetRecvLength());
|
||||
|
||||
CClientManager::Instance().ProcessPackets(peer);
|
||||
}
|
||||
|
||||
static void WriteHandler(bufferevent *bev, void *ctx) {
|
||||
auto* peer = (CPeer*) ctx;
|
||||
|
||||
if (peer == CClientManager::Instance().GetAuthPeer())
|
||||
if (g_log)
|
||||
sys_log(0, "AUTH_PEER_WRITE: size %d", peer->GetSendLength());
|
||||
}
|
||||
|
||||
static void EventHandler(bufferevent *bev, short events, void *ctx) {
|
||||
auto* peer = (CPeer*) ctx;
|
||||
|
||||
if (events & BEV_EVENT_ERROR)
|
||||
sys_err("PEER libevent error, handle: %d", peer->GetHandle());
|
||||
|
||||
if (events & BEV_EVENT_EOF)
|
||||
sys_log(0, "PEER disconnected: handle %d", peer->GetHandle());
|
||||
|
||||
// Either the socket was closed or an error occured, therefore we can disconnect this peer.
|
||||
if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR))
|
||||
CClientManager::Instance().RemovePeer(peer);
|
||||
}
|
||||
|
||||
CClientManager::CClientManager() :
|
||||
m_pkAuthPeer(NULL),
|
||||
m_iPlayerIDStart(0),
|
||||
@ -51,7 +115,7 @@ CClientManager::CClientManager() :
|
||||
m_pShopTable(NULL),
|
||||
m_iRefineTableSize(0),
|
||||
m_pRefineTable(NULL),
|
||||
m_bShutdowned(FALSE),
|
||||
m_bShutdowned(false),
|
||||
m_iCacheFlushCount(0),
|
||||
m_iCacheFlushCountLimit(200)
|
||||
{
|
||||
@ -75,16 +139,22 @@ void CClientManager::SetPlayerIDStart(int iIDStart)
|
||||
void CClientManager::Destroy()
|
||||
{
|
||||
m_mChannelStatus.clear();
|
||||
for (itertype(m_peerList) i = m_peerList.begin(); i != m_peerList.end(); ++i)
|
||||
(*i)->Destroy();
|
||||
|
||||
// Close the peer connections and empty the peer list
|
||||
for (auto &peer: m_peerList)
|
||||
peer->Destroy();
|
||||
m_peerList.clear();
|
||||
|
||||
if (m_fdAccept > 0)
|
||||
{
|
||||
socket_close(m_fdAccept);
|
||||
m_fdAccept = -1;
|
||||
}
|
||||
// Free the libevent resources
|
||||
if (m_listener) {
|
||||
evconnlistener_free(m_listener);
|
||||
m_listener = nullptr;
|
||||
}
|
||||
|
||||
if (m_base) {
|
||||
event_base_free(m_base);
|
||||
m_base = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
bool CClientManager::Initialize()
|
||||
@ -124,16 +194,33 @@ bool CClientManager::Initialize()
|
||||
if (!CConfig::instance().GetValue("BIND_IP", szBindIP, 128))
|
||||
strncpy(szBindIP, "0", sizeof(szBindIP));
|
||||
|
||||
m_fdAccept = socket_tcp_bind(szBindIP, tmpValue);
|
||||
// Create a new libevent base and listen for new connections
|
||||
event_enable_debug_mode();
|
||||
m_base = event_base_new();
|
||||
if (!m_base) {
|
||||
sys_err("Libevent base initialization FAILED!");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (m_fdAccept < 0)
|
||||
{
|
||||
perror("socket");
|
||||
return false;
|
||||
}
|
||||
sockaddr_in sin = {};
|
||||
|
||||
sys_log(0, "ACCEPT_HANDLE: %u", m_fdAccept);
|
||||
fdwatch_add_fd(m_fdWatcher, m_fdAccept, NULL, FDW_READ, false);
|
||||
/* This is an INET address */
|
||||
sin.sin_family = AF_INET;
|
||||
|
||||
sin.sin_addr.s_addr = inet_addr(szBindIP);
|
||||
sin.sin_port = htons(tmpValue);
|
||||
|
||||
m_listener = evconnlistener_new_bind(
|
||||
m_base,
|
||||
AcceptConnection, this,
|
||||
LEV_OPT_CLOSE_ON_FREE|LEV_OPT_REUSEABLE, -1,
|
||||
(const sockaddr*)&sin, sizeof(sin)
|
||||
);
|
||||
if (!m_listener) {
|
||||
sys_err("Libevent listener initialization FAILED!");
|
||||
return false;
|
||||
}
|
||||
evconnlistener_set_error_cb(m_listener, AcceptError);
|
||||
|
||||
if (!CConfig::instance().GetValue("BACKUP_LIMIT_SEC", &tmpValue))
|
||||
tmpValue = 600;
|
||||
@ -242,7 +329,7 @@ void CClientManager::MainLoop()
|
||||
|
||||
void CClientManager::Quit()
|
||||
{
|
||||
m_bShutdowned = TRUE;
|
||||
m_bShutdowned = true;
|
||||
}
|
||||
|
||||
void CClientManager::QUERY_BOOT(CPeer* peer, TPacketGDBoot * p)
|
||||
@ -2745,14 +2832,17 @@ void CClientManager::ProcessPackets(CPeer * peer)
|
||||
peer->RecvEnd(i);
|
||||
}
|
||||
|
||||
void CClientManager::AddPeer(socket_t fd)
|
||||
CPeer * CClientManager::AddPeer(bufferevent* bufev, sockaddr* addr)
|
||||
{
|
||||
CPeer * pPeer = new CPeer;
|
||||
auto* pPeer = new CPeer;
|
||||
|
||||
if (pPeer->Accept(fd))
|
||||
m_peerList.push_front(pPeer);
|
||||
else
|
||||
delete pPeer;
|
||||
if (!pPeer->Accept(bufev, addr)) {
|
||||
delete pPeer;
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
m_peerList.push_front(pPeer);
|
||||
return pPeer;
|
||||
}
|
||||
|
||||
void CClientManager::RemovePeer(CPeer * pPeer)
|
||||
@ -2807,6 +2897,11 @@ CPeer * CClientManager::GetPeer(IDENT ident)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
CPeer * CClientManager::GetAuthPeer()
|
||||
{
|
||||
return m_pkAuthPeer;
|
||||
}
|
||||
|
||||
CPeer * CClientManager::GetAnyPeer()
|
||||
{
|
||||
if (m_peerList.empty())
|
||||
@ -3132,83 +3227,8 @@ int CClientManager::Process()
|
||||
}
|
||||
}
|
||||
|
||||
int num_events = fdwatch(m_fdWatcher, 0);
|
||||
int idx;
|
||||
CPeer * peer;
|
||||
|
||||
for (idx = 0; idx < num_events; ++idx) // <20><>Dz
|
||||
{
|
||||
peer = (CPeer *) fdwatch_get_client_data(m_fdWatcher, idx);
|
||||
|
||||
if (!peer)
|
||||
{
|
||||
if (fdwatch_check_event(m_fdWatcher, m_fdAccept, idx) == FDW_READ)
|
||||
{
|
||||
AddPeer(m_fdAccept);
|
||||
fdwatch_clear_event(m_fdWatcher, m_fdAccept, idx);
|
||||
}
|
||||
else
|
||||
{
|
||||
sys_err("FDWATCH: peer null in event: ident %d", fdwatch_get_ident(m_fdWatcher, idx));
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (fdwatch_check_event(m_fdWatcher, peer->GetFd(), idx))
|
||||
{
|
||||
case FDW_READ:
|
||||
if (peer->Recv() < 0)
|
||||
{
|
||||
sys_err("Recv failed");
|
||||
RemovePeer(peer);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (peer == m_pkAuthPeer)
|
||||
if (g_log)
|
||||
sys_log(0, "AUTH_PEER_READ: size %d", peer->GetRecvLength());
|
||||
|
||||
ProcessPackets(peer);
|
||||
}
|
||||
break;
|
||||
|
||||
case FDW_WRITE:
|
||||
if (peer == m_pkAuthPeer)
|
||||
if (g_log)
|
||||
sys_log(0, "AUTH_PEER_WRITE: size %d", peer->GetSendLength());
|
||||
|
||||
if (peer->Send() < 0)
|
||||
{
|
||||
sys_err("Send failed");
|
||||
RemovePeer(peer);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case FDW_EOF:
|
||||
RemovePeer(peer);
|
||||
break;
|
||||
|
||||
default:
|
||||
sys_err("fdwatch_check_fd returned unknown result");
|
||||
RemovePeer(peer);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef __WIN32__
|
||||
if (_kbhit()) {
|
||||
int c = _getch();
|
||||
switch (c) {
|
||||
case 0x1b: // Esc
|
||||
return 0; // shutdown
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
// Process network events
|
||||
event_base_loop(m_base, EVLOOP_NONBLOCK);
|
||||
|
||||
VCardProcess();
|
||||
return 1;
|
||||
|
@ -1,10 +1,12 @@
|
||||
// vim:ts=8 sw=4
|
||||
#ifndef __INC_CLIENTMANAGER_H__
|
||||
#define __INC_CLIENTMANAGER_H__
|
||||
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
|
||||
#include <event2/event.h>
|
||||
#include <event2/listener.h>
|
||||
|
||||
#include <common/stl.h>
|
||||
#include <common/building.h>
|
||||
#include <common/auction_table.h>
|
||||
@ -28,7 +30,13 @@ class CPacketInfo
|
||||
|
||||
size_t CreatePlayerSaveQuery(char * pszQuery, size_t querySize, TPlayerTable * pkTab);
|
||||
|
||||
class CClientManager : public CNetBase, public singleton<CClientManager>
|
||||
static void AcceptConnection(evconnlistener *listener, evutil_socket_t fd, sockaddr *address, int socklen, void *ctx);
|
||||
static void AcceptError(evconnlistener *listener, void *ctx);
|
||||
static void ReadHandler(bufferevent *bev, void *ctx);
|
||||
static void WriteHandler(bufferevent *bev, void *ctx);
|
||||
static void EventHandler(bufferevent *bev, short events, void *ctx);
|
||||
|
||||
class CClientManager : public singleton<CClientManager>
|
||||
{
|
||||
public:
|
||||
typedef std::list<CPeer *> TPeerList;
|
||||
@ -165,6 +173,13 @@ class CClientManager : public CNetBase, public singleton<CClientManager>
|
||||
char* GetCommand(char* str); //<2F><><EFBFBD>ϼ<EFBFBD><CFBC><EFBFBD><EFBFBD><EFBFBD><EFBFBD>ɿ<EFBFBD><C9BF><EFBFBD> <20><><EFBFBD>ɾ<EFBFBD> <20><><EFBFBD><EFBFBD> <20>Լ<EFBFBD>
|
||||
void ItemAward(CPeer * peer, char* login); //<2F><><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD>
|
||||
|
||||
CPeer * AddPeer(bufferevent* bufev, sockaddr* addr);
|
||||
void RemovePeer(CPeer * pPeer);
|
||||
CPeer * GetPeer(IDENT ident);
|
||||
CPeer * GetAuthPeer();
|
||||
|
||||
void ProcessPackets(CPeer * peer);
|
||||
|
||||
protected:
|
||||
void Destroy();
|
||||
|
||||
@ -190,17 +205,11 @@ class CClientManager : public CNetBase, public singleton<CClientManager>
|
||||
bool MirrorMobTableIntoDB();
|
||||
bool MirrorItemTableIntoDB();
|
||||
|
||||
void AddPeer(socket_t fd);
|
||||
void RemovePeer(CPeer * pPeer);
|
||||
CPeer * GetPeer(IDENT ident);
|
||||
|
||||
int AnalyzeQueryResult(SQLMsg * msg);
|
||||
int AnalyzeErrorMsg(CPeer * peer, SQLMsg * msg);
|
||||
|
||||
int Process();
|
||||
|
||||
void ProcessPackets(CPeer * peer);
|
||||
|
||||
CLoginData * GetLoginData(DWORD dwKey);
|
||||
CLoginData * GetLoginDataByLogin(const char * c_pszLogin);
|
||||
CLoginData * GetLoginDataByAID(DWORD dwAID);
|
||||
@ -386,7 +395,8 @@ class CClientManager : public CNetBase, public singleton<CClientManager>
|
||||
|
||||
private:
|
||||
int m_looping;
|
||||
socket_t m_fdAccept; // <20><><EFBFBD><EFBFBD> <20><EFBFBD> <20><><EFBFBD><EFBFBD>
|
||||
event_base * m_base;
|
||||
evconnlistener * m_listener;
|
||||
TPeerList m_peerList;
|
||||
|
||||
CPeer * m_pkAuthPeer;
|
||||
|
@ -5,8 +5,6 @@
|
||||
// <20><><EFBFBD><EFBFBD> Ŀ<>ؼ<EFBFBD> Ŭ<><C5AC><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD>... <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD>ؼ<EFBFBD> <20><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD> <20><><EFBFBD><EFBFBD> <20>ƿ<DEBE><C6BF><EFBFBD>
|
||||
// <20><><EFBFBD><EFBFBD> <20>ϵ<EFBFBD><CFB5><EFBFBD> ó<><C3B3><EFBFBD>Ѵ<EFBFBD>.
|
||||
// <20>ڵ<EFBFBD> by <20><><EFBFBD><EFBFBD> <20>ķα<CEB1><D7B7><EFBFBD> <20>Ƴ<EFBFBD><C6B3><EFBFBD>~ = _=)b
|
||||
#include <mysql/mysql.h>
|
||||
|
||||
#include <libsql/include/CAsyncSQL.h>
|
||||
|
||||
#define SQL_SAFE_LENGTH(size) (size * 2 + 1)
|
||||
|
@ -70,7 +70,6 @@ int main()
|
||||
#endif
|
||||
|
||||
CConfig Config;
|
||||
CNetPoller poller;
|
||||
CDBManager DBManager;
|
||||
CClientManager ClientManager;
|
||||
PlayerHB player_hb;
|
||||
@ -369,12 +368,6 @@ int Start()
|
||||
sys_err("SQL_HOTBACKUP not configured");
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!CNetPoller::instance().Create())
|
||||
{
|
||||
sys_err("Cannot create network poller");
|
||||
return false;
|
||||
}
|
||||
|
||||
sys_log(0, "ClientManager initialization.. ");
|
||||
|
||||
|
@ -1,54 +0,0 @@
|
||||
#include "stdafx.h"
|
||||
#include "NetBase.h"
|
||||
#include "Config.h"
|
||||
#include "ClientManager.h"
|
||||
|
||||
LPFDWATCH CNetBase::m_fdWatcher = NULL;
|
||||
|
||||
CNetBase::CNetBase()
|
||||
{
|
||||
}
|
||||
|
||||
CNetBase::~CNetBase()
|
||||
{
|
||||
}
|
||||
|
||||
CNetPoller::CNetPoller()
|
||||
{
|
||||
}
|
||||
|
||||
CNetPoller::~CNetPoller()
|
||||
{
|
||||
Destroy();
|
||||
}
|
||||
|
||||
bool CNetPoller::Create()
|
||||
{
|
||||
sys_log(1, "NetPoller::Create()");
|
||||
|
||||
if (m_fdWatcher)
|
||||
return true;
|
||||
|
||||
m_fdWatcher = fdwatch_new(512);
|
||||
|
||||
if (!m_fdWatcher)
|
||||
{
|
||||
Destroy();
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void CNetPoller::Destroy()
|
||||
{
|
||||
sys_log(1, "NetPoller::Destroy()");
|
||||
|
||||
if (m_fdWatcher)
|
||||
{
|
||||
fdwatch_delete(m_fdWatcher);
|
||||
m_fdWatcher = NULL;
|
||||
}
|
||||
|
||||
thecore_destroy();
|
||||
}
|
@ -1,25 +0,0 @@
|
||||
// vim: ts=8 sw=4
|
||||
#ifndef __INC_NETWORKBASE_H__
|
||||
#define __INC_NETWORKBASE_H__
|
||||
|
||||
class CNetBase
|
||||
{
|
||||
public:
|
||||
CNetBase();
|
||||
virtual ~CNetBase();
|
||||
|
||||
protected:
|
||||
static LPFDWATCH m_fdWatcher;
|
||||
};
|
||||
|
||||
class CNetPoller : public CNetBase, public singleton<CNetPoller>
|
||||
{
|
||||
public:
|
||||
CNetPoller();
|
||||
virtual ~CNetPoller();
|
||||
|
||||
bool Create();
|
||||
void Destroy();
|
||||
};
|
||||
|
||||
#endif
|
@ -29,13 +29,7 @@ void CPeer::OnAccept()
|
||||
static DWORD current_handle = 0;
|
||||
m_dwHandle = ++current_handle;
|
||||
|
||||
sys_log(0, "Connection accepted. (host: %s handle: %u fd: %d)", m_host, m_dwHandle, m_fd);
|
||||
}
|
||||
|
||||
void CPeer::OnConnect()
|
||||
{
|
||||
sys_log(0, "Connection established. (host: %s handle: %u fd: %d)", m_host, m_dwHandle, m_fd);
|
||||
m_state = STATE_PLAYING;
|
||||
sys_log(0, "Connection accepted. (host: %s handle: %u)", m_host, m_dwHandle);
|
||||
}
|
||||
|
||||
void CPeer::OnClose()
|
||||
@ -72,7 +66,7 @@ bool CPeer::PeekPacket(int & iBytesProceed, BYTE & header, DWORD & dwHandle, DWO
|
||||
if (GetRecvLength() < iBytesProceed + 9)
|
||||
return false;
|
||||
|
||||
const char * buf = (const char *) GetRecvBuffer();
|
||||
const char * buf = (const char *) GetRecvBuffer(iBytesProceed + 9);
|
||||
buf += iBytesProceed;
|
||||
|
||||
header = *(buf++);
|
||||
@ -114,14 +108,6 @@ void CPeer::EncodeReturn(BYTE header, DWORD dwHandle)
|
||||
EncodeHeader(header, dwHandle, 0);
|
||||
}
|
||||
|
||||
int CPeer::Send()
|
||||
{
|
||||
if (m_state == STATE_CLOSE)
|
||||
return -1;
|
||||
|
||||
return (CPeerBase::Send());
|
||||
}
|
||||
|
||||
void CPeer::SetP2PPort(WORD wPort)
|
||||
{
|
||||
m_wP2PPort = wPort;
|
||||
|
@ -9,7 +9,6 @@ class CPeer : public CPeerBase
|
||||
protected:
|
||||
virtual void OnAccept();
|
||||
virtual void OnClose();
|
||||
virtual void OnConnect();
|
||||
|
||||
public:
|
||||
#pragma pack(1)
|
||||
@ -34,7 +33,6 @@ class CPeer : public CPeerBase
|
||||
void EncodeReturn(BYTE header, DWORD dwHandle);
|
||||
|
||||
void ProcessInput();
|
||||
int Send();
|
||||
|
||||
DWORD GetHandle();
|
||||
DWORD GetUserCount();
|
||||
|
@ -1,7 +1,9 @@
|
||||
#include "stdafx.h"
|
||||
#include "PeerBase.h"
|
||||
|
||||
CPeerBase::CPeerBase() : m_fd(INVALID_SOCKET), m_BytesRemain(0), m_outBuffer(NULL), m_inBuffer(NULL)
|
||||
#include <event2/buffer.h>
|
||||
|
||||
CPeerBase::CPeerBase() : m_bufferevent(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
@ -10,83 +12,53 @@ CPeerBase::~CPeerBase()
|
||||
Destroy();
|
||||
}
|
||||
|
||||
void CPeerBase::Disconnect()
|
||||
{
|
||||
if (m_fd != INVALID_SOCKET)
|
||||
{
|
||||
fdwatch_del_fd(m_fdWatcher, m_fd);
|
||||
|
||||
socket_close(m_fd);
|
||||
m_fd = INVALID_SOCKET;
|
||||
}
|
||||
}
|
||||
|
||||
void CPeerBase::Destroy()
|
||||
{
|
||||
Disconnect();
|
||||
|
||||
if (m_outBuffer)
|
||||
{
|
||||
buffer_delete(m_outBuffer);
|
||||
m_outBuffer = NULL;
|
||||
}
|
||||
|
||||
if (m_inBuffer)
|
||||
{
|
||||
buffer_delete(m_inBuffer);
|
||||
m_inBuffer = NULL;
|
||||
}
|
||||
if (m_bufferevent) {
|
||||
bufferevent_free(m_bufferevent);
|
||||
m_bufferevent = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
bool CPeerBase::Accept(socket_t fd_accept)
|
||||
bool CPeerBase::Accept(bufferevent* bufev, sockaddr* addr)
|
||||
{
|
||||
struct sockaddr_in peer;
|
||||
if (!bufev) {
|
||||
sys_err("Cannot accept empty bufferevent!");
|
||||
return false;
|
||||
}
|
||||
|
||||
if ((m_fd = socket_accept(fd_accept, &peer)) == INVALID_SOCKET)
|
||||
{
|
||||
Destroy();
|
||||
return false;
|
||||
}
|
||||
if (m_bufferevent != nullptr) {
|
||||
sys_err("Peer is already initialized");
|
||||
return false;
|
||||
}
|
||||
|
||||
//socket_block(m_fd);
|
||||
socket_sndbuf(m_fd, 233016);
|
||||
socket_rcvbuf(m_fd, 233016);
|
||||
// Save the bufferevent
|
||||
m_bufferevent = bufev;
|
||||
|
||||
strncpy(m_host, inet_ntoa(peer.sin_addr), sizeof(m_host));
|
||||
m_outBuffer = buffer_new(DEFAULT_PACKET_BUFFER_SIZE);
|
||||
m_inBuffer = buffer_new(MAX_INPUT_LEN);
|
||||
// Get the address of the conected peer
|
||||
sockaddr_in* peer;
|
||||
sockaddr_in6* peer6;
|
||||
|
||||
if (!m_outBuffer || !m_inBuffer)
|
||||
{
|
||||
Destroy();
|
||||
return false;
|
||||
}
|
||||
switch (addr->sa_family) {
|
||||
case AF_INET:
|
||||
peer = (sockaddr_in*) addr;
|
||||
inet_ntop(AF_INET, &(peer->sin_addr), m_host, INET_ADDRSTRLEN);
|
||||
break;
|
||||
|
||||
fdwatch_add_fd(m_fdWatcher, m_fd, this, FDW_READ, false);
|
||||
case AF_INET6:
|
||||
peer6 = (sockaddr_in6*) addr;
|
||||
inet_ntop(AF_INET, &(peer6->sin6_addr), m_host, INET6_ADDRSTRLEN);
|
||||
break;
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
// Trigger the OnAccept event
|
||||
OnAccept();
|
||||
sys_log(0, "ACCEPT FROM %s", inet_ntoa(peer.sin_addr));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CPeerBase::Connect(const char* host, WORD port)
|
||||
{
|
||||
strncpy(m_host, host, sizeof(m_host));
|
||||
sys_log(0, "ACCEPT FROM %s", m_host);
|
||||
|
||||
if ((m_fd = socket_connect(host, port)) == INVALID_SOCKET)
|
||||
return false;
|
||||
|
||||
m_outBuffer = buffer_new(DEFAULT_PACKET_BUFFER_SIZE);
|
||||
|
||||
if (!m_outBuffer)
|
||||
{
|
||||
Destroy();
|
||||
return false;
|
||||
}
|
||||
|
||||
fdwatch_add_fd(m_fdWatcher, m_fd, this, FDW_READ, false);
|
||||
|
||||
OnConnect();
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -97,118 +69,77 @@ void CPeerBase::Close()
|
||||
|
||||
void CPeerBase::EncodeBYTE(BYTE b)
|
||||
{
|
||||
if (!m_outBuffer)
|
||||
{
|
||||
sys_err("Not ready to write");
|
||||
return;
|
||||
}
|
||||
|
||||
buffer_write(m_outBuffer, &b, 1);
|
||||
fdwatch_add_fd(m_fdWatcher, m_fd, this, FDW_WRITE, true);
|
||||
Encode(&b, sizeof(b));
|
||||
}
|
||||
|
||||
void CPeerBase::EncodeWORD(WORD w)
|
||||
{
|
||||
if (!m_outBuffer)
|
||||
{
|
||||
sys_err("Not ready to write");
|
||||
return;
|
||||
}
|
||||
|
||||
buffer_write(m_outBuffer, &w, 2);
|
||||
fdwatch_add_fd(m_fdWatcher, m_fd, this, FDW_WRITE, true);
|
||||
Encode(&w, sizeof(w));
|
||||
}
|
||||
|
||||
void CPeerBase::EncodeDWORD(DWORD dw)
|
||||
{
|
||||
if (!m_outBuffer)
|
||||
Encode(&dw, sizeof(dw));
|
||||
}
|
||||
|
||||
void CPeerBase::Encode(const void* data, size_t size)
|
||||
{
|
||||
if (!m_bufferevent)
|
||||
{
|
||||
sys_err("Not ready to write");
|
||||
sys_err("Bufferevent not ready!");
|
||||
return;
|
||||
}
|
||||
|
||||
buffer_write(m_outBuffer, &dw, 4);
|
||||
fdwatch_add_fd(m_fdWatcher, m_fd, this, FDW_WRITE, true);
|
||||
if(bufferevent_write(m_bufferevent, data, size) != 0) {
|
||||
sys_err("Buffer write error!");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
void CPeerBase::Encode(const void* data, DWORD size)
|
||||
void CPeerBase::RecvEnd(size_t proceed_bytes)
|
||||
{
|
||||
if (!m_outBuffer)
|
||||
{
|
||||
sys_err("Not ready to write");
|
||||
return;
|
||||
}
|
||||
if (!m_bufferevent)
|
||||
{
|
||||
sys_err("Bufferevent not ready!");
|
||||
return;
|
||||
}
|
||||
|
||||
buffer_write(m_outBuffer, data, size);
|
||||
fdwatch_add_fd(m_fdWatcher, m_fd, this, FDW_WRITE, true);
|
||||
evbuffer *input = bufferevent_get_input(m_bufferevent);
|
||||
evbuffer_drain(input, proceed_bytes);
|
||||
}
|
||||
|
||||
int CPeerBase::Recv()
|
||||
size_t CPeerBase::GetRecvLength()
|
||||
{
|
||||
if (!m_inBuffer)
|
||||
{
|
||||
sys_err("input buffer nil");
|
||||
return -1;
|
||||
}
|
||||
if (!m_bufferevent)
|
||||
{
|
||||
sys_err("Bufferevent not ready!");
|
||||
return 0;
|
||||
}
|
||||
|
||||
buffer_adjust_size(m_inBuffer, MAX_INPUT_LEN >> 2);
|
||||
int bytes_to_read = buffer_has_space(m_inBuffer);
|
||||
ssize_t bytes_read = socket_read(m_fd, (char *) buffer_write_peek(m_inBuffer), bytes_to_read);
|
||||
|
||||
if (bytes_read < 0)
|
||||
{
|
||||
sys_err("socket_read failed %s", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
else if (bytes_read == 0)
|
||||
return 0;
|
||||
|
||||
buffer_write_proceed(m_inBuffer, bytes_read);
|
||||
m_BytesRemain = buffer_size(m_inBuffer);
|
||||
return 1;
|
||||
evbuffer *input = bufferevent_get_input(m_bufferevent);
|
||||
return evbuffer_get_length(input);
|
||||
}
|
||||
|
||||
void CPeerBase::RecvEnd(int proceed_bytes)
|
||||
const void * CPeerBase::GetRecvBuffer(ssize_t ensure_bytes)
|
||||
{
|
||||
buffer_read_proceed(m_inBuffer, proceed_bytes);
|
||||
m_BytesRemain = buffer_size(m_inBuffer);
|
||||
if (!m_bufferevent)
|
||||
{
|
||||
sys_err("Bufferevent not ready!");
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
evbuffer *input = bufferevent_get_input(m_bufferevent);
|
||||
return evbuffer_pullup(input, ensure_bytes);
|
||||
}
|
||||
|
||||
int CPeerBase::GetRecvLength()
|
||||
size_t CPeerBase::GetSendLength()
|
||||
{
|
||||
return m_BytesRemain;
|
||||
}
|
||||
|
||||
const void * CPeerBase::GetRecvBuffer()
|
||||
{
|
||||
return buffer_read_peek(m_inBuffer);
|
||||
}
|
||||
|
||||
int CPeerBase::GetSendLength()
|
||||
{
|
||||
return buffer_size(m_outBuffer);
|
||||
}
|
||||
|
||||
int CPeerBase::Send()
|
||||
{
|
||||
if (buffer_size(m_outBuffer) <= 0)
|
||||
return 0;
|
||||
|
||||
int iBufferLeft = fdwatch_get_buffer_size(m_fdWatcher, m_fd);
|
||||
int iBytesToWrite = MIN(iBufferLeft, buffer_size(m_outBuffer));
|
||||
|
||||
if (iBytesToWrite == 0)
|
||||
return 0;
|
||||
|
||||
int result = socket_write(m_fd, (const char *) buffer_read_peek(m_outBuffer), iBytesToWrite);
|
||||
|
||||
if (result == 0)
|
||||
{
|
||||
buffer_read_proceed(m_outBuffer, iBytesToWrite);
|
||||
|
||||
if (buffer_size(m_outBuffer) != 0)
|
||||
fdwatch_add_fd(m_fdWatcher, m_fd, this, FDW_WRITE, true);
|
||||
}
|
||||
|
||||
return (result);
|
||||
if (!m_bufferevent)
|
||||
{
|
||||
sys_err("Bufferevent not ready!");
|
||||
return 0;
|
||||
}
|
||||
|
||||
evbuffer *output = bufferevent_get_output(m_bufferevent);
|
||||
return evbuffer_get_length(output);
|
||||
}
|
||||
|
@ -1,61 +1,40 @@
|
||||
// vim: ts=8 sw=4
|
||||
#ifndef __INC_PEERBASE_H__
|
||||
#define __INC_PEERBASE_H__
|
||||
|
||||
#include "NetBase.h"
|
||||
#include <event2/bufferevent.h>
|
||||
|
||||
class CPeerBase : public CNetBase
|
||||
{
|
||||
public:
|
||||
enum
|
||||
{
|
||||
MAX_HOST_LENGTH = 30,
|
||||
MAX_INPUT_LEN = 1024 * 1024 * 2,
|
||||
DEFAULT_PACKET_BUFFER_SIZE = 1024 * 1024 * 2
|
||||
};
|
||||
|
||||
protected:
|
||||
class CPeerBase {
|
||||
protected:
|
||||
virtual void OnAccept() = 0;
|
||||
virtual void OnConnect() = 0;
|
||||
virtual void OnClose() = 0;
|
||||
|
||||
public:
|
||||
bool Accept(socket_t accept_fd);
|
||||
bool Connect(const char* host, WORD port);
|
||||
void Close();
|
||||
public:
|
||||
bool Accept(bufferevent* bufev, sockaddr* addr);
|
||||
void Close();
|
||||
|
||||
public:
|
||||
public:
|
||||
CPeerBase();
|
||||
virtual ~CPeerBase();
|
||||
|
||||
void Disconnect();
|
||||
void Destroy();
|
||||
void Destroy();
|
||||
|
||||
socket_t GetFd() { return m_fd; }
|
||||
bufferevent * GetBufferevent() { return m_bufferevent; }
|
||||
|
||||
void EncodeBYTE(BYTE b);
|
||||
void EncodeWORD(WORD w);
|
||||
void EncodeDWORD(DWORD dw);
|
||||
void Encode(const void* data, DWORD size);
|
||||
int Send();
|
||||
void EncodeBYTE(BYTE b);
|
||||
void EncodeWORD(WORD w);
|
||||
void EncodeDWORD(DWORD dw);
|
||||
void Encode(const void* data, size_t size);
|
||||
void RecvEnd(size_t proceed_bytes);
|
||||
size_t GetRecvLength();
|
||||
const void * GetRecvBuffer(ssize_t ensure_bytes);
|
||||
|
||||
int Recv();
|
||||
void RecvEnd(int proceed_bytes);
|
||||
int GetRecvLength();
|
||||
const void * GetRecvBuffer();
|
||||
|
||||
int GetSendLength();
|
||||
size_t GetSendLength();
|
||||
|
||||
const char * GetHost() { return m_host; }
|
||||
|
||||
protected:
|
||||
char m_host[MAX_HOST_LENGTH + 1];
|
||||
socket_t m_fd;
|
||||
|
||||
private:
|
||||
int m_BytesRemain;
|
||||
LPBUFFER m_outBuffer;
|
||||
LPBUFFER m_inBuffer;
|
||||
protected:
|
||||
char m_host[IP_ADDRESS_LENGTH + 1];
|
||||
bufferevent * m_bufferevent;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
Reference in New Issue
Block a user