change: Start updating ClientManager.

add: Callback AsyncQuery.
fix: Multiple missing `txn.commit()`.
This commit is contained in:
WildEgo 2025-06-09 17:01:28 +01:00
parent d25bf59c6f
commit 74e9f0e99d
9 changed files with 280 additions and 162 deletions

View File

@ -23,7 +23,8 @@ extern int g_iItemCacheFlushSeconds;
extern int g_test_server; extern int g_test_server;
extern std::string g_stLocale; extern std::string g_stLocale;
extern std::string g_stLocaleNameColumn; extern std::string g_stLocaleNameColumn;
bool CreateItemTableFromRes(MYSQL_RES *res, std::vector<TPlayerItem> *pVec, DWORD dwPID);
bool CreateItemTableFromRes(pqxx::result *res, std::vector<TPlayerItem> *pVec, DWORD dwPID);
DWORD g_dwUsageMax = 0; DWORD g_dwUsageMax = 0;
DWORD g_dwUsageAvg = 0; DWORD g_dwUsageAvg = 0;
@ -248,10 +249,6 @@ bool CClientManager::Initialize()
LoadEventFlag(); LoadEventFlag();
// database character-set을 강제로 맞춤
if (g_stLocale == "big5" || g_stLocale == "sjis")
CDBManager::instance().QueryLocaleSet();
return true; return true;
} }
@ -262,17 +259,18 @@ void CClientManager::MainLoop()
SPDLOG_DEBUG("ClientManager pointer is {}", (void *)this); SPDLOG_DEBUG("ClientManager pointer is {}", (void *)this);
// 메인루프 // 메인루프
while (!m_bShutdowned) // TODO Look at this
{ // while (!m_bShutdowned)
while ((tmp = CDBManager::instance().PopResult())) // {
{ // while ((tmp = CDBManager::instance().PopResult()))
AnalyzeQueryResult(tmp); // {
delete tmp; // AnalyzeQueryResult(tmp);
} // delete tmp;
// }
if (!Process()) // if (!Process())
break; // break;
} // }
// //
// 메인루프 종료처리 // 메인루프 종료처리
@ -527,24 +525,53 @@ void CClientManager::QUERY_QUEST_SAVE(CPeer *pkPeer, TQuestTable *pTable, DWORD
int iSize = dwLen / sizeof(TQuestTable); int iSize = dwLen / sizeof(TQuestTable);
char szQuery[1024]; // We move to prepared queries for speed and SAFETY
std::vector<TQuestTable> inserts;
std::vector<TQuestTable> deletes;
for (int i = 0; i < iSize; ++i, ++pTable) for (int i = 0; i < iSize; ++i, ++pTable)
{ {
if (pTable->lValue == 0) if (pTable->lValue == 0)
{ deletes.push_back(*pTable);
snprintf(szQuery, sizeof(szQuery),
"DELETE FROM quest%s WHERE dwPID=%d AND szName='%s' AND szState='%s'",
GetTablePostfix(), pTable->dwPID, pTable->szName, pTable->szState);
}
else else
inserts.push_back(*pTable);
}
auto pool = CDBManager::instance().GetConnectionPool();
auto conn = pool->acquire();
// Prepare statements
static bool prepared = false;
if (!prepared)
{
conn->prepare("delete_quest",
"DELETE FROM player.quest WHERE player_id = $1 AND name = $2 AND state = $3");
conn->prepare("upsert_quest",
"INSERT INTO player.quest (player_id, name, state, value) VALUES ($1, $2, $3, $4) ON CONFLICT (player_id, name, state) DO UPDATE SET value = EXCLUDED.value");
prepared = true;
}
try
{
pqxx::work txn{*conn};
for (const auto &row : deletes)
{ {
snprintf(szQuery, sizeof(szQuery), txn.exec_prepared("delete_quest", row.dwPID, row.szName, row.szState);
"REPLACE INTO quest%s (dwPID, szName, szState, lValue) VALUES(%d, '%s', '%s', %d)",
GetTablePostfix(), pTable->dwPID, pTable->szName, pTable->szState, pTable->lValue);
} }
CDBManager::instance().ReturnQuery(szQuery, QID_QUEST_SAVE, pkPeer->GetHandle(), NULL); for (const auto &row : inserts)
{
txn.exec_prepared("upsert_quest", row.dwPID, row.szName, row.szState, row.lValue);
}
txn.commit();
}
catch (const std::exception &e)
{
SPDLOG_ERROR("[CClientManager::QUERY_QUEST_SAVE] Query error: {}", e.what());
} }
} }
@ -557,26 +584,30 @@ void CClientManager::QUERY_SAFEBOX_LOAD(CPeer *pkPeer, DWORD dwHandle, TSafeboxL
pi->ip[0] = bMall ? 1 : 0; pi->ip[0] = bMall ? 1 : 0;
strlcpy(pi->login, packet->szLogin, sizeof(pi->login)); strlcpy(pi->login, packet->szLogin, sizeof(pi->login));
char szQuery[QUERY_MAX_LEN]; CDBManager::instance().AsyncQuery(
snprintf(szQuery, sizeof(szQuery), "SELECT account_id, size, password FROM player.safebox WHERE account_id = $1",
"SELECT account_id, size, password FROM safebox%s WHERE account_id=%u", pqxx::params(packet->dwID),
GetTablePostfix(), packet->dwID); [this, pkPeer, pi](const pqxx::result &result, const std::string &error)
{
if (!error.empty())
{
return;
}
SPDLOG_DEBUG("QUERY_RESULT: HEADER_GD_SAFEBOX_LOAD");
CClientManager::RESULT_SAFEBOX_LOAD(pkPeer, pi, result);
});
SPDLOG_TRACE("HEADER_GD_SAFEBOX_LOAD (handle: {} account.id {} is_mall {})", dwHandle, packet->dwID, bMall ? 1 : 0); SPDLOG_TRACE("HEADER_GD_SAFEBOX_LOAD (handle: {} account.id {} is_mall {})", dwHandle, packet->dwID, bMall ? 1 : 0);
CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_LOAD, pkPeer->GetHandle(), pi);
} }
void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg)
{ {
CQueryInfo *qi = (CQueryInfo *)msg->pvUserData;
ClientHandleInfo *pi = (ClientHandleInfo *)qi->pvData;
DWORD dwHandle = pi->dwHandle; DWORD dwHandle = pi->dwHandle;
// 여기에서 사용하는 account_index는 쿼리 순서를 말한다. // The account_index used here refers to the query order.
// 첫번째 패스워드 알아내기 위해 하는 쿼리가 0 // The first query to retrieve the password is 0.
// 두번째 실제 데이터를 얻어놓는 쿼리가 1 // The second query to fetch the actual data is 1.
if (pi->account_index == 0) if (pi->account_index == 0)
{ {
char szSafeboxPassword[SAFEBOX_PASSWORD_MAX_LEN + 1]; char szSafeboxPassword[SAFEBOX_PASSWORD_MAX_LEN + 1];
@ -585,9 +616,7 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
TSafeboxTable *pSafebox = new TSafeboxTable; TSafeboxTable *pSafebox = new TSafeboxTable;
memset(pSafebox, 0, sizeof(TSafeboxTable)); memset(pSafebox, 0, sizeof(TSafeboxTable));
SQLResult *res = msg->Get(); if (msg.size() == 0)
if (res->uiNumRows == 0)
{ {
if (strcmp("000000", szSafeboxPassword)) if (strcmp("000000", szSafeboxPassword))
{ {
@ -598,32 +627,38 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
} }
else else
{ {
MYSQL_ROW row = mysql_fetch_row(res->pSQLResult); pqxx::row row = msg[0];
// account_id, size, password
auto account_id = row[0].c_str();
auto size = row[1].c_str();
auto password = row[2].c_str();
// 비밀번호가 틀리면.. // If the password is incorrect...
if (((!row[2] || !*row[2]) && strcmp("000000", szSafeboxPassword)) || if (((!password || !*password) && strcmp("000000", szSafeboxPassword)) ||
((row[2] && *row[2]) && strcmp(row[2], szSafeboxPassword))) ((password && *password) && strcmp(password, szSafeboxPassword)))
{ {
pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_WRONG_PASSWORD, dwHandle, 0); pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_WRONG_PASSWORD, dwHandle, 0);
delete pi; delete pi;
return; return;
} }
if (!row[0]) if (!account_id)
pSafebox->dwID = 0; pSafebox->dwID = 0;
else else
str_to_number(pSafebox->dwID, row[0]); str_to_number(pSafebox->dwID, account_id);
if (!row[1]) if (!size)
pSafebox->bSize = 0; pSafebox->bSize = 0;
else else
str_to_number(pSafebox->bSize, row[1]); str_to_number(pSafebox->bSize, size);
/* /*
if (!row[3]) if (!row[3])
pSafebox->dwGold = 0; pSafebox->dwGold = 0;
else else
pSafebox->dwGold = atoi(row[3]); pSafebox->dwGold = atoi(row[3]);
*/ */
if (pi->ip[0] == 1) if (pi->ip[0] == 1)
{ {
pSafebox->bSize = 1; pSafebox->bSize = 1;
@ -637,27 +672,32 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
pSafebox->dwID = pi->account_id; pSafebox->dwID = pi->account_id;
pi->pSafebox = pSafebox; pi->pSafebox = pSafebox;
char szQuery[512];
snprintf(szQuery, sizeof(szQuery),
"SELECT id, window+0, pos, count, vnum, socket0, socket1, socket2, "
"attrtype0, attrvalue0, "
"attrtype1, attrvalue1, "
"attrtype2, attrvalue2, "
"attrtype3, attrvalue3, "
"attrtype4, attrvalue4, "
"attrtype5, attrvalue5, "
"attrtype6, attrvalue6 "
"FROM item%s WHERE owner_id=%d AND window='%s'",
GetTablePostfix(), pi->account_id, pi->ip[0] == 0 ? "SAFEBOX" : "MALL");
pi->account_index = 1; pi->account_index = 1;
CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_LOAD, pkPeer->GetHandle(), pi); CDBManager::instance().AsyncQuery(
"SELECT id, window_bits, position, count, vnum, socket_0, socket_1, socket_2, "
"attr_type_0, attr_value_0, "
"attr_type_1, attr_value_1, "
"attr_type_2, attr_value_2, "
"attr_type_3, attr_value_3, "
"attr_type_4, attr_value_4, "
"attr_type_5, attr_value_5, "
"attr_type_6, attr_value_6 "
"FROM player.item WHERE owner_id = $1 AND window = $2",
pqxx::params{pi->account_id, pi->ip[0] == 0 ? "SAFEBOX" : "MALL"},
[this, pkPeer, pi](const pqxx::result &result, const std::string &error)
{
if (!error.empty())
{
return;
}
SPDLOG_DEBUG("QUERY_RESULT: HEADER_GD_SAFEBOX_LOAD");
CClientManager::RESULT_SAFEBOX_LOAD(pkPeer, pi, result);
});
} }
else else
{ {
if (!pi->pSafebox) if (!pi->pSafebox)
{ {
SPDLOG_ERROR("null safebox pointer!"); SPDLOG_ERROR("null safebox pointer!");
@ -665,9 +705,9 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
return; return;
} }
// 쿼리에 에러가 있었으므로 응답할 경우 창고가 비어있는 것 처럼 // Since there was an error in the query, responding would make it look like the warehouse is empty,
// 보이기 때문에 창고가 아얘 안열리는게 나음 // so it's better not to open the warehouse at all.
if (!msg->Get()->pSQLResult) if (msg.size() == 0)
{ {
SPDLOG_ERROR("null safebox result"); SPDLOG_ERROR("null safebox result");
delete pi; delete pi;
@ -675,7 +715,7 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
} }
static std::vector<TPlayerItem> s_items; static std::vector<TPlayerItem> s_items;
CreateItemTableFromRes(msg->Get()->pSQLResult, &s_items, pi->account_id); CreateItemTableFromRes(&msg, &s_items, pi->account_id);
std::set<TItemAward *> *pSet = ItemAwardManager::instance().GetByLogin(pi->login); std::set<TItemAward *> *pSet = ItemAwardManager::instance().GetByLogin(pi->login);
@ -707,8 +747,6 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
typeof(pSet->begin()) it = pSet->begin(); typeof(pSet->begin()) it = pSet->begin();
char szQuery[512];
while (it != pSet->end()) while (it != pSet->end())
{ {
TItemAward *pItemAward = *(it++); TItemAward *pItemAward = *(it++);
@ -781,8 +819,8 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
case 72728: case 72728:
case 72729: case 72729:
case 72730: case 72730:
// 무시무시하지만 이전에 하던 걸 고치기는 무섭고... // It's terrifying, but fixing the old implementation is even scarier...
// 그래서 그냥 하드 코딩. 선물 상자용 자동물약 아이템들. // So just hardcoding it. Auto potion items for gift boxes.
case 76004: case 76004:
case 76005: case 76005:
case 76021: case 76021:
@ -844,37 +882,49 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
} }
} }
} }
snprintf(szQuery, sizeof(szQuery),
"INSERT INTO item%s (id, owner_id, window, pos, vnum, count, socket0, socket1, socket2) "
"VALUES(%u, %u, '%s', %d, %u, %u, %u, %u, %u)",
GetTablePostfix(),
GainItemID(),
pi->account_id,
pi->ip[0] == 0 ? "SAFEBOX" : "MALL",
iPos,
pItemAward->dwVnum, pItemAward->dwCount, pItemAward->dwSocket0, pItemAward->dwSocket1, dwSocket2);
} }
std::unique_ptr<SQLMsg> pmsg(CDBManager::instance().DirectQuery(szQuery)); auto pool = CDBManager::instance().GetConnectionPool();
SQLResult *pRes = pmsg->Get(); auto conn = pool->acquire();
SPDLOG_DEBUG("SAFEBOX Query : [{}]", szQuery);
if (pRes->uiAffectedRows == 0 || pRes->uiInsertID == 0 || pRes->uiAffectedRows == (uint32_t)-1) try
break; {
pqxx::work txn{*conn};
pqxx::result res = txn.exec_params(
"INSERT INTO player.item (id, owner_id, window, position, vnum, count, socket_0, socket_1, socket_2) "
"VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id",
pqxx::params{
GainItemID(),
pi->account_id,
pi->ip[0] == 0 ? "SAFEBOX" : "MALL",
iPos,
pItemAward->dwVnum, pItemAward->dwCount, pItemAward->dwSocket0, pItemAward->dwSocket1, dwSocket2});
item.id = pmsg->Get()->uiInsertID; SPDLOG_DEBUG("SAFEBOX Query");
item.window = pi->ip[0] == 0 ? SAFEBOX : MALL,
item.pos = iPos;
item.count = pItemAward->dwCount;
item.vnum = pItemAward->dwVnum;
item.alSockets[0] = pItemAward->dwSocket0;
item.alSockets[1] = pItemAward->dwSocket1;
item.alSockets[2] = dwSocket2;
s_items.push_back(item);
vec_dwFinishedAwardID.push_back(std::make_pair(pItemAward->dwID, item.id)); if (res.size() == 0)
grid.Put(iPos, 1, it->second->bSize); break;
item.id = res[0][0].as<DWORD>();
item.window = pi->ip[0] == 0 ? SAFEBOX : MALL,
item.pos = iPos;
item.count = pItemAward->dwCount;
item.vnum = pItemAward->dwVnum;
item.alSockets[0] = pItemAward->dwSocket0;
item.alSockets[1] = pItemAward->dwSocket1;
item.alSockets[2] = dwSocket2;
s_items.push_back(item);
vec_dwFinishedAwardID.push_back(std::make_pair(pItemAward->dwID, item.id));
grid.Put(iPos, 1, it->second->bSize);
txn.commit();
}
catch (const std::exception &e)
{
SPDLOG_ERROR("[CClientManager::RESULT_SAFEBOX_LOAD] Query error: {}", e.what());
continue;
}
} }
for (DWORD i = 0; i < vec_dwFinishedAwardID.size(); ++i) for (DWORD i = 0; i < vec_dwFinishedAwardID.size(); ++i)
@ -898,28 +948,30 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg)
void CClientManager::QUERY_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangeSizePacket *p) void CClientManager::QUERY_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangeSizePacket *p)
{ {
ClientHandleInfo *pi = new ClientHandleInfo(dwHandle); ClientHandleInfo *pi = new ClientHandleInfo(dwHandle);
pi->account_index = p->bSize; // account_index를 사이즈로 임시로 사용 pi->account_index = p->bSize; // Temporarily use account_index as size
char szQuery[QUERY_MAX_LEN]; CDBManager::instance().AsyncQuery(
p->bSize == 1 ? "INSERT INTO player.safebox (size, account_id) VALUES($1, $2) RETURNING id" : "UPDATE player.safebox SET size = $1 WHERE account_id = $2 RETURNING id",
pqxx::params{p->bSize, p->dwID},
[this, pkPeer, pi](const pqxx::result &result, const std::string &error)
{
if (!error.empty())
{
return;
}
if (p->bSize == 1) CClientManager::RESULT_SAFEBOX_CHANGE_SIZE(pkPeer, pi, result);
snprintf(szQuery, sizeof(szQuery), "INSERT INTO safebox%s (account_id, size) VALUES(%u, %u)", GetTablePostfix(), p->dwID, p->bSize); });
else
snprintf(szQuery, sizeof(szQuery), "UPDATE safebox%s SET size=%u WHERE account_id=%u", GetTablePostfix(), p->bSize, p->dwID);
CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_CHANGE_SIZE, pkPeer->GetHandle(), pi);
} }
void CClientManager::RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, SQLMsg *msg) void CClientManager::RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg)
{ {
CQueryInfo *qi = (CQueryInfo *)msg->pvUserData; DWORD dwHandle = pi->dwHandle;
ClientHandleInfo *p = (ClientHandleInfo *)qi->pvData; BYTE bSize = pi->account_index;
DWORD dwHandle = p->dwHandle;
BYTE bSize = p->account_index;
delete p; delete pi;
if (msg->Get()->uiNumRows > 0) if (msg.size() > 0)
{ {
pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_SIZE, dwHandle, sizeof(BYTE)); pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_SIZE, dwHandle, sizeof(BYTE));
pkPeer->EncodeBYTE(bSize); pkPeer->EncodeBYTE(bSize);
@ -933,48 +985,58 @@ void CClientManager::QUERY_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, DWORD dwHandle
strlcpy(pi->login, p->szOldPassword, sizeof(pi->login)); strlcpy(pi->login, p->szOldPassword, sizeof(pi->login));
pi->account_id = p->dwID; pi->account_id = p->dwID;
char szQuery[QUERY_MAX_LEN]; CDBManager::instance().AsyncQuery(
snprintf(szQuery, sizeof(szQuery), "SELECT password FROM safebox%s WHERE account_id=%u", GetTablePostfix(), p->dwID); "SELECT password FROM player.safebox WHERE account_id = $1",
pqxx::params{p->dwID},
[this, pkPeer, pi](const pqxx::result &result, const std::string &error)
{
if (!error.empty())
{
return;
}
CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_CHANGE_PASSWORD, pkPeer->GetHandle(), pi); CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD(pkPeer, pi, result);
});
} }
void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, SQLMsg *msg) void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg)
{ {
CQueryInfo *qi = (CQueryInfo *)msg->pvUserData; DWORD dwHandle = pi->dwHandle;
ClientHandleInfo *p = (ClientHandleInfo *)qi->pvData;
DWORD dwHandle = p->dwHandle;
if (msg->Get()->uiNumRows > 0) if (msg.size() > 0)
{ {
MYSQL_ROW row = mysql_fetch_row(msg->Get()->pSQLResult); auto password = msg[0][0].c_str();
if (row[0] && *row[0] && !strcasecmp(row[0], p->login) || (!row[0] || !*row[0]) && !strcmp("000000", p->login)) if (password && *password && !strcasecmp(password, pi->login) || (!password || !*password) && !strcmp("000000", pi->login))
{ {
char szQuery[QUERY_MAX_LEN]; CDBManager::instance().AsyncQuery(
char escape_pwd[64]; "UPDATE player.safebox SET password = $1 WHERE account_id = $2 RETURNING id",
CDBManager::instance().EscapeString(escape_pwd, p->safebox_password, strlen(p->safebox_password)); pqxx::params{pi->safebox_password, pi->account_id},
[this, pkPeer, pi](const pqxx::result &result, const std::string &error)
{
if (!error.empty())
{
return;
}
snprintf(szQuery, sizeof(szQuery), "UPDATE safebox%s SET password='%s' WHERE account_id=%u", GetTablePostfix(), escape_pwd, p->account_id); CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(pkPeer, pi, result);
});
CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_CHANGE_PASSWORD_SECOND, pkPeer->GetHandle(), p);
return; return;
} }
} }
delete p; delete pi;
// Wrong old password // Wrong old password
pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_PASSWORD_ANSWER, dwHandle, sizeof(BYTE)); pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_PASSWORD_ANSWER, dwHandle, sizeof(BYTE));
pkPeer->EncodeBYTE(0); pkPeer->EncodeBYTE(0);
} }
void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, SQLMsg *msg) void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg)
{ {
CQueryInfo *qi = (CQueryInfo *)msg->pvUserData; DWORD dwHandle = pi->dwHandle;
ClientHandleInfo *p = (ClientHandleInfo *)qi->pvData; delete pi;
DWORD dwHandle = p->dwHandle;
delete p;
pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_PASSWORD_ANSWER, dwHandle, sizeof(BYTE)); pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_PASSWORD_ANSWER, dwHandle, sizeof(BYTE));
pkPeer->EncodeBYTE(1); pkPeer->EncodeBYTE(1);
@ -1364,13 +1426,13 @@ void CClientManager::QUERY_ITEM_SAVE(CPeer *pkPeer, const char *c_pData)
snprintf(szQuery, sizeof(szQuery), snprintf(szQuery, sizeof(szQuery),
"REPLACE INTO item%s (id, owner_id, window, pos, count, vnum, socket0, socket1, socket2, " "REPLACE INTO item%s (id, owner_id, window, pos, count, vnum, socket0, socket1, socket2, "
"attrtype0, attrvalue0, " "attr_type_0, attr_value_0, "
"attrtype1, attrvalue1, " "attr_type_1, attr_value_1, "
"attrtype2, attrvalue2, " "attr_type_2, attr_value_2, "
"attrtype3, attrvalue3, " "attr_type_3, attr_value_3, "
"attrtype4, attrvalue4, " "attr_type_4, attr_value_4, "
"attrtype5, attrvalue5, " "attr_type_5, attr_value_5, "
"attrtype6, attrvalue6) " "attr_type_6, attr_value_6) "
"VALUES(%u, %u, %d, %d, %u, %u, %d, %d, %d, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd)", "VALUES(%u, %u, %d, %d, %u, %u, %d, %d, %d, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd)",
GetTablePostfix(), GetTablePostfix(),
p->id, p->id,

View File

@ -76,7 +76,7 @@ public:
pAccountTable = NULL; pAccountTable = NULL;
player_id = dwPID; player_id = dwPID;
}; };
// 독일선물기능용 생성자 // Constructor for the German gift feature
ClientHandleInfo(DWORD argHandle, DWORD dwPID, DWORD accountId) ClientHandleInfo(DWORD argHandle, DWORD dwPID, DWORD accountId)
{ {
dwHandle = argHandle; dwHandle = argHandle;
@ -247,10 +247,10 @@ private:
void QUERY_PLAYER_LOAD(CPeer *peer, DWORD dwHandle, TPlayerLoadPacket *); void QUERY_PLAYER_LOAD(CPeer *peer, DWORD dwHandle, TPlayerLoadPacket *);
void RESULT_COMPOSITE_PLAYER(CPeer *peer, SQLMsg *pMsg, DWORD dwQID); void RESULT_COMPOSITE_PLAYER(CPeer *peer, SQLMsg *pMsg, DWORD dwQID);
void RESULT_PLAYER_LOAD(CPeer *peer, MYSQL_RES *pRes, ClientHandleInfo *pkInfo); void RESULT_PLAYER_LOAD(CPeer *peer, pqxx::result *pRes, ClientHandleInfo *pkInfo);
void RESULT_ITEM_LOAD(CPeer *peer, MYSQL_RES *pRes, DWORD dwHandle, DWORD dwPID); void RESULT_ITEM_LOAD(CPeer *peer, pqxx::result *pRes, DWORD dwHandle, DWORD dwPID);
void RESULT_QUEST_LOAD(CPeer *pkPeer, MYSQL_RES *pRes, DWORD dwHandle, DWORD dwPID); void RESULT_QUEST_LOAD(CPeer *pkPeer, pqxx::result *pRes, DWORD dwHandle, DWORD dwPID);
void RESULT_AFFECT_LOAD(CPeer *pkPeer, MYSQL_RES *pRes, DWORD dwHandle); void RESULT_AFFECT_LOAD(CPeer *pkPeer, pqxx::result *pRes, DWORD dwHandle);
// PLAYER_INDEX_CREATE_BUG_FIX // PLAYER_INDEX_CREATE_BUG_FIX
void RESULT_PLAYER_INDEX_CREATE(CPeer *pkPeer, SQLMsg *msg); void RESULT_PLAYER_INDEX_CREATE(CPeer *pkPeer, SQLMsg *msg);
@ -296,10 +296,10 @@ private:
void QUERY_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangeSizePacket *p); void QUERY_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangeSizePacket *p);
void QUERY_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangePasswordPacket *p); void QUERY_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangePasswordPacket *p);
void RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg); void RESULT_SAFEBOX_LOAD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg);
void RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, SQLMsg *msg); void RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg);
void RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, SQLMsg *msg); void RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg);
void RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, SQLMsg *msg); void RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg);
void QUERY_EMPIRE_SELECT(CPeer *pkPeer, DWORD dwHandle, TEmpireSelectPacket *p); void QUERY_EMPIRE_SELECT(CPeer *pkPeer, DWORD dwHandle, TEmpireSelectPacket *p);
void QUERY_SETUP(CPeer *pkPeer, DWORD dwHandle, const char *c_pData); void QUERY_SETUP(CPeer *pkPeer, DWORD dwHandle, const char *c_pData);
@ -318,7 +318,7 @@ private:
void QUERY_RELOAD_PROTO(); void QUERY_RELOAD_PROTO();
void QUERY_CHANGE_NAME(CPeer *peer, DWORD dwHandle, TPacketGDChangeName *p); void QUERY_CHANGE_NAME(CPeer *peer, DWORD dwHandle, TPacketGDChangeName *p);
void GetPlayerFromRes(TPlayerTable *player_table, MYSQL_RES *res); void GetPlayerFromRes(TPlayerTable *player_table, pqxx::result *res);
void QUERY_LOGIN_KEY(CPeer *pkPeer, TPacketGDLoginKey *p); void QUERY_LOGIN_KEY(CPeer *pkPeer, TPacketGDLoginKey *p);

View File

@ -18,7 +18,8 @@ extern int g_test_server;
// //
// Check all SELECT syntax on item table before change this function!!! // Check all SELECT syntax on item table before change this function!!!
// //
bool CreateItemTableFromRes(MYSQL_RES *res, std::vector<TPlayerItem> *pVec, DWORD dwPID) // TODO Finish moving this to pg
bool CreateItemTableFromRes(pqxx::result *res, std::vector<TPlayerItem> *pVec, DWORD dwPID)
{ {
if (!res) if (!res)
{ {

View File

@ -84,6 +84,18 @@ void CDBManager::AsyncQuery(const std::string &query, const pqxx::params &params
m_queueCondition.notify_one(); m_queueCondition.notify_one();
} }
void CDBManager::AsyncQuery(const std::string &query, const pqxx::params &params, AsyncQueryCallback callback)
{
auto msg = std::make_shared<PgAsyncQuery>(++m_msgCounter, query, params, std::move(callback));
{
std::lock_guard<std::mutex> lock(m_queueMutex);
m_queryQueue.push(msg);
}
m_queueCondition.notify_one();
}
size_t CDBManager::GetPendingQueryCount() const size_t CDBManager::GetPendingQueryCount() const
{ {
std::lock_guard<std::mutex> lock(m_queueMutex); std::lock_guard<std::mutex> lock(m_queueMutex);
@ -156,6 +168,9 @@ void CDBManager::WorkerLoop()
void CDBManager::ProcessQuery(std::shared_ptr<PgAsyncQuery> msg) void CDBManager::ProcessQuery(std::shared_ptr<PgAsyncQuery> msg)
{ {
pqxx::result result;
std::string error;
try try
{ {
if (!m_connPool) if (!m_connPool)
@ -170,7 +185,7 @@ void CDBManager::ProcessQuery(std::shared_ptr<PgAsyncQuery> msg)
} }
pqxx::work txn(*conn); pqxx::work txn(*conn);
txn.exec_params(msg->query, msg->parameters); result = txn.exec_params(msg->query, msg->parameters);
txn.commit(); txn.commit();
SPDLOG_TRACE("AsyncQuery completed: {} (ID: {})", msg->query, msg->id); SPDLOG_TRACE("AsyncQuery completed: {} (ID: {})", msg->query, msg->id);
@ -180,5 +195,17 @@ void CDBManager::ProcessQuery(std::shared_ptr<PgAsyncQuery> msg)
SPDLOG_ERROR("AsyncQuery failed: {} (query: {}, ID: {})", e.what(), msg->query, msg->id); SPDLOG_ERROR("AsyncQuery failed: {} (query: {}, ID: {})", e.what(), msg->query, msg->id);
} }
if (msg->hasCallback && msg->callback)
{
try
{
msg->callback(result, error);
}
catch (const std::exception &e)
{
SPDLOG_ERROR("AsyncQuery callback threw exception: {}", e.what());
}
}
++m_completedQueries; ++m_completedQueries;
} }

View File

@ -20,6 +20,7 @@ public:
// Async Query // Async Query
void AsyncQuery(const std::string &query, const pqxx::params &params = pqxx::params{}); void AsyncQuery(const std::string &query, const pqxx::params &params = pqxx::params{});
void AsyncQuery(const std::string &query, const pqxx::params &params, AsyncQueryCallback callback);
size_t GetPendingQueryCount() const; size_t GetPendingQueryCount() const;
size_t GetCompletedQueryCount() const; size_t GetCompletedQueryCount() const;

View File

@ -131,6 +131,8 @@ bool CItemIDRangeManager::BuildRange(DWORD dwMin, DWORD dwMax, TItemIDRangeTable
return true; return true;
} }
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {

View File

@ -68,6 +68,8 @@ namespace marriage
SPDLOG_DEBUG("Marriage {}: LP:{} TM:{} ST:{} {:10}:{:16} {:10}:{:16} ", uiRow, love_point, time, is_married, pid1, name1, pid2, name2); SPDLOG_DEBUG("Marriage {}: LP:{} TM:{} ST:{} {:10}:{:16} {:10}:{:16} ", uiRow, love_point, time, is_married, pid1, name1, pid2, name2);
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
@ -119,6 +121,8 @@ namespace marriage
SPDLOG_ERROR("cannot insert marriage"); SPDLOG_ERROR("cannot insert marriage");
return; return;
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
@ -167,6 +171,8 @@ namespace marriage
SPDLOG_ERROR("cannot update marriage : PID:{} {}", dwPID1, dwPID2); SPDLOG_ERROR("cannot update marriage : PID:{} {}", dwPID1, dwPID2);
return; return;
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
@ -222,6 +228,8 @@ namespace marriage
SPDLOG_ERROR("cannot delete marriage : PID:{} {}", dwPID1, dwPID2); SPDLOG_ERROR("cannot delete marriage : PID:{} {}", dwPID1, dwPID2);
return; return;
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
@ -274,6 +282,8 @@ namespace marriage
SPDLOG_ERROR("cannot change engage to marriage : PID:{} {}", dwPID1, dwPID2); SPDLOG_ERROR("cannot change engage to marriage : PID:{} {}", dwPID1, dwPID2);
return; return;
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {

View File

@ -268,6 +268,8 @@ bool CMonarch::DelMonarch(int Empire)
{ {
return false; return false;
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {
@ -308,6 +310,8 @@ bool CMonarch::DelMonarch(const char *name)
return true; return true;
} }
txn.commit();
} }
catch (const std::exception &e) catch (const std::exception &e)
{ {

View File

@ -3,6 +3,9 @@
#include <pqxx/pqxx> #include <pqxx/pqxx>
#include <string> #include <string>
#include <functional>
using AsyncQueryCallback = std::function<void(const pqxx::result &, const std::string &error)>;
class PgAsyncQuery class PgAsyncQuery
{ {
public: public:
@ -10,6 +13,14 @@ public:
std::string query; std::string query;
pqxx::params parameters; pqxx::params parameters;
bool hasCallback;
AsyncQueryCallback callback;
// Fire and Forget AsyncQuery
PgAsyncQuery(int msgId, const std::string &sql, const pqxx::params &params = pqxx::params{}) PgAsyncQuery(int msgId, const std::string &sql, const pqxx::params &params = pqxx::params{})
: id(msgId), query(sql), parameters(params) {} : id(msgId), query(sql), parameters(params), hasCallback(false) {}
};
// Callback AsyncQuery
PgAsyncQuery(int msgId, const std::string &sql, const pqxx::params &params = pqxx::params{}, AsyncQueryCallback cb)
: id(msgId), query(sql), parameters(params), hasCallback(true), callback(std::move(cb)) {}
};