From 6bb5525d5ee4b2138397d8a5f9384f16680af36a Mon Sep 17 00:00:00 2001 From: eelke Date: Wed, 6 Nov 2019 20:03:27 +0100 Subject: [PATCH] Switched away from boost::asio as it doesn't play well with libpq --- common.pri | 2 +- pglab/CrudModel.cpp | 3 +- pglab/GlobalIoService.cpp | 7 - pglab/GlobalIoService.h | 6 - pglab/QueryTool.cpp | 3 +- pglab/main.cpp | 7 - pglab/pglab.pro | 2 - pglablib/ASyncDBConnection.cpp | 559 ++++++++++++++++++++++++++------- pglablib/ASyncDBConnection.h | 13 +- pglablib/WaitHandleList.cpp | 31 ++ pglablib/WaitHandleList.h | 26 ++ pglablib/Win32Event.h | 47 +++ pglablib/pglablib.pro | 3 + 13 files changed, 566 insertions(+), 143 deletions(-) delete mode 100644 pglab/GlobalIoService.cpp delete mode 100644 pglab/GlobalIoService.h create mode 100644 pglablib/WaitHandleList.cpp create mode 100644 pglablib/WaitHandleList.h create mode 100644 pglablib/Win32Event.h diff --git a/common.pri b/common.pri index 9513af1..e95c368 100644 --- a/common.pri +++ b/common.pri @@ -2,7 +2,7 @@ error( "Use local.pri.sample to create your own local.pri" ) } -LIBS += -lws2_32 -llibpq +LIBS += -lUser32 -lws2_32 -llibpq CONFIG += c++17 QMAKE_CXXFLAGS += /std:c++17 diff --git a/pglab/CrudModel.cpp b/pglab/CrudModel.cpp index 3764249..bc1efe0 100644 --- a/pglab/CrudModel.cpp +++ b/pglab/CrudModel.cpp @@ -4,7 +4,6 @@ #include "catalog/PgAttribute.h" #include "catalog/PgAttributeContainer.h" #include "catalog/PgConstraintContainer.h" -#include "GlobalIoService.h" #include "SqlFormattingUtils.h" #include "Pgsql_oids.h" #include @@ -19,7 +18,7 @@ CrudModel::CrudModel(QObject *parent) : QAbstractTableModel(parent) - , m_dbConn(*getGlobalAsioIoService()) +, m_dbConn() { qDebug("CrudModel created"); connect(&m_dbConn, &ASyncDBConnection::onStateChanged, this, &CrudModel::connectionStateChanged); diff --git a/pglab/GlobalIoService.cpp b/pglab/GlobalIoService.cpp deleted file mode 100644 index 99ddd2b..0000000 --- a/pglab/GlobalIoService.cpp +++ /dev/null @@ -1,7 +0,0 @@ -#include "GlobalIoService.h" - -std::shared_ptr getGlobalAsioIoService() -{ - static auto ios = std::make_shared(); - return ios; -} diff --git a/pglab/GlobalIoService.h b/pglab/GlobalIoService.h deleted file mode 100644 index 5e92327..0000000 --- a/pglab/GlobalIoService.h +++ /dev/null @@ -1,6 +0,0 @@ -#pragma once - -#include -#include - -std::shared_ptr getGlobalAsioIoService(); diff --git a/pglab/QueryTool.cpp b/pglab/QueryTool.cpp index a021a92..75eab46 100644 --- a/pglab/QueryTool.cpp +++ b/pglab/QueryTool.cpp @@ -18,7 +18,6 @@ #include "catalog/PgDatabaseCatalog.h" #include "QueryParamListController.h" #include "util.h" -#include "GlobalIoService.h" #include "UserConfiguration.h" #include "IDatabaseWindow.h" @@ -27,7 +26,7 @@ QueryTool::QueryTool(IDatabaseWindow *context, QWidget *parent) : QWidget(parent) , m_context(context) , ui(new Ui::QueryTab) - , m_dbConnection(*getGlobalAsioIoService()) + , m_dbConnection() { ui->setupUi(this); diff --git a/pglab/main.cpp b/pglab/main.cpp index 38482a6..1796186 100644 --- a/pglab/main.cpp +++ b/pglab/main.cpp @@ -4,7 +4,6 @@ # include #endif #include -#include "GlobalIoService.h" int main(int argc, char *argv[]) { @@ -29,19 +28,13 @@ int main(int argc, char *argv[]) QCoreApplication::setOrganizationDomain("eelkeklein.nl"); QCoreApplication::setApplicationName("pglab"); - std::thread asio_service_thread; int result = -1; { - auto ios = getGlobalAsioIoService(); - boost::asio::io_service::work work(*ios); // Prevent service from running out of work so run doesn't return - asio_service_thread = std::thread([ios](){ ios->run(); }); - // make sure the io_service is stopped before we wait on the future auto master_controller = std::make_unique(); master_controller->init(); result = a.exec(); } - asio_service_thread.join(); #ifdef WIN32 WSACleanup(); #endif diff --git a/pglab/pglab.pro b/pglab/pglab.pro index ed2bb43..751505d 100644 --- a/pglab/pglab.pro +++ b/pglab/pglab.pro @@ -44,7 +44,6 @@ SOURCES += main.cpp\ DatabasesTableModel.cpp \ RolesTableModel.cpp \ ProcessStdioWidget.cpp \ - GlobalIoService.cpp \ ResultTableModelUtil.cpp \ BaseTableModel.cpp \ QueryParamListController.cpp \ @@ -111,7 +110,6 @@ HEADERS += \ DatabasesTableModel.h \ RolesTableModel.h \ ProcessStdioWidget.h \ - GlobalIoService.h \ ResultTableModelUtil.h \ BaseTableModel.h \ QueryParamListController.h \ diff --git a/pglablib/ASyncDBConnection.cpp b/pglablib/ASyncDBConnection.cpp index a33d53f..3cc675a 100644 --- a/pglablib/ASyncDBConnection.cpp +++ b/pglablib/ASyncDBConnection.cpp @@ -1,9 +1,12 @@ #include "ASyncDBConnection.h" #include "ScopeGuard.h" #include "util.h" +#include "Pgsql_PgException.h" +#include "Win32Event.h" +#include "WaitHandleList.h" #include - -using namespace boost::asio; +#include +#include namespace { @@ -17,71 +20,448 @@ namespace { } } registerMetaTypes_instance; - + class Command { + public: + std::string command; + Pgsql::Params params; + ASyncDBConnection::on_result_callback on_result; + + Command() = default; + Command(const std::string &cmd, ASyncDBConnection::on_result_callback cb) + : command(cmd), on_result(cb) + {} + Command(const std::string &cmd, Pgsql::Params &&p, ASyncDBConnection::on_result_callback cb) + : command(cmd), params(p), on_result(cb) + {} + }; + } -ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios) - : m_asioSock(ios) +class ASyncDBConnectionThread { +public: + using t_CommandQueue = std::queue; +// struct { +// std::mutex m_mutex; +// on_state_callback m_func; +// } m_stateCallback; +// struct { +// std::mutex m_mutex; +// on_notice_callback m_func; +// } m_noticeCallback; + + struct t_Command { + std::mutex m_mutex; + t_CommandQueue m_queue; + //std::condition_variable m_newEvent; + Win32Event m_newEvent; + t_Command() + : m_newEvent(Win32Event::Reset::Auto, Win32Event::Initial::Clear) + {} + } m_commandQueue; + +// std::string m_initString; + ConnectionConfig m_config; + ASyncDBConnection::State m_state = ASyncDBConnection::State::NotConnected; + + ASyncDBConnectionThread(ASyncDBConnection *asco); + + /// Is started as a seperate thread by ASyncDBConnection + void run(); + + /// Sends a cancel request to the DB server + bool cancel(); + + void stop(); + +private: + ASyncDBConnection *asyncConnObject = nullptr; + + /// \todo Implement new method to stop the thread + Win32Event m_stopEvent; + Pgsql::Connection m_connection; + bool terminateRequested = false; ///< is set when the thread should stop + bool m_terminated = true; + Pgsql::Canceller m_canceller; + QElapsedTimer m_timer; + + + bool makeConnection(); + void communicate(); + + void doStateCallback(ASyncDBConnection::State state); + /// Wait's for a command to come in and send's it to the server + void waitForAndSendCommand(); + void doNewCommand(); + void waitForResult(); + + + void processNotice(const PGresult *result); + /** Function to call when after sending a command the socket is ready for reading. + * + * It might take several consumes before all data is read. + */ + bool consumeResultInput(); + +}; + +ASyncDBConnectionThread::ASyncDBConnectionThread(ASyncDBConnection *asco) + : asyncConnObject(asco) + , m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear) {} -ASyncDBConnection::~ASyncDBConnection() = default; +void ASyncDBConnectionThread::run() +{ + m_terminated = false; + SCOPE_EXIT { + m_state = ASyncDBConnection::State::NotConnected; + m_terminated = true; + }; + while (!terminateRequested) { + + // make or recover connection + if (makeConnection()) { + m_connection.setNoticeReceiver( + [this](const PGresult *result) { processNotice(result); }); + m_canceller = m_connection.getCancel(); + + + // send commands and receive results + communicate(); + } + else { + // It is not possible to determine the source of the problem. + // Accept for PQconnectionNeedsPassword + + // Pass problem to main thread and stop this thread + + // Main thread needs to know it has to restart connecting if it want's to. + // TODO: add status functions to help main thread so it doesn't have to remember + // everything reported through callbacks. + + break; + } + } + // close connection + +} + +bool ASyncDBConnectionThread::cancel() +{ + return m_canceller.cancel(nullptr); +} + +bool ASyncDBConnectionThread::makeConnection() +{ + using namespace std::literals::chrono_literals; + + // start connecting +// auto keywords = m_config.getKeywords(); +// auto values = m_config.getValues(); + QString conn_string = m_config.connectionString(); +#if false + try { + m_connection.connect(conn_string); //keywords, values, 0); + doStateCallback(ASyncDBConnection::State::Connected); + return true; + } + catch (Pgsql::PgConnectionError &) { + } + return false; +#else + while (!terminateRequested) { + + bool ok = m_connection.connectStart(conn_string); //keywords, values); + auto start = std::chrono::steady_clock::now(); + if (ok && m_connection.status() != CONNECTION_BAD) { + int sock = m_connection.socket(); + Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear); + + long fd = FD_WRITE; + while (true) { + // poll till complete or failed (we can get an abort command) + WSAEventSelect(sock, socket_event.handle(), fd); + WaitHandleList whl; + auto wait_result_socket_event = whl.add(socket_event); + auto wait_result_stop = whl.add(m_stopEvent); + + auto nu = std::chrono::steady_clock::now(); + std::chrono::duration diff = -(nu-start); + diff += 30s; + DWORD timeout = diff.count(); + + DWORD res = MsgWaitForMultipleObjectsEx( + whl.count(), // _In_ DWORD nCount, + whl, // _In_ const HANDLE *pHandles, + timeout, // _In_ DWORD dwMilliseconds, + 0, // _In_ DWORD dwWakeMask, + 0 // _In_ DWORD dwFlags + ); + if (res == wait_result_socket_event) { + auto poll_state = m_connection.connectPoll(); + if (poll_state == PGRES_POLLING_OK) { + // if connected return true + doStateCallback(ASyncDBConnection::State::Connected); + return true; + } + else if (poll_state == PGRES_POLLING_FAILED) { + doStateCallback(ASyncDBConnection::State::NotConnected); + return false; + } + else if (poll_state == PGRES_POLLING_READING) { + doStateCallback(ASyncDBConnection::State::Connecting); + fd = FD_READ; + } + else if (poll_state == PGRES_POLLING_WRITING) { + doStateCallback(ASyncDBConnection::State::Connecting); + fd = FD_WRITE; + } + } + else if (res == wait_result_stop) { + + } + } // end while (true) + } + } + return false; +#endif +} + +void ASyncDBConnectionThread::communicate() +{ + while (!terminateRequested) { + // wait for something to do: + // - command to send to server + // - wait for results and (notifies can also come in) + // - pass each result on to the completion routine + // - notify comming in from the server + // - pass to notify callback + // - connection raises an error + // - return + // - stop signal + // - return + + + if (m_state == ASyncDBConnection::State::Connected) { + waitForAndSendCommand(); + } + else if (m_state == ASyncDBConnection::State::QuerySend || m_state == ASyncDBConnection::State::CancelSend) { + // Wait for result, even after a cancel we should wait, for all results + // New command's are not excepted when one has been send + waitForResult(); + } + + } +} + +void ASyncDBConnectionThread::stop() +{ + terminateRequested = true; + m_stopEvent.set(); +} + +void ASyncDBConnectionThread::doStateCallback(ASyncDBConnection::State state) +{ + m_state = state; +// std::lock_guard lg(m_stateCallback.m_mutex); +// if (m_stateCallback.m_func) { +// m_stateCallback.m_func(state); +// } + emit asyncConnObject->onStateChanged(state); +} + +void ASyncDBConnectionThread::waitForAndSendCommand() +{ +#if false + using namespace std::chrono_literals; + // lock the data + std::unique_lock lk(m_commandQueue.m_mutex); + if (m_commandQueue.m_queue.empty()) { + // no data wait till there is data + m_commandQueue.m_newEvent.wait_for(lk, 1000ms); + // can we use the predicate to reimplement the stop function???, []{return ready;}); + + } + doNewCommand(); + +#else + WaitHandleList whl; + auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent); + auto wait_result_stop = whl.add(m_stopEvent); + + DWORD res = MsgWaitForMultipleObjectsEx( + whl.count(), // _In_ DWORD nCount, + whl, // _In_ const HANDLE *pHandles, + INFINITE, // _In_ DWORD dwMilliseconds, + 0, // _In_ DWORD dwWakeMask, + 0 // _In_ DWORD dwFlags + ); + if (res == wait_result_new_command) { + doNewCommand(); + } + if (res == wait_result_stop) + return; +#endif +} + +void ASyncDBConnectionThread::doNewCommand() +{ + // get command from top of queue (but leave it in the queue, we need the callback) + if (! m_commandQueue.m_queue.empty()) { + const Command &command = m_commandQueue.m_queue.front(); + if (!command.command.empty()) { + bool query_send = false; + if (command.params.empty()) + query_send = m_connection.sendQuery(command.command.c_str()); + else + query_send = m_connection.sendQueryParams(command.command.c_str(), command.params); + + if (query_send) { + m_timer.start(); + doStateCallback(ASyncDBConnection::State::QuerySend); + } + else { + std::string error = m_connection.getErrorMessage(); + // todo: need to report the error + } + } + } + +} + +bool ASyncDBConnectionThread::consumeResultInput() +{ + bool finished = false; + if (m_connection.consumeInput()) { + while ( ! finished && ! m_connection.isBusy()) { + auto res(m_connection.getResult()); + { + qint64 ms = m_timer.restart(); + std::lock_guard lg(m_commandQueue.m_mutex); + m_commandQueue.m_queue.front().on_result(res, ms); + if (res == nullptr) { + m_timer.invalidate(); + m_commandQueue.m_queue.pop(); + doStateCallback(ASyncDBConnection::State::Connected); + finished = true; + } + } + + } + // else is still waiting for more data + + } + else { + // error during consume + + } + return finished; +} + +void ASyncDBConnectionThread::waitForResult() +{ + int sock = m_connection.socket(); + + fd_set readfds; + timeval timeout; + bool finished = false; + while (!finished && !terminateRequested) { + FD_ZERO(&readfds); + FD_SET(sock, &readfds); + + timeout.tv_sec = 5; + timeout.tv_usec = 0; + + int select_result = select(sock + 1, &readfds, nullptr, nullptr, &timeout); + if (select_result > 0) { + if (FD_ISSET(sock, &readfds)) { + if (consumeResultInput()) { + finished = true; + } + } + } + + } +#if false + Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear); + + long fd = FD_READ | FD_CLOSE; + + bool finished = false; + while ( ! finished) { + WSAEventSelect(sock, socket_event.handle(), fd); + + WaitHandleList whl; + auto wait_result_socket = whl.add(socket_event); + //auto wait_result_stop = whl.add(m_stopEvent); + + DWORD res = MsgWaitForMultipleObjectsEx( + whl.count(), // _In_ DWORD nCount, + whl, // _In_ const HANDLE *pHandles, + INFINITE, // _In_ DWORD dwMilliseconds, + 0, // _In_ DWORD dwWakeMask, + 0 // _In_ DWORD dwFlags + ); + if (res == wait_result_socket) { + WSANETWORKEVENTS net_events; + WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events); + + if (net_events.lNetworkEvents & FD_READ) { + if (consumeResultInput()) { + finished = true; + } + } + } + if (res == wait_result_stop) { + // Send cancel, close connection and terminate thread + cancel(); + doStateCallback(State::Terminating); + finished = true; + } + } // end while + // When last result received, remove command from queue +#endif +} + +void ASyncDBConnectionThread::processNotice(const PGresult *result) +{ +// Pgsql::Result res(result); +// std::lock_guard lg(m_noticeCallback.m_mutex); +// if (m_noticeCallback.m_func) { +// Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); +// m_noticeCallback.m_func(details); +// } + Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); + emit asyncConnObject->onNotice(details); +} + + + + +ASyncDBConnection::ASyncDBConnection() + : m_threadData(std::make_unique(this)) +{} + +ASyncDBConnection::~ASyncDBConnection() +{ + closeConnection(); +} ASyncDBConnection::State ASyncDBConnection::state() const { - return m_state; + return m_threadData->m_state; } void ASyncDBConnection::setupConnection(const ConnectionConfig &config) { - m_config = config; -// auto keywords = m_config.getKeywords(); -// auto values = m_config.getValues(); - QString conn_string = config.connectionString(); - bool ok = m_connection.connectStart(conn_string.toStdString().c_str()); - // auto start = std::chrono::steady_clock::now(); - if (ok && m_connection.status() != CONNECTION_BAD) { - auto sock_handle = m_connection.socket(); - - m_asioSock.assign(ip::tcp::v4(), sock_handle); - m_asioSock.non_blocking(true); - - m_asioSock.async_write_some(null_buffers(), - [this] (boost::system::error_code ec, std::size_t s) - { async_connect_handler(ec, s); } - ); + if (m_thread.joinable()) { + m_threadData->stop(); + m_thread.join(); } -} + m_threadData->m_config = config; + m_thread = std::thread([this] () { m_threadData->run(); }); -void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t /*s*/) -{ - // boost::asio::error::operation_aborted - if (ec == boost::system::errc::success) { - auto poll_state = m_connection.connectPoll(); - if (poll_state == PGRES_POLLING_OK) { - // if connected return true - doStateCallback(State::Connected); - } - else if (poll_state == PGRES_POLLING_FAILED) { - doStateCallback(State::NotConnected); - } - else if (poll_state == PGRES_POLLING_READING) { - doStateCallback(State::Connecting); - m_asioSock.async_read_some(null_buffers(), - [this] (boost::system::error_code ec, std::size_t s) - { async_connect_handler(ec, s); } - ); - } - else if (poll_state == PGRES_POLLING_WRITING) { - doStateCallback(State::Connecting); - m_asioSock.async_write_some(null_buffers(), - [this] (boost::system::error_code ec, std::size_t s) - { async_connect_handler(ec, s); } - ); - } - } } - void ASyncDBConnection::doStateCallback(State state) { m_state = state; @@ -96,79 +476,38 @@ void ASyncDBConnection::doStateCallback(State state) void ASyncDBConnection::closeConnection() { - // SHould this be async too???? - if (m_state == State::QuerySend) { - m_canceller.cancel(nullptr); - } - if (m_state != State::NotConnected) { - // Do not really want to close it before libpq is finished with it - // However explicitly is the destroctor doing the right thing? - //m_asioSock.close(); - m_connection.close(); +// doStateCallback(State::NotConnected); + // TODO also send cancel??? + m_threadData->stop(); + if (m_thread.joinable()) { + m_thread.join(); } - doStateCallback(State::NotConnected); + } bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) { - m_connection.sendQuery(command); - m_timer.start(); - doStateCallback(State::QuerySend); - m_asioSock.async_read_some(null_buffers(), - [this, on_result] (boost::system::error_code ec, std::size_t s) - { async_query_handler(ec, s, on_result); } - ); + { + std::lock_guard lg(m_threadData->m_commandQueue.m_mutex); + m_threadData->m_commandQueue.m_queue.emplace(command, on_result); + m_threadData->m_commandQueue.m_newEvent.set(); + } return true; } bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result) { - m_connection.sendQueryParams(command.c_str(), params); - m_timer.start(); - doStateCallback(State::QuerySend); - m_asioSock.async_read_some(null_buffers(), - [this, on_result] (boost::system::error_code ec, std::size_t s) - { async_query_handler(ec, s, on_result); } - ); + { + std::lock_guard lg(m_threadData->m_commandQueue.m_mutex); + m_threadData->m_commandQueue.m_queue.emplace(command, std::move(params), on_result); + m_threadData->m_commandQueue.m_newEvent.set(); + } return true; } -void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t /*s*/, on_result_callback on_result) -{ - if (ec == boost::system::errc::success) { - bool finished = false; - if (m_connection.consumeInput()) { - while ( ! finished && ! m_connection.isBusy()) { - auto res = m_connection.getResultNoThrow(); - qint64 ms = m_timer.restart(); - on_result(res, ms); - if (res == nullptr) { - m_timer.invalidate(); - doStateCallback(State::Connected); - finished = true; - } - } - // else is still waiting for more data - } - else { - // error during consume - auto error_msg = m_connection.getErrorMessage(); - - } - //return finished; - if (!finished) { - // wait for more - m_asioSock.async_read_some(null_buffers(), - [this, on_result] (boost::system::error_code ec, std::size_t s) - { async_query_handler(ec, s, on_result); } - ); - } - } -} - bool ASyncDBConnection::cancel() { - return m_canceller.cancel(nullptr); + return m_threadData->cancel(); } void ASyncDBConnection::processNotice(const PGresult *result) diff --git a/pglablib/ASyncDBConnection.h b/pglablib/ASyncDBConnection.h index a69e8ba..fc8788e 100644 --- a/pglablib/ASyncDBConnection.h +++ b/pglablib/ASyncDBConnection.h @@ -10,9 +10,9 @@ #include "ConnectionConfig.h" #include #include -#include -#include +#include +class ASyncDBConnectionThread; /** \brief Class that handles asynchronous execution of queries. * * Queries are passed to this class with a routine to call on completion @@ -32,7 +32,7 @@ public: using on_result_callback = std::function>, qint64)>; - explicit ASyncDBConnection(boost::asio::io_service &ios); + explicit ASyncDBConnection(); ~ASyncDBConnection(); State state() const; @@ -73,17 +73,18 @@ signals: private: Pgsql::Connection m_connection; - boost::asio::ip::tcp::socket m_asioSock; + std::unique_ptr m_threadData; + std::thread m_thread; ConnectionConfig m_config; State m_state = State::NotConnected; Pgsql::Canceller m_canceller; QElapsedTimer m_timer; - void async_connect_handler(boost::system::error_code ec, std::size_t s); - void async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result); void doStateCallback(State state); void processNotice(const PGresult *result); + + friend class ASyncDBConnectionThread; }; Q_DECLARE_METATYPE(ASyncDBConnection::State); diff --git a/pglablib/WaitHandleList.cpp b/pglablib/WaitHandleList.cpp new file mode 100644 index 0000000..d219610 --- /dev/null +++ b/pglablib/WaitHandleList.cpp @@ -0,0 +1,31 @@ +#include "WaitHandleList.h" +#include "win32event.h" + +WaitHandleList::WaitHandleList() = default; +WaitHandleList::~WaitHandleList() = default; + +WaitResultValue WaitHandleList::add(HANDLE h) +{ + m_waitHandles.push_back(h); + return WAIT_OBJECT_0 + static_cast(m_waitHandles.size() - 1); +} + +WaitResultValue WaitHandleList::add(Win32Event &e) +{ + return add(e.handle()); +} + +DWORD WaitHandleList::count() const +{ + return static_cast(m_waitHandles.size()); +} + +void WaitHandleList::clear() +{ + m_waitHandles.clear(); +} + +WaitHandleList::operator const HANDLE*() const +{ + return m_waitHandles.data(); +} diff --git a/pglablib/WaitHandleList.h b/pglablib/WaitHandleList.h new file mode 100644 index 0000000..376e817 --- /dev/null +++ b/pglablib/WaitHandleList.h @@ -0,0 +1,26 @@ +#ifndef WAITHANDLELIST_H +#define WAITHANDLELIST_H + +#include +#include + +class Win32Event; + +using WaitResultValue = DWORD; + +class WaitHandleList { +public: + WaitHandleList(); + ~WaitHandleList(); + + WaitResultValue add(HANDLE h); + WaitResultValue add(Win32Event &e); + DWORD count() const; + void clear(); + operator const HANDLE*() const; + +private: + std::vector m_waitHandles; +}; + +#endif // WAITHANDLELIST_H diff --git a/pglablib/Win32Event.h b/pglablib/Win32Event.h new file mode 100644 index 0000000..684d13f --- /dev/null +++ b/pglablib/Win32Event.h @@ -0,0 +1,47 @@ +#ifndef WIN32EVENT_H +#define WIN32EVENT_H + +#include + +#include +/** Simpel wrapper around a Win32 Event object. + +Mostly to make cleanup automatic.*/ +class Win32Event { +public: + enum class Reset { Auto=0, Manual=1 }; + enum class Initial { Clear=0, Set=1 }; + + Win32Event(Reset r, Initial is) + : hEvent(CreateEvent( + nullptr, // _In_opt_ LPSECURITY_ATTRIBUTES lpEventAttributes, + BOOL(r), // _In_ BOOL bManualReset, + BOOL(is), // _In_ BOOL bInitialState, + nullptr //_In_opt_ LPCTSTR lpName + )) + {} + + Win32Event(Reset r, Initial is, int sock, long net_events) + : Win32Event(r, is) + { + WSAEventSelect(sock, hEvent, net_events); + } + + ~Win32Event() + { + CloseHandle(hEvent); + } + + Win32Event(const Win32Event &) = delete; + Win32Event &operator=(const Win32Event &) = delete; + + void set() { SetEvent(hEvent); } + + void reset() { ResetEvent(hEvent); } + + HANDLE handle() { return hEvent; } +private: + HANDLE hEvent; +}; + +#endif // WIN32EVENT_H diff --git a/pglablib/pglablib.pro b/pglablib/pglablib.pro index 788bbbc..82a858a 100644 --- a/pglablib/pglablib.pro +++ b/pglablib/pglablib.pro @@ -23,6 +23,7 @@ SOURCES += \ Pglablib.cpp \ ASyncDBConnection.cpp \ ConnectionConfig.cpp \ + WaitHandleList.cpp \ catalog/PgType.cpp \ catalog/PgTypeContainer.cpp \ catalog/PgNamespace.cpp \ @@ -88,6 +89,8 @@ HEADERS += \ Pglablib.h \ ASyncDBConnection.h \ ConnectionConfig.h \ + WaitHandleList.h \ + Win32Event.h \ catalog/PgType.h \ catalog/PgTypeContainer.h \ catalog/PgNamespace.h \