diff --git a/src/db/src/ClientManager.cpp b/src/db/src/ClientManager.cpp index 9b39058..acc0ec1 100644 --- a/src/db/src/ClientManager.cpp +++ b/src/db/src/ClientManager.cpp @@ -23,7 +23,8 @@ extern int g_iItemCacheFlushSeconds; extern int g_test_server; extern std::string g_stLocale; extern std::string g_stLocaleNameColumn; -bool CreateItemTableFromRes(MYSQL_RES *res, std::vector *pVec, DWORD dwPID); + +bool CreateItemTableFromRes(pqxx::result *res, std::vector *pVec, DWORD dwPID); DWORD g_dwUsageMax = 0; DWORD g_dwUsageAvg = 0; @@ -248,10 +249,6 @@ bool CClientManager::Initialize() LoadEventFlag(); - // database character-set을 강제로 맞춤 - if (g_stLocale == "big5" || g_stLocale == "sjis") - CDBManager::instance().QueryLocaleSet(); - return true; } @@ -262,17 +259,18 @@ void CClientManager::MainLoop() SPDLOG_DEBUG("ClientManager pointer is {}", (void *)this); // 메인루프 - while (!m_bShutdowned) - { - while ((tmp = CDBManager::instance().PopResult())) - { - AnalyzeQueryResult(tmp); - delete tmp; - } + // TODO Look at this + // while (!m_bShutdowned) + // { + // while ((tmp = CDBManager::instance().PopResult())) + // { + // AnalyzeQueryResult(tmp); + // delete tmp; + // } - if (!Process()) - break; - } + // if (!Process()) + // break; + // } // // 메인루프 종료처리 @@ -527,24 +525,53 @@ void CClientManager::QUERY_QUEST_SAVE(CPeer *pkPeer, TQuestTable *pTable, DWORD int iSize = dwLen / sizeof(TQuestTable); - char szQuery[1024]; + // We move to prepared queries for speed and SAFETY + std::vector inserts; + std::vector deletes; for (int i = 0; i < iSize; ++i, ++pTable) { if (pTable->lValue == 0) - { - snprintf(szQuery, sizeof(szQuery), - "DELETE FROM quest%s WHERE dwPID=%d AND szName='%s' AND szState='%s'", - GetTablePostfix(), pTable->dwPID, pTable->szName, pTable->szState); - } + deletes.push_back(*pTable); else + inserts.push_back(*pTable); + } + + auto pool = CDBManager::instance().GetConnectionPool(); + auto conn = pool->acquire(); + + // Prepare statements + static bool prepared = false; + if (!prepared) + { + conn->prepare("delete_quest", + "DELETE FROM player.quest WHERE player_id = $1 AND name = $2 AND state = $3"); + + conn->prepare("upsert_quest", + "INSERT INTO player.quest (player_id, name, state, value) VALUES ($1, $2, $3, $4) ON CONFLICT (player_id, name, state) DO UPDATE SET value = EXCLUDED.value"); + + prepared = true; + } + + try + { + pqxx::work txn{*conn}; + + for (const auto &row : deletes) { - snprintf(szQuery, sizeof(szQuery), - "REPLACE INTO quest%s (dwPID, szName, szState, lValue) VALUES(%d, '%s', '%s', %d)", - GetTablePostfix(), pTable->dwPID, pTable->szName, pTable->szState, pTable->lValue); + txn.exec_prepared("delete_quest", row.dwPID, row.szName, row.szState); } - CDBManager::instance().ReturnQuery(szQuery, QID_QUEST_SAVE, pkPeer->GetHandle(), NULL); + for (const auto &row : inserts) + { + txn.exec_prepared("upsert_quest", row.dwPID, row.szName, row.szState, row.lValue); + } + + txn.commit(); + } + catch (const std::exception &e) + { + SPDLOG_ERROR("[CClientManager::QUERY_QUEST_SAVE] Query error: {}", e.what()); } } @@ -557,26 +584,30 @@ void CClientManager::QUERY_SAFEBOX_LOAD(CPeer *pkPeer, DWORD dwHandle, TSafeboxL pi->ip[0] = bMall ? 1 : 0; strlcpy(pi->login, packet->szLogin, sizeof(pi->login)); - char szQuery[QUERY_MAX_LEN]; - snprintf(szQuery, sizeof(szQuery), - "SELECT account_id, size, password FROM safebox%s WHERE account_id=%u", - GetTablePostfix(), packet->dwID); + CDBManager::instance().AsyncQuery( + "SELECT account_id, size, password FROM player.safebox WHERE account_id = $1", + pqxx::params(packet->dwID), + [this, pkPeer, pi](const pqxx::result &result, const std::string &error) + { + if (!error.empty()) + { + return; + } + + SPDLOG_DEBUG("QUERY_RESULT: HEADER_GD_SAFEBOX_LOAD"); + CClientManager::RESULT_SAFEBOX_LOAD(pkPeer, pi, result); + }); SPDLOG_TRACE("HEADER_GD_SAFEBOX_LOAD (handle: {} account.id {} is_mall {})", dwHandle, packet->dwID, bMall ? 1 : 0); - - CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_LOAD, pkPeer->GetHandle(), pi); } -void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) +void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg) { - CQueryInfo *qi = (CQueryInfo *)msg->pvUserData; - ClientHandleInfo *pi = (ClientHandleInfo *)qi->pvData; DWORD dwHandle = pi->dwHandle; - // 여기에서 사용하는 account_index는 쿼리 순서를 말한다. - // 첫번째 패스워드 알아내기 위해 하는 쿼리가 0 - // 두번째 실제 데이터를 얻어놓는 쿼리가 1 - + // The account_index used here refers to the query order. + // The first query to retrieve the password is 0. + // The second query to fetch the actual data is 1. if (pi->account_index == 0) { char szSafeboxPassword[SAFEBOX_PASSWORD_MAX_LEN + 1]; @@ -585,9 +616,7 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) TSafeboxTable *pSafebox = new TSafeboxTable; memset(pSafebox, 0, sizeof(TSafeboxTable)); - SQLResult *res = msg->Get(); - - if (res->uiNumRows == 0) + if (msg.size() == 0) { if (strcmp("000000", szSafeboxPassword)) { @@ -598,32 +627,38 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) } else { - MYSQL_ROW row = mysql_fetch_row(res->pSQLResult); + pqxx::row row = msg[0]; + // account_id, size, password + auto account_id = row[0].c_str(); + auto size = row[1].c_str(); + auto password = row[2].c_str(); - // 비밀번호가 틀리면.. - if (((!row[2] || !*row[2]) && strcmp("000000", szSafeboxPassword)) || - ((row[2] && *row[2]) && strcmp(row[2], szSafeboxPassword))) + // If the password is incorrect... + if (((!password || !*password) && strcmp("000000", szSafeboxPassword)) || + ((password && *password) && strcmp(password, szSafeboxPassword))) { pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_WRONG_PASSWORD, dwHandle, 0); delete pi; return; } - if (!row[0]) + if (!account_id) pSafebox->dwID = 0; else - str_to_number(pSafebox->dwID, row[0]); + str_to_number(pSafebox->dwID, account_id); - if (!row[1]) + if (!size) pSafebox->bSize = 0; else - str_to_number(pSafebox->bSize, row[1]); + str_to_number(pSafebox->bSize, size); + /* - if (!row[3]) - pSafebox->dwGold = 0; - else - pSafebox->dwGold = atoi(row[3]); - */ + if (!row[3]) + pSafebox->dwGold = 0; + else + pSafebox->dwGold = atoi(row[3]); + */ + if (pi->ip[0] == 1) { pSafebox->bSize = 1; @@ -637,27 +672,32 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) pSafebox->dwID = pi->account_id; pi->pSafebox = pSafebox; - - char szQuery[512]; - snprintf(szQuery, sizeof(szQuery), - "SELECT id, window+0, pos, count, vnum, socket0, socket1, socket2, " - "attrtype0, attrvalue0, " - "attrtype1, attrvalue1, " - "attrtype2, attrvalue2, " - "attrtype3, attrvalue3, " - "attrtype4, attrvalue4, " - "attrtype5, attrvalue5, " - "attrtype6, attrvalue6 " - "FROM item%s WHERE owner_id=%d AND window='%s'", - GetTablePostfix(), pi->account_id, pi->ip[0] == 0 ? "SAFEBOX" : "MALL"); - pi->account_index = 1; - CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_LOAD, pkPeer->GetHandle(), pi); + CDBManager::instance().AsyncQuery( + "SELECT id, window_bits, position, count, vnum, socket_0, socket_1, socket_2, " + "attr_type_0, attr_value_0, " + "attr_type_1, attr_value_1, " + "attr_type_2, attr_value_2, " + "attr_type_3, attr_value_3, " + "attr_type_4, attr_value_4, " + "attr_type_5, attr_value_5, " + "attr_type_6, attr_value_6 " + "FROM player.item WHERE owner_id = $1 AND window = $2", + pqxx::params{pi->account_id, pi->ip[0] == 0 ? "SAFEBOX" : "MALL"}, + [this, pkPeer, pi](const pqxx::result &result, const std::string &error) + { + if (!error.empty()) + { + return; + } + + SPDLOG_DEBUG("QUERY_RESULT: HEADER_GD_SAFEBOX_LOAD"); + CClientManager::RESULT_SAFEBOX_LOAD(pkPeer, pi, result); + }); } else { - if (!pi->pSafebox) { SPDLOG_ERROR("null safebox pointer!"); @@ -665,9 +705,9 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) return; } - // 쿼리에 에러가 있었으므로 응답할 경우 창고가 비어있는 것 처럼 - // 보이기 때문에 창고가 아얘 안열리는게 나음 - if (!msg->Get()->pSQLResult) + // Since there was an error in the query, responding would make it look like the warehouse is empty, + // so it's better not to open the warehouse at all. + if (msg.size() == 0) { SPDLOG_ERROR("null safebox result"); delete pi; @@ -675,7 +715,7 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) } static std::vector s_items; - CreateItemTableFromRes(msg->Get()->pSQLResult, &s_items, pi->account_id); + CreateItemTableFromRes(&msg, &s_items, pi->account_id); std::set *pSet = ItemAwardManager::instance().GetByLogin(pi->login); @@ -707,8 +747,6 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) typeof(pSet->begin()) it = pSet->begin(); - char szQuery[512]; - while (it != pSet->end()) { TItemAward *pItemAward = *(it++); @@ -781,8 +819,8 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) case 72728: case 72729: case 72730: - // 무시무시하지만 이전에 하던 걸 고치기는 무섭고... - // 그래서 그냥 하드 코딩. 선물 상자용 자동물약 아이템들. + // It's terrifying, but fixing the old implementation is even scarier... + // So just hardcoding it. Auto potion items for gift boxes. case 76004: case 76005: case 76021: @@ -844,37 +882,49 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) } } } - - snprintf(szQuery, sizeof(szQuery), - "INSERT INTO item%s (id, owner_id, window, pos, vnum, count, socket0, socket1, socket2) " - "VALUES(%u, %u, '%s', %d, %u, %u, %u, %u, %u)", - GetTablePostfix(), - GainItemID(), - pi->account_id, - pi->ip[0] == 0 ? "SAFEBOX" : "MALL", - iPos, - pItemAward->dwVnum, pItemAward->dwCount, pItemAward->dwSocket0, pItemAward->dwSocket1, dwSocket2); } - std::unique_ptr pmsg(CDBManager::instance().DirectQuery(szQuery)); - SQLResult *pRes = pmsg->Get(); - SPDLOG_DEBUG("SAFEBOX Query : [{}]", szQuery); + auto pool = CDBManager::instance().GetConnectionPool(); + auto conn = pool->acquire(); - if (pRes->uiAffectedRows == 0 || pRes->uiInsertID == 0 || pRes->uiAffectedRows == (uint32_t)-1) - break; + try + { + pqxx::work txn{*conn}; + pqxx::result res = txn.exec_params( + "INSERT INTO player.item (id, owner_id, window, position, vnum, count, socket_0, socket_1, socket_2) " + "VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id", + pqxx::params{ + GainItemID(), + pi->account_id, + pi->ip[0] == 0 ? "SAFEBOX" : "MALL", + iPos, + pItemAward->dwVnum, pItemAward->dwCount, pItemAward->dwSocket0, pItemAward->dwSocket1, dwSocket2}); - item.id = pmsg->Get()->uiInsertID; - item.window = pi->ip[0] == 0 ? SAFEBOX : MALL, - item.pos = iPos; - item.count = pItemAward->dwCount; - item.vnum = pItemAward->dwVnum; - item.alSockets[0] = pItemAward->dwSocket0; - item.alSockets[1] = pItemAward->dwSocket1; - item.alSockets[2] = dwSocket2; - s_items.push_back(item); + SPDLOG_DEBUG("SAFEBOX Query"); - vec_dwFinishedAwardID.push_back(std::make_pair(pItemAward->dwID, item.id)); - grid.Put(iPos, 1, it->second->bSize); + if (res.size() == 0) + break; + + item.id = res[0][0].as(); + item.window = pi->ip[0] == 0 ? SAFEBOX : MALL, + item.pos = iPos; + item.count = pItemAward->dwCount; + item.vnum = pItemAward->dwVnum; + item.alSockets[0] = pItemAward->dwSocket0; + item.alSockets[1] = pItemAward->dwSocket1; + item.alSockets[2] = dwSocket2; + s_items.push_back(item); + + vec_dwFinishedAwardID.push_back(std::make_pair(pItemAward->dwID, item.id)); + grid.Put(iPos, 1, it->second->bSize); + + txn.commit(); + } + catch (const std::exception &e) + { + SPDLOG_ERROR("[CClientManager::RESULT_SAFEBOX_LOAD] Query error: {}", e.what()); + continue; + } } for (DWORD i = 0; i < vec_dwFinishedAwardID.size(); ++i) @@ -898,28 +948,30 @@ void CClientManager::RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg) void CClientManager::QUERY_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangeSizePacket *p) { ClientHandleInfo *pi = new ClientHandleInfo(dwHandle); - pi->account_index = p->bSize; // account_index를 사이즈로 임시로 사용 + pi->account_index = p->bSize; // Temporarily use account_index as size - char szQuery[QUERY_MAX_LEN]; + CDBManager::instance().AsyncQuery( + p->bSize == 1 ? "INSERT INTO player.safebox (size, account_id) VALUES($1, $2) RETURNING id" : "UPDATE player.safebox SET size = $1 WHERE account_id = $2 RETURNING id", + pqxx::params{p->bSize, p->dwID}, + [this, pkPeer, pi](const pqxx::result &result, const std::string &error) + { + if (!error.empty()) + { + return; + } - if (p->bSize == 1) - snprintf(szQuery, sizeof(szQuery), "INSERT INTO safebox%s (account_id, size) VALUES(%u, %u)", GetTablePostfix(), p->dwID, p->bSize); - else - snprintf(szQuery, sizeof(szQuery), "UPDATE safebox%s SET size=%u WHERE account_id=%u", GetTablePostfix(), p->bSize, p->dwID); - - CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_CHANGE_SIZE, pkPeer->GetHandle(), pi); + CClientManager::RESULT_SAFEBOX_CHANGE_SIZE(pkPeer, pi, result); + }); } -void CClientManager::RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, SQLMsg *msg) +void CClientManager::RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg) { - CQueryInfo *qi = (CQueryInfo *)msg->pvUserData; - ClientHandleInfo *p = (ClientHandleInfo *)qi->pvData; - DWORD dwHandle = p->dwHandle; - BYTE bSize = p->account_index; + DWORD dwHandle = pi->dwHandle; + BYTE bSize = pi->account_index; - delete p; + delete pi; - if (msg->Get()->uiNumRows > 0) + if (msg.size() > 0) { pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_SIZE, dwHandle, sizeof(BYTE)); pkPeer->EncodeBYTE(bSize); @@ -933,48 +985,58 @@ void CClientManager::QUERY_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, DWORD dwHandle strlcpy(pi->login, p->szOldPassword, sizeof(pi->login)); pi->account_id = p->dwID; - char szQuery[QUERY_MAX_LEN]; - snprintf(szQuery, sizeof(szQuery), "SELECT password FROM safebox%s WHERE account_id=%u", GetTablePostfix(), p->dwID); + CDBManager::instance().AsyncQuery( + "SELECT password FROM player.safebox WHERE account_id = $1", + pqxx::params{p->dwID}, + [this, pkPeer, pi](const pqxx::result &result, const std::string &error) + { + if (!error.empty()) + { + return; + } - CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_CHANGE_PASSWORD, pkPeer->GetHandle(), pi); + CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD(pkPeer, pi, result); + }); } -void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, SQLMsg *msg) +void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg) { - CQueryInfo *qi = (CQueryInfo *)msg->pvUserData; - ClientHandleInfo *p = (ClientHandleInfo *)qi->pvData; - DWORD dwHandle = p->dwHandle; + DWORD dwHandle = pi->dwHandle; - if (msg->Get()->uiNumRows > 0) + if (msg.size() > 0) { - MYSQL_ROW row = mysql_fetch_row(msg->Get()->pSQLResult); + auto password = msg[0][0].c_str(); - if (row[0] && *row[0] && !strcasecmp(row[0], p->login) || (!row[0] || !*row[0]) && !strcmp("000000", p->login)) + if (password && *password && !strcasecmp(password, pi->login) || (!password || !*password) && !strcmp("000000", pi->login)) { - char szQuery[QUERY_MAX_LEN]; - char escape_pwd[64]; - CDBManager::instance().EscapeString(escape_pwd, p->safebox_password, strlen(p->safebox_password)); + CDBManager::instance().AsyncQuery( + "UPDATE player.safebox SET password = $1 WHERE account_id = $2 RETURNING id", + pqxx::params{pi->safebox_password, pi->account_id}, + [this, pkPeer, pi](const pqxx::result &result, const std::string &error) + { + if (!error.empty()) + { + return; + } - snprintf(szQuery, sizeof(szQuery), "UPDATE safebox%s SET password='%s' WHERE account_id=%u", GetTablePostfix(), escape_pwd, p->account_id); + CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(pkPeer, pi, result); + }); - CDBManager::instance().ReturnQuery(szQuery, QID_SAFEBOX_CHANGE_PASSWORD_SECOND, pkPeer->GetHandle(), p); return; } } - delete p; + delete pi; // Wrong old password pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_PASSWORD_ANSWER, dwHandle, sizeof(BYTE)); pkPeer->EncodeBYTE(0); } -void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, SQLMsg *msg) +void CClientManager::RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg) { - CQueryInfo *qi = (CQueryInfo *)msg->pvUserData; - ClientHandleInfo *p = (ClientHandleInfo *)qi->pvData; - DWORD dwHandle = p->dwHandle; - delete p; + DWORD dwHandle = pi->dwHandle; + delete pi; pkPeer->EncodeHeader(HEADER_DG_SAFEBOX_CHANGE_PASSWORD_ANSWER, dwHandle, sizeof(BYTE)); pkPeer->EncodeBYTE(1); @@ -1364,13 +1426,13 @@ void CClientManager::QUERY_ITEM_SAVE(CPeer *pkPeer, const char *c_pData) snprintf(szQuery, sizeof(szQuery), "REPLACE INTO item%s (id, owner_id, window, pos, count, vnum, socket0, socket1, socket2, " - "attrtype0, attrvalue0, " - "attrtype1, attrvalue1, " - "attrtype2, attrvalue2, " - "attrtype3, attrvalue3, " - "attrtype4, attrvalue4, " - "attrtype5, attrvalue5, " - "attrtype6, attrvalue6) " + "attr_type_0, attr_value_0, " + "attr_type_1, attr_value_1, " + "attr_type_2, attr_value_2, " + "attr_type_3, attr_value_3, " + "attr_type_4, attr_value_4, " + "attr_type_5, attr_value_5, " + "attr_type_6, attr_value_6) " "VALUES(%u, %u, %d, %d, %u, %u, %d, %d, %d, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd, %hd)", GetTablePostfix(), p->id, diff --git a/src/db/src/ClientManager.h b/src/db/src/ClientManager.h index c5fcf58..6e6fb5a 100644 --- a/src/db/src/ClientManager.h +++ b/src/db/src/ClientManager.h @@ -76,7 +76,7 @@ public: pAccountTable = NULL; player_id = dwPID; }; - // 독일선물기능용 생성자 + // Constructor for the German gift feature ClientHandleInfo(DWORD argHandle, DWORD dwPID, DWORD accountId) { dwHandle = argHandle; @@ -247,10 +247,10 @@ private: void QUERY_PLAYER_LOAD(CPeer *peer, DWORD dwHandle, TPlayerLoadPacket *); void RESULT_COMPOSITE_PLAYER(CPeer *peer, SQLMsg *pMsg, DWORD dwQID); - void RESULT_PLAYER_LOAD(CPeer *peer, MYSQL_RES *pRes, ClientHandleInfo *pkInfo); - void RESULT_ITEM_LOAD(CPeer *peer, MYSQL_RES *pRes, DWORD dwHandle, DWORD dwPID); - void RESULT_QUEST_LOAD(CPeer *pkPeer, MYSQL_RES *pRes, DWORD dwHandle, DWORD dwPID); - void RESULT_AFFECT_LOAD(CPeer *pkPeer, MYSQL_RES *pRes, DWORD dwHandle); + void RESULT_PLAYER_LOAD(CPeer *peer, pqxx::result *pRes, ClientHandleInfo *pkInfo); + void RESULT_ITEM_LOAD(CPeer *peer, pqxx::result *pRes, DWORD dwHandle, DWORD dwPID); + void RESULT_QUEST_LOAD(CPeer *pkPeer, pqxx::result *pRes, DWORD dwHandle, DWORD dwPID); + void RESULT_AFFECT_LOAD(CPeer *pkPeer, pqxx::result *pRes, DWORD dwHandle); // PLAYER_INDEX_CREATE_BUG_FIX void RESULT_PLAYER_INDEX_CREATE(CPeer *pkPeer, SQLMsg *msg); @@ -296,10 +296,10 @@ private: void QUERY_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangeSizePacket *p); void QUERY_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, DWORD dwHandle, TSafeboxChangePasswordPacket *p); - void RESULT_SAFEBOX_LOAD(CPeer *pkPeer, SQLMsg *msg); - void RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, SQLMsg *msg); - void RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, SQLMsg *msg); - void RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, SQLMsg *msg); + void RESULT_SAFEBOX_LOAD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg); + void RESULT_SAFEBOX_CHANGE_SIZE(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg); + void RESULT_SAFEBOX_CHANGE_PASSWORD(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg); + void RESULT_SAFEBOX_CHANGE_PASSWORD_SECOND(CPeer *pkPeer, ClientHandleInfo *pi, pqxx::result msg); void QUERY_EMPIRE_SELECT(CPeer *pkPeer, DWORD dwHandle, TEmpireSelectPacket *p); void QUERY_SETUP(CPeer *pkPeer, DWORD dwHandle, const char *c_pData); @@ -318,7 +318,7 @@ private: void QUERY_RELOAD_PROTO(); void QUERY_CHANGE_NAME(CPeer *peer, DWORD dwHandle, TPacketGDChangeName *p); - void GetPlayerFromRes(TPlayerTable *player_table, MYSQL_RES *res); + void GetPlayerFromRes(TPlayerTable *player_table, pqxx::result *res); void QUERY_LOGIN_KEY(CPeer *pkPeer, TPacketGDLoginKey *p); diff --git a/src/db/src/ClientManagerPlayer.cpp b/src/db/src/ClientManagerPlayer.cpp index 5da0eea..a1bc084 100644 --- a/src/db/src/ClientManagerPlayer.cpp +++ b/src/db/src/ClientManagerPlayer.cpp @@ -18,7 +18,8 @@ extern int g_test_server; // // Check all SELECT syntax on item table before change this function!!! // -bool CreateItemTableFromRes(MYSQL_RES *res, std::vector *pVec, DWORD dwPID) +// TODO Finish moving this to pg +bool CreateItemTableFromRes(pqxx::result *res, std::vector *pVec, DWORD dwPID) { if (!res) { diff --git a/src/db/src/DBManager.cpp b/src/db/src/DBManager.cpp index 4b1e663..b7cbe34 100644 --- a/src/db/src/DBManager.cpp +++ b/src/db/src/DBManager.cpp @@ -84,6 +84,18 @@ void CDBManager::AsyncQuery(const std::string &query, const pqxx::params ¶ms m_queueCondition.notify_one(); } +void CDBManager::AsyncQuery(const std::string &query, const pqxx::params ¶ms, AsyncQueryCallback callback) +{ + auto msg = std::make_shared(++m_msgCounter, query, params, std::move(callback)); + + { + std::lock_guard lock(m_queueMutex); + m_queryQueue.push(msg); + } + + m_queueCondition.notify_one(); +} + size_t CDBManager::GetPendingQueryCount() const { std::lock_guard lock(m_queueMutex); @@ -156,6 +168,9 @@ void CDBManager::WorkerLoop() void CDBManager::ProcessQuery(std::shared_ptr msg) { + pqxx::result result; + std::string error; + try { if (!m_connPool) @@ -170,7 +185,7 @@ void CDBManager::ProcessQuery(std::shared_ptr msg) } pqxx::work txn(*conn); - txn.exec_params(msg->query, msg->parameters); + result = txn.exec_params(msg->query, msg->parameters); txn.commit(); SPDLOG_TRACE("AsyncQuery completed: {} (ID: {})", msg->query, msg->id); @@ -180,5 +195,17 @@ void CDBManager::ProcessQuery(std::shared_ptr msg) SPDLOG_ERROR("AsyncQuery failed: {} (query: {}, ID: {})", e.what(), msg->query, msg->id); } + if (msg->hasCallback && msg->callback) + { + try + { + msg->callback(result, error); + } + catch (const std::exception &e) + { + SPDLOG_ERROR("AsyncQuery callback threw exception: {}", e.what()); + } + } + ++m_completedQueries; -} \ No newline at end of file +} diff --git a/src/db/src/DBManager.h b/src/db/src/DBManager.h index e05e16c..98fa545 100644 --- a/src/db/src/DBManager.h +++ b/src/db/src/DBManager.h @@ -20,6 +20,7 @@ public: // Async Query void AsyncQuery(const std::string &query, const pqxx::params ¶ms = pqxx::params{}); + void AsyncQuery(const std::string &query, const pqxx::params ¶ms, AsyncQueryCallback callback); size_t GetPendingQueryCount() const; size_t GetCompletedQueryCount() const; diff --git a/src/db/src/ItemIDRangeManager.cpp b/src/db/src/ItemIDRangeManager.cpp index 9c824f8..ba020a9 100644 --- a/src/db/src/ItemIDRangeManager.cpp +++ b/src/db/src/ItemIDRangeManager.cpp @@ -131,6 +131,8 @@ bool CItemIDRangeManager::BuildRange(DWORD dwMin, DWORD dwMax, TItemIDRangeTable return true; } } + + txn.commit(); } catch (const std::exception &e) { diff --git a/src/db/src/Marriage.cpp b/src/db/src/Marriage.cpp index d48299e..b97d8b2 100644 --- a/src/db/src/Marriage.cpp +++ b/src/db/src/Marriage.cpp @@ -68,6 +68,8 @@ namespace marriage SPDLOG_DEBUG("Marriage {}: LP:{} TM:{} ST:{} {:10}:{:16} {:10}:{:16} ", uiRow, love_point, time, is_married, pid1, name1, pid2, name2); } + + txn.commit(); } catch (const std::exception &e) { @@ -119,6 +121,8 @@ namespace marriage SPDLOG_ERROR("cannot insert marriage"); return; } + + txn.commit(); } catch (const std::exception &e) { @@ -167,6 +171,8 @@ namespace marriage SPDLOG_ERROR("cannot update marriage : PID:{} {}", dwPID1, dwPID2); return; } + + txn.commit(); } catch (const std::exception &e) { @@ -222,6 +228,8 @@ namespace marriage SPDLOG_ERROR("cannot delete marriage : PID:{} {}", dwPID1, dwPID2); return; } + + txn.commit(); } catch (const std::exception &e) { @@ -274,6 +282,8 @@ namespace marriage SPDLOG_ERROR("cannot change engage to marriage : PID:{} {}", dwPID1, dwPID2); return; } + + txn.commit(); } catch (const std::exception &e) { diff --git a/src/db/src/Monarch.cpp b/src/db/src/Monarch.cpp index 5fecbb5..1df92c6 100644 --- a/src/db/src/Monarch.cpp +++ b/src/db/src/Monarch.cpp @@ -268,6 +268,8 @@ bool CMonarch::DelMonarch(int Empire) { return false; } + + txn.commit(); } catch (const std::exception &e) { @@ -308,6 +310,8 @@ bool CMonarch::DelMonarch(const char *name) return true; } + + txn.commit(); } catch (const std::exception &e) { diff --git a/src/libsql/include/PgAsyncQuery.h b/src/libsql/include/PgAsyncQuery.h index 158c352..dae7a00 100644 --- a/src/libsql/include/PgAsyncQuery.h +++ b/src/libsql/include/PgAsyncQuery.h @@ -3,6 +3,9 @@ #include #include +#include +using AsyncQueryCallback = std::function; + class PgAsyncQuery { public: @@ -10,6 +13,14 @@ public: std::string query; pqxx::params parameters; + bool hasCallback; + AsyncQueryCallback callback; + + // Fire and Forget AsyncQuery PgAsyncQuery(int msgId, const std::string &sql, const pqxx::params ¶ms = pqxx::params{}) - : id(msgId), query(sql), parameters(params) {} -}; \ No newline at end of file + : id(msgId), query(sql), parameters(params), hasCallback(false) {} + + // Callback AsyncQuery + PgAsyncQuery(int msgId, const std::string &sql, const pqxx::params ¶ms = pqxx::params{}, AsyncQueryCallback cb) + : id(msgId), query(sql), parameters(params), hasCallback(true), callback(std::move(cb)) {} +};