#include "ASyncDBConnection.h" #include "ScopeGuard.h" #include "util.h" #include "Pgsql_PgException.h" #include "Win32Event.h" #include "WaitHandleList.h" #include #include #include #include namespace { class registerMetaTypes { public: registerMetaTypes() { qRegisterMetaType(); qRegisterMetaType(); qRegisterMetaType>(); } } registerMetaTypes_instance; class Command { public: std::string command; Pgsql::Params params; ASyncDBConnection::on_result_callback on_result; Command() = default; Command(const std::string &cmd, ASyncDBConnection::on_result_callback cb) : command(cmd), on_result(cb) {} Command(const std::string &cmd, Pgsql::Params &&p, ASyncDBConnection::on_result_callback cb) : command(cmd), params(p), on_result(cb) {} }; } class ASyncDBConnectionThread { public: using t_CommandQueue = std::queue; struct t_Command { std::mutex m_mutex; t_CommandQueue m_queue; Win32Event m_newEvent; t_Command() : m_newEvent(Win32Event::Reset::Auto, Win32Event::Initial::Clear) {} } m_commandQueue; ConnectionConfig m_config; ASyncDBConnection::State m_state = ASyncDBConnection::State::NotConnected; ASyncDBConnectionThread(ASyncDBConnection *asco); /// Is started as a seperate thread by ASyncDBConnection void run(); /// Sends a cancel request to the DB server bool cancel(); void stop(); private: ASyncDBConnection *asyncConnObject = nullptr; /// \todo Implement new method to stop the thread Win32Event m_stopEvent; Pgsql::Connection m_connection; bool terminateRequested = false; ///< is set when the thread should stop bool m_terminated = true; Pgsql::Canceller m_canceller; QElapsedTimer m_timer; bool makeConnection(); void communicate(); void doStateCallback(ASyncDBConnection::StateData state); /// Wait's for a command to come in and send's it to the server void waitForAndSendCommand(); void doNewCommand(); void waitForResult(); void processNotice(const PGresult *result); /** Function to call when after sending a command the socket is ready for reading. * * It might take several consumes before all data is read. */ bool consumeResultInput(); }; ASyncDBConnectionThread::ASyncDBConnectionThread(ASyncDBConnection *asco) : asyncConnObject(asco) , m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear) {} void ASyncDBConnectionThread::run() { m_terminated = false; SCOPE_EXIT { m_state = ASyncDBConnection::State::NotConnected; m_terminated = true; m_connection.close(); // doStateCallback(ASyncDBConnection::State::NotConnected); }; try { while (!terminateRequested) { // make or recover connection if (makeConnection()) { m_connection.setNoticeReceiver( [this](const PGresult *result) { processNotice(result); }); m_canceller = m_connection.getCancel(); // send commands and receive results communicate(); } else { // It is not possible to determine the source of the problem. // Accept for PQconnectionNeedsPassword // Pass problem to main thread and stop this thread // Main thread needs to know it has to restart connecting if it want's to. // TODO: add status functions to help main thread so it doesn't have to remember // everything reported through callbacks. break; } } doStateCallback({ ASyncDBConnection::State::NotConnected, QString("terminating") }); } catch (const std::exception &ex) { doStateCallback({ ASyncDBConnection::State::NotConnected, QString::fromUtf8(ex.what()) }); } catch (...) { doStateCallback({ ASyncDBConnection::State::NotConnected, QString::fromUtf8("Unknown error") }); } } bool ASyncDBConnectionThread::cancel() { return m_canceller.cancel(nullptr); } bool ASyncDBConnectionThread::makeConnection() { using namespace std::literals::chrono_literals; // start connecting QString conn_string = m_config.connectionString(); while (!terminateRequested) { bool ok = m_connection.connectStart(conn_string); //keywords, values); auto start = std::chrono::steady_clock::now(); if (ok && m_connection.status() != CONNECTION_BAD) { Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear); long fd = FD_WRITE; while (true) { // poll till complete or failed (we can get an abort command) int sock = m_connection.socket(); WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; auto wait_result_socket_event = whl.add(socket_event); auto wait_result_stop = whl.add(m_stopEvent); auto nu = std::chrono::steady_clock::now(); std::chrono::duration diff = -(nu-start); diff += 30s; DWORD timeout = diff.count(); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, timeout, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_socket_event) { auto poll_state = m_connection.connectPoll(); if (poll_state == PGRES_POLLING_OK) { qDebug() << "ASyncDBConnection Connected"; // if connected return true doStateCallback({ ASyncDBConnection::State::Connected, "Success" }); return true; } else if (poll_state == PGRES_POLLING_FAILED) { doStateCallback({ ASyncDBConnection::State::NotConnected, "Failed to connect" }); return false; } else if (poll_state == PGRES_POLLING_READING) { doStateCallback({ ASyncDBConnection::State::Connecting, "Negotiating" }); fd = FD_READ; } else if (poll_state == PGRES_POLLING_WRITING) { doStateCallback({ ASyncDBConnection::State::Connecting, "Negotiating" }); fd = FD_WRITE; } } else if (res == wait_result_stop) { m_connection.close(); return false; } } // end while (true) } } return false; } void ASyncDBConnectionThread::communicate() { while (!terminateRequested) { // wait for something to do: // - command to send to server // - wait for results and (notifies can also come in) // - pass each result on to the completion routine // - notify comming in from the server // - pass to notify callback // - connection raises an error // - return // - stop signal // - return if (m_state == ASyncDBConnection::State::Connected) { waitForAndSendCommand(); } else if (m_state == ASyncDBConnection::State::QuerySend || m_state == ASyncDBConnection::State::CancelSend) { // Wait for result, even after a cancel we should wait, for all results // New command's are not excepted when one has been send waitForResult(); } } } void ASyncDBConnectionThread::stop() { terminateRequested = true; m_stopEvent.set(); } void ASyncDBConnectionThread::doStateCallback(ASyncDBConnection::StateData state) { m_state = state.State; Q_EMIT asyncConnObject->onStateChanged(state); } void ASyncDBConnectionThread::waitForAndSendCommand() { WaitHandleList whl; auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent); auto wait_result_stop = whl.add(m_stopEvent); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, INFINITE, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_new_command) { doNewCommand(); } // Note if it was stop we can just return and function // above will stop looping because terminateRequested has been set too by stop } void ASyncDBConnectionThread::doNewCommand() { // get command from top of queue (but leave it in the queue, we need the callback) if (! m_commandQueue.m_queue.empty()) { const Command &command = m_commandQueue.m_queue.front(); bool query_send = false; if (command.params.empty()) m_connection.sendQuery(command.command.c_str()); else m_connection.sendQueryParams(command.command.c_str(), command.params); m_timer.start(); doStateCallback(ASyncDBConnection::State::QuerySend); } } bool ASyncDBConnectionThread::consumeResultInput() { bool finished = false; if (m_connection.consumeInput()) { while ( ! finished && ! m_connection.isBusy()) { auto res(m_connection.getResultNoThrow()); { qint64 ms = m_timer.restart(); std::lock_guard lg(m_commandQueue.m_mutex); m_commandQueue.m_queue.front().on_result(res, ms); if (res == nullptr) { m_timer.invalidate(); m_commandQueue.m_queue.pop(); doStateCallback(ASyncDBConnection::State::Connected); finished = true; } } } // else is still waiting for more data } else { // error during consume auto error_msg = m_connection.getErrorMessage(); qDebug() << "error while communicating with server " << QString::fromStdString(error_msg); doStateCallback({ASyncDBConnection::State::NotConnected, QString::fromStdString(error_msg)}); finished = true; stop(); } return finished; } void ASyncDBConnectionThread::waitForResult() { SOCKET sock = static_cast(m_connection.socket()); Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear); long fd = FD_READ | FD_CLOSE; bool finished = false; while ( ! finished) { WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; auto wait_result_socket = whl.add(socket_event); auto wait_result_stop = whl.add(m_stopEvent); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, INFINITE, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_socket) { WSANETWORKEVENTS net_events; WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events); if (net_events.lNetworkEvents & FD_READ) { if (consumeResultInput()) { finished = true; } } } if (res == wait_result_stop) { // Send cancel, close connection and terminate thread cancel(); doStateCallback(ASyncDBConnection::State::Terminating); finished = true; } } // end while } void ASyncDBConnectionThread::processNotice(const PGresult *result) { Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); Q_EMIT asyncConnObject->onNotice(details); } ASyncDBConnection::ASyncDBConnection() : m_threadData(std::make_unique(this)) {} ASyncDBConnection::~ASyncDBConnection() { closeConnection(); } ASyncDBConnection::State ASyncDBConnection::state() const { return m_threadData->m_state; } void ASyncDBConnection::setupConnection(const ConnectionConfig &config) { if (m_thread.joinable()) { m_threadData->stop(); m_thread.join(); } m_threadData.reset(new ASyncDBConnectionThread(this)); m_threadData->m_config = config; m_thread = std::thread([this] () { m_threadData->run(); }); } void ASyncDBConnection::doStateCallback(State state) { m_state = state; if (state == State::Connected) { m_canceller = m_connection.getCancel(); m_connection.setNoticeReceiver( [this](const PGresult *result) { processNotice(result); }); } Q_EMIT onStateChanged(state); } void ASyncDBConnection::closeConnection() { m_threadData->stop(); if (m_thread.joinable()) { m_thread.join(); } } bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) { if (command.empty()) return false; { std::lock_guard lg(m_threadData->m_commandQueue.m_mutex); m_threadData->m_commandQueue.m_queue.emplace(command, on_result); m_threadData->m_commandQueue.m_newEvent.set(); } return true; } bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result) { { std::lock_guard lg(m_threadData->m_commandQueue.m_mutex); m_threadData->m_commandQueue.m_queue.emplace(command, std::move(params), on_result); m_threadData->m_commandQueue.m_newEvent.set(); } return true; } bool ASyncDBConnection::cancel() { return m_threadData->cancel(); } void ASyncDBConnection::processNotice(const PGresult *result) { Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); Q_EMIT onNotice(details); }