#include "asyncdbconnection.h" #include "waithandlelist.h" #include ASyncDBConnection::ASyncDBConnection() { } void ASyncDBConnection::setupConnection(const std::string &connstring) { m_threadData.m_initString = connstring; m_thread = std::thread([this] () { m_threadData.run(); }); } void ASyncDBConnection::closeConnection() { m_threadData.stop(); m_thread.join(); } void ASyncDBConnection::setStateCallback(on_state_callback state_callback) { std::lock_guard lg(m_threadData.m_stateCallbackMutex); m_threadData.m_stateCallback = state_callback; } ASyncDBConnection::Thread::Thread() : m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear) {} void ASyncDBConnection::Thread::run() { while (!terminateRequested) { // make or recover connection if (makeConnection()) { // send commands and receive results communicate(); } } // close connection } bool ASyncDBConnection::Thread::makeConnection() { using namespace std::chrono_literals; while (!terminateRequested) { // start connecting bool ok = m_connection.connectStart(m_initString + " client_encoding=utf8"); auto start = std::chrono::steady_clock::now(); if (ok && m_connection.status() != CONNECTION_BAD) { int sock = m_connection.socket(); Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear); long fd = FD_WRITE; while (true) { // poll till complete or failed (we can get an abort command) WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; auto wait_result_socket_event = whl.add(socket_event); auto wait_result_stop = whl.add(m_stopEvent); auto nu = std::chrono::steady_clock::now(); std::chrono::duration diff = -(nu-start); diff += 30s; DWORD timeout = diff.count(); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, timeout, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_socket_event) { auto poll_state = m_connection.connectPoll(); if (poll_state == PGRES_POLLING_OK) { // if connected return true doStateCallback(State::Connected); return true; } else if (poll_state == PGRES_POLLING_FAILED) { doStateCallback(State::NotConnected); return false; } else if (poll_state == PGRES_POLLING_READING) { doStateCallback(State::Connecting); fd = FD_READ; } else if (poll_state == PGRES_POLLING_WRITING) { doStateCallback(State::Connecting); fd = FD_WRITE; } } else if (res == wait_result_stop) { } } // end while (true) } } return false; } void ASyncDBConnection::Thread::communicate() { while (!terminateRequested) { // wait for something to do: // - command to send to server // - wait for results and (notifies can also come in) // - pass each result on to the completion routine // - notify comming in from the server // - pass to notify callback // - connection raises an error // - return // - stop signal // - return // WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; // auto wait_result_socket_event = whl.add(socket_event); auto wait_result_stop = whl.add(m_stopEvent); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, INFINITE, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); } } void ASyncDBConnection::Thread::stop() { terminateRequested = true; m_stopEvent.set(); } void ASyncDBConnection::Thread::doStateCallback(State state) { std::lock_guard lg(m_stateCallbackMutex); if (m_stateCallback) { m_stateCallback(state); } }