#include "ASyncDBConnection.h" #include "ScopeGuard.h" #include "util.h" #include "Pgsql_PgException.h" #include "Win32Event.h" #include "WaitHandleList.h" #include #include #include namespace { class registerMetaTypes { public: registerMetaTypes() { qRegisterMetaType(); qRegisterMetaType(); qRegisterMetaType>(); } } registerMetaTypes_instance; class Command { public: std::string command; Pgsql::Params params; ASyncDBConnection::on_result_callback on_result; Command() = default; Command(const std::string &cmd, ASyncDBConnection::on_result_callback cb) : command(cmd), on_result(cb) {} Command(const std::string &cmd, Pgsql::Params &&p, ASyncDBConnection::on_result_callback cb) : command(cmd), params(p), on_result(cb) {} }; } class ASyncDBConnectionThread { public: using t_CommandQueue = std::queue; // struct { // std::mutex m_mutex; // on_state_callback m_func; // } m_stateCallback; // struct { // std::mutex m_mutex; // on_notice_callback m_func; // } m_noticeCallback; struct t_Command { std::mutex m_mutex; t_CommandQueue m_queue; //std::condition_variable m_newEvent; Win32Event m_newEvent; t_Command() : m_newEvent(Win32Event::Reset::Auto, Win32Event::Initial::Clear) {} } m_commandQueue; // std::string m_initString; ConnectionConfig m_config; ASyncDBConnection::State m_state = ASyncDBConnection::State::NotConnected; ASyncDBConnectionThread(ASyncDBConnection *asco); /// Is started as a seperate thread by ASyncDBConnection void run(); /// Sends a cancel request to the DB server bool cancel(); void stop(); private: ASyncDBConnection *asyncConnObject = nullptr; /// \todo Implement new method to stop the thread Win32Event m_stopEvent; Pgsql::Connection m_connection; bool terminateRequested = false; ///< is set when the thread should stop bool m_terminated = true; Pgsql::Canceller m_canceller; QElapsedTimer m_timer; bool makeConnection(); void communicate(); void doStateCallback(ASyncDBConnection::State state); /// Wait's for a command to come in and send's it to the server void waitForAndSendCommand(); void doNewCommand(); void waitForResult(); void processNotice(const PGresult *result); /** Function to call when after sending a command the socket is ready for reading. * * It might take several consumes before all data is read. */ bool consumeResultInput(); }; ASyncDBConnectionThread::ASyncDBConnectionThread(ASyncDBConnection *asco) : asyncConnObject(asco) , m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear) {} void ASyncDBConnectionThread::run() { m_terminated = false; SCOPE_EXIT { m_state = ASyncDBConnection::State::NotConnected; m_terminated = true; }; while (!terminateRequested) { // make or recover connection if (makeConnection()) { m_connection.setNoticeReceiver( [this](const PGresult *result) { processNotice(result); }); m_canceller = m_connection.getCancel(); // send commands and receive results communicate(); } else { // It is not possible to determine the source of the problem. // Accept for PQconnectionNeedsPassword // Pass problem to main thread and stop this thread // Main thread needs to know it has to restart connecting if it want's to. // TODO: add status functions to help main thread so it doesn't have to remember // everything reported through callbacks. break; } } // close connection } bool ASyncDBConnectionThread::cancel() { return m_canceller.cancel(nullptr); } bool ASyncDBConnectionThread::makeConnection() { using namespace std::literals::chrono_literals; // start connecting // auto keywords = m_config.getKeywords(); // auto values = m_config.getValues(); QString conn_string = m_config.connectionString(); #if false try { m_connection.connect(conn_string); //keywords, values, 0); doStateCallback(ASyncDBConnection::State::Connected); return true; } catch (Pgsql::PgConnectionError &) { } return false; #else while (!terminateRequested) { bool ok = m_connection.connectStart(conn_string); //keywords, values); auto start = std::chrono::steady_clock::now(); if (ok && m_connection.status() != CONNECTION_BAD) { int sock = m_connection.socket(); Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear); long fd = FD_WRITE; while (true) { // poll till complete or failed (we can get an abort command) WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; auto wait_result_socket_event = whl.add(socket_event); auto wait_result_stop = whl.add(m_stopEvent); auto nu = std::chrono::steady_clock::now(); std::chrono::duration diff = -(nu-start); diff += 30s; DWORD timeout = diff.count(); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, timeout, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_socket_event) { auto poll_state = m_connection.connectPoll(); if (poll_state == PGRES_POLLING_OK) { // if connected return true doStateCallback(ASyncDBConnection::State::Connected); return true; } else if (poll_state == PGRES_POLLING_FAILED) { doStateCallback(ASyncDBConnection::State::NotConnected); return false; } else if (poll_state == PGRES_POLLING_READING) { doStateCallback(ASyncDBConnection::State::Connecting); fd = FD_READ; } else if (poll_state == PGRES_POLLING_WRITING) { doStateCallback(ASyncDBConnection::State::Connecting); fd = FD_WRITE; } } else if (res == wait_result_stop) { } } // end while (true) } } return false; #endif } void ASyncDBConnectionThread::communicate() { while (!terminateRequested) { // wait for something to do: // - command to send to server // - wait for results and (notifies can also come in) // - pass each result on to the completion routine // - notify comming in from the server // - pass to notify callback // - connection raises an error // - return // - stop signal // - return if (m_state == ASyncDBConnection::State::Connected) { waitForAndSendCommand(); } else if (m_state == ASyncDBConnection::State::QuerySend || m_state == ASyncDBConnection::State::CancelSend) { // Wait for result, even after a cancel we should wait, for all results // New command's are not excepted when one has been send waitForResult(); } } } void ASyncDBConnectionThread::stop() { terminateRequested = true; m_stopEvent.set(); } void ASyncDBConnectionThread::doStateCallback(ASyncDBConnection::State state) { m_state = state; // std::lock_guard lg(m_stateCallback.m_mutex); // if (m_stateCallback.m_func) { // m_stateCallback.m_func(state); // } emit asyncConnObject->onStateChanged(state); } void ASyncDBConnectionThread::waitForAndSendCommand() { #if false using namespace std::chrono_literals; // lock the data std::unique_lock lk(m_commandQueue.m_mutex); if (m_commandQueue.m_queue.empty()) { // no data wait till there is data m_commandQueue.m_newEvent.wait_for(lk, 1000ms); // can we use the predicate to reimplement the stop function???, []{return ready;}); } doNewCommand(); #else WaitHandleList whl; auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent); auto wait_result_stop = whl.add(m_stopEvent); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, INFINITE, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_new_command) { doNewCommand(); } if (res == wait_result_stop) return; #endif } void ASyncDBConnectionThread::doNewCommand() { // get command from top of queue (but leave it in the queue, we need the callback) if (! m_commandQueue.m_queue.empty()) { const Command &command = m_commandQueue.m_queue.front(); if (!command.command.empty()) { bool query_send = false; if (command.params.empty()) query_send = m_connection.sendQuery(command.command.c_str()); else query_send = m_connection.sendQueryParams(command.command.c_str(), command.params); if (query_send) { m_timer.start(); doStateCallback(ASyncDBConnection::State::QuerySend); } else { std::string error = m_connection.getErrorMessage(); // todo: need to report the error } } } } bool ASyncDBConnectionThread::consumeResultInput() { bool finished = false; if (m_connection.consumeInput()) { while ( ! finished && ! m_connection.isBusy()) { auto res(m_connection.getResult()); { qint64 ms = m_timer.restart(); std::lock_guard lg(m_commandQueue.m_mutex); m_commandQueue.m_queue.front().on_result(res, ms); if (res == nullptr) { m_timer.invalidate(); m_commandQueue.m_queue.pop(); doStateCallback(ASyncDBConnection::State::Connected); finished = true; } } } // else is still waiting for more data } else { // error during consume } return finished; } void ASyncDBConnectionThread::waitForResult() { int sock = m_connection.socket(); fd_set readfds; timeval timeout; bool finished = false; while (!finished && !terminateRequested) { FD_ZERO(&readfds); FD_SET(sock, &readfds); timeout.tv_sec = 5; timeout.tv_usec = 0; int select_result = select(sock + 1, &readfds, nullptr, nullptr, &timeout); if (select_result > 0) { if (FD_ISSET(sock, &readfds)) { if (consumeResultInput()) { finished = true; } } } } #if false Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear); long fd = FD_READ | FD_CLOSE; bool finished = false; while ( ! finished) { WSAEventSelect(sock, socket_event.handle(), fd); WaitHandleList whl; auto wait_result_socket = whl.add(socket_event); //auto wait_result_stop = whl.add(m_stopEvent); DWORD res = MsgWaitForMultipleObjectsEx( whl.count(), // _In_ DWORD nCount, whl, // _In_ const HANDLE *pHandles, INFINITE, // _In_ DWORD dwMilliseconds, 0, // _In_ DWORD dwWakeMask, 0 // _In_ DWORD dwFlags ); if (res == wait_result_socket) { WSANETWORKEVENTS net_events; WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events); if (net_events.lNetworkEvents & FD_READ) { if (consumeResultInput()) { finished = true; } } } if (res == wait_result_stop) { // Send cancel, close connection and terminate thread cancel(); doStateCallback(State::Terminating); finished = true; } } // end while // When last result received, remove command from queue #endif } void ASyncDBConnectionThread::processNotice(const PGresult *result) { // Pgsql::Result res(result); // std::lock_guard lg(m_noticeCallback.m_mutex); // if (m_noticeCallback.m_func) { // Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); // m_noticeCallback.m_func(details); // } Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); emit asyncConnObject->onNotice(details); } ASyncDBConnection::ASyncDBConnection() : m_threadData(std::make_unique(this)) {} ASyncDBConnection::~ASyncDBConnection() { closeConnection(); } ASyncDBConnection::State ASyncDBConnection::state() const { return m_threadData->m_state; } void ASyncDBConnection::setupConnection(const ConnectionConfig &config) { if (m_thread.joinable()) { m_threadData->stop(); m_thread.join(); } m_threadData->m_config = config; m_thread = std::thread([this] () { m_threadData->run(); }); } void ASyncDBConnection::doStateCallback(State state) { m_state = state; if (state == State::Connected) { m_canceller = m_connection.getCancel(); m_connection.setNoticeReceiver( [this](const PGresult *result) { processNotice(result); }); } emit onStateChanged(state); } void ASyncDBConnection::closeConnection() { // doStateCallback(State::NotConnected); // TODO also send cancel??? m_threadData->stop(); if (m_thread.joinable()) { m_thread.join(); } } bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) { { std::lock_guard lg(m_threadData->m_commandQueue.m_mutex); m_threadData->m_commandQueue.m_queue.emplace(command, on_result); m_threadData->m_commandQueue.m_newEvent.set(); } return true; } bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result) { { std::lock_guard lg(m_threadData->m_commandQueue.m_mutex); m_threadData->m_commandQueue.m_queue.emplace(command, std::move(params), on_result); m_threadData->m_commandQueue.m_newEvent.set(); } return true; } bool ASyncDBConnection::cancel() { return m_threadData->cancel(); } void ASyncDBConnection::processNotice(const PGresult *result) { Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); emit onNotice(details); }