#include "asyncdbconnection.h" #include "waithandlelist.h" #include "scopeguard.h" #include ASyncDBConnection::ASyncDBConnection() { } ASyncDBConnection::State ASyncDBConnection::state() const { return m_threadData.m_state; } //void ASyncDBConnection::setupConnection(const std::string &connstring) //{ // if (m_thread.joinable()) { // m_threadData.stop(); // m_thread.join(); // } // m_threadData.m_initString = connstring; // m_thread = std::thread([this] () { m_threadData.run(); }); //} void ASyncDBConnection::setupConnection(const ConnectionConfig &config) { if (m_thread.joinable()) { m_threadData.stop(); m_thread.join(); } m_threadData.m_config = config; m_thread = std::thread([this] () { m_threadData.run(); }); } void ASyncDBConnection::closeConnection() { m_threadData.stop(); if (m_thread.joinable()) { m_thread.join(); } } bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) { { std::lock_guard lg(m_threadData.m_commandQueue.m_mutex); m_threadData.m_commandQueue.m_queue.emplace(command, on_result); m_threadData.m_commandQueue.m_newEvent.set(); } return true; } bool ASyncDBConnection::cancel() { return m_threadData.cancel(); } void ASyncDBConnection::setStateCallback(on_state_callback state_callback) { std::lock_guard lg(m_threadData.m_stateCallback.m_mutex); m_threadData.m_stateCallback.m_func = state_callback; } void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback) { std::lock_guard lg(m_threadData.m_noticeCallback.m_mutex); m_threadData.m_noticeCallback.m_func = notice_callback; } ASyncDBConnection::Thread::Thread() : m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear) {} void ASyncDBConnection::Thread::run() { m_terminated = false; SCOPE_EXIT { m_state = State::NotConnected; m_terminated = true; }; while (!terminateRequested) { // make or recover connection if (makeConnection()) { m_connection.setNoticeReceiver( [this](const PGresult *result) { processNotice(result); }); m_canceller = m_connection.getCancel(); // send commands and receive results communicate(); } else { // It is not possible to determine the source of the problem. // Accept for PQconnectionNeedsPassword // Pass problem to main thread and stop this thread // Main thread needs to know it has to restart connecting if it want's to. // TODO: add status functions to help main thread so it doesn't have to remember // everything reported through callbacks. break; } } // close connection } bool ASyncDBConnection::Thread::cancel() { return m_canceller.cancel(nullptr); } bool ASyncDBConnection::Thread::makeConnection() { using namespace std::chrono_literals; while (!terminateRequested) { // start connecting //bool ok = m_connection.connectStart(m_initString + " client_encoding=utf8"); auto keywords = m_config.getKeywords(); auto values = m_config.getValues(); bool ok = m_connection.connectStart(keywords, values); auto start = std::chrono::steady_clock::now(); if (ok && m_connection.status() != CONNECTION_BAD) { int sock = m_connection.socket(); Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear); long fd = FD_WRITE; while (true) { // poll till complete or failed (we can get an abort command) WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; auto wait_result_socket_event = whl.add(socket_event); auto wait_result_stop = whl.add(m_stopEvent); auto nu = std::chrono::steady_clock::now(); std::chrono::duration diff = -(nu-start); diff += 30s; DWORD timeout = diff.count(); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, timeout, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_socket_event) { auto poll_state = m_connection.connectPoll(); if (poll_state == PGRES_POLLING_OK) { // if connected return true doStateCallback(State::Connected); return true; } else if (poll_state == PGRES_POLLING_FAILED) { doStateCallback(State::NotConnected); return false; } else if (poll_state == PGRES_POLLING_READING) { doStateCallback(State::Connecting); fd = FD_READ; } else if (poll_state == PGRES_POLLING_WRITING) { doStateCallback(State::Connecting); fd = FD_WRITE; } } else if (res == wait_result_stop) { } } // end while (true) } } return false; } void ASyncDBConnection::Thread::communicate() { while (!terminateRequested) { // wait for something to do: // - command to send to server // - wait for results and (notifies can also come in) // - pass each result on to the completion routine // - notify comming in from the server // - pass to notify callback // - connection raises an error // - return // - stop signal // - return if (m_state == State::Connected) { waitForAndSendCommand(); } else if (m_state == State::QuerySend || m_state == State::CancelSend) { // Wait for result, even after a cancel we should wait, for all results // New command's are not excepted when one has been send waitForResult(); } } } void ASyncDBConnection::Thread::stop() { terminateRequested = true; m_stopEvent.set(); } void ASyncDBConnection::Thread::doStateCallback(State state) { m_state = state; std::lock_guard lg(m_stateCallback.m_mutex); if (m_stateCallback.m_func) { m_stateCallback.m_func(state); } } void ASyncDBConnection::Thread::waitForAndSendCommand() { WaitHandleList whl; auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent); auto wait_result_stop = whl.add(m_stopEvent); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, INFINITE, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_new_command) { doNewCommand(); } // if (res == wait_result_stop) return; } void ASyncDBConnection::Thread::doNewCommand() { // todo: send command // get command from top of queue (but leave it in the queue, we need the callback) std::string command; { std::lock_guard lg(m_commandQueue.m_mutex); if (! m_commandQueue.m_queue.empty()) { command = m_commandQueue.m_queue.front().command; } } if (!command.empty() && m_connection.sendQuery(command)) { m_timer.start(); doStateCallback(State::QuerySend); } else { std::string error = m_connection.getErrorMessage(); // todo: need to report the error } } void ASyncDBConnection::Thread::waitForResult() { int sock = m_connection.socket(); Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear); long fd = FD_READ | FD_CLOSE; bool finished = false; while ( ! finished) { WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; auto wait_result_socket = whl.add(socket_event); auto wait_result_stop = whl.add(m_stopEvent); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, INFINITE, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_socket) { WSANETWORKEVENTS net_events; WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events); if (net_events.lNetworkEvents & FD_READ) { if (m_connection.consumeInput()) { while ( ! finished && ! m_connection.isBusy()) { auto res(m_connection.getResult()); { qint64 ms = m_timer.restart(); std::lock_guard lg(m_commandQueue.m_mutex); m_commandQueue.m_queue.front().on_result(res, ms); if (res == nullptr) { m_timer.invalidate(); m_commandQueue.m_queue.pop(); doStateCallback(State::Connected); finished = true; } } } // else is still waiting for more data } else { // error during consume } } } if (res == wait_result_stop) { // Send cancel, close connection and terminate thread cancel(); doStateCallback(State::Terminating); finished = true; } } // end while // When last result received, remove command from queue } void ASyncDBConnection::Thread::processNotice(const PGresult *result) { // Pgsql::Result res(result); std::lock_guard lg(m_noticeCallback.m_mutex); if (m_noticeCallback.m_func) { Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); m_noticeCallback.m_func(details); } }