#include "ASyncDBConnection.h"
#include "waithandlelist.h"
#include "ScopeGuard.h"
// NOTE(review): the targets of the two system #include directives that
// followed here were unreadable in the reviewed text. The standard headers
// below are the ones the active code in this file uses directly; confirm
// against the original include list.
#include <cstddef>
#include <mutex>
#include <string>

using namespace boost::asio;

/// Creates a connection object whose socket waits are serviced by \p ios.
ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios)
	: m_asioSock(ios)
{
}

/// Returns the state most recently recorded by doStateCallback().
ASyncDBConnection::State ASyncDBConnection::state() const
{
	return m_state;
}

/// Starts a non-blocking connect using \p config.
///
/// Progress is reported through the state callback: Connecting while the
/// handshake is polled, then Connected or NotConnected. When the connect
/// cannot even be started, NotConnected is reported immediately instead of
/// failing silently.
void ASyncDBConnection::setupConnection(const ConnectionConfig &config)
{
	// Previous thread-based implementation, kept for reference:
//	if (m_thread.joinable()) {
//		m_threadData.stop();
//		m_thread.join();
//	}
//	m_threadData.m_config = config;
//	m_thread = std::thread([this] () { m_threadData.run(); });

	m_config = config;
	auto keywords = m_config.getKeywords();
	auto values = m_config.getValues();

	const bool started = m_connection.connectStart(keywords, values);
//	auto start = std::chrono::steady_clock::now();
	if (!started || m_connection.status() == CONNECTION_BAD) {
		// Without this the caller would never learn that connecting failed.
		doStateCallback(State::NotConnected);
		return;
	}

	// Hand the connection's socket to asio so readiness can be awaited.
	// NOTE(review): the descriptor is registered as TCP/IPv4 regardless of
	// how the connection was actually made; confirm this is acceptable for
	// IPv6 or Unix-domain connections.
	auto sock_handle = m_connection.socket();
	m_asioSock.assign(ip::tcp::v4(), sock_handle);
	m_asioSock.non_blocking(true);

	// null_buffers: only wait for writability, the connection does the I/O.
	m_asioSock.async_write_some(null_buffers(),
		[this] (boost::system::error_code ec, std::size_t s)
		{ async_connect_handler(ec, s); }
	);
}

/// Completion handler for the readiness waits issued while connecting.
///
/// Polls the connection and either reports the final outcome or waits for
/// the direction (read/write) the poll asks for. A failed wait ends the
/// attempt; an aborted wait is dropped without reporting anything.
void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t /*s*/)
{
	if (ec) {
		if (ec != boost::asio::error::operation_aborted) {
			doStateCallback(State::NotConnected);
		}
		return;
	}

	const auto poll_state = m_connection.connectPoll();
	if (poll_state == PGRES_POLLING_OK) {
		doStateCallback(State::Connected);
	}
	else if (poll_state == PGRES_POLLING_FAILED) {
		doStateCallback(State::NotConnected);
	}
	else if (poll_state == PGRES_POLLING_READING) {
		doStateCallback(State::Connecting);
		m_asioSock.async_read_some(null_buffers(),
			[this] (boost::system::error_code ec, std::size_t s)
			{ async_connect_handler(ec, s); }
		);
	}
	else if (poll_state == PGRES_POLLING_WRITING) {
		doStateCallback(State::Connecting);
		m_asioSock.async_write_some(null_buffers(),
			[this] (boost::system::error_code ec, std::size_t s)
			{ async_connect_handler(ec, s); }
		);
	}
}

/// Records \p state and passes it to the registered state callback, if any.
///
/// Each time Connected is reported the notice receiver is (re)installed on
/// the connection. The callback is invoked while the callback mutex is held,
/// so it must not call setStateCallback() itself unless that mutex type
/// permits re-locking.
void ASyncDBConnection::doStateCallback(State state)
{
	m_state = state;
	if (state == State::Connected) {
		m_connection.setNoticeReceiver(
			[this](const PGresult *result) { processNotice(result); });
	}

	std::lock_guard<decltype(m_stateCallback.m_mutex)> lg(m_stateCallback.m_mutex);
	if (m_stateCallback.m_func) {
		m_stateCallback.m_func(state);
	}
}

/// Closes the database connection.
void ASyncDBConnection::closeConnection()
{
	// Previous thread-based implementation, kept for reference:
//	m_threadData.stop();
//	if (m_thread.joinable()) {
//		m_thread.join();
//	}

	// TODO: should this be async too?
	// NOTE(review): m_asioSock was given the same native handle in
	// setupConnection() and is not detached here; confirm that pending waits
	// and ownership of the handle are dealt with when the connection closes.
	m_connection.close();
}

/// Sends \p command and delivers every result to \p on_result.
///
/// \p on_result receives each result together with the elapsed time in
/// milliseconds; a null result marks the end of the command.
/// \return false when the query could not be handed to the connection; in
///         that case no state change is reported and \p on_result is never
///         called.
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
{
	// Previous thread-based implementation, kept for reference:
//	{
//		std::lock_guard lg(m_threadData.m_commandQueue.m_mutex);
//		m_threadData.m_commandQueue.m_queue.emplace(command, on_result);
//		m_threadData.m_commandQueue.m_newEvent.notify_one();
//	}

	// Do not wait for a result of a query that was never sent.
	if (!m_connection.sendQuery(command)) {
		return false;
	}

	m_timer.start();
	doStateCallback(State::QuerySend);
	m_asioSock.async_read_some(null_buffers(),
		[this, on_result] (boost::system::error_code ec, std::size_t s)
		{ async_query_handler(ec, s, on_result); }
	);
	return true;
}

/// Sends the parameterised \p command with \p params and delivers every
/// result to \p on_result.
///
/// Behaves like the overload without parameters, including the meaning of
/// the return value.
bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result)
{
	// Previous thread-based implementation, kept for reference:
//	{
//		std::lock_guard lg(m_threadData.m_commandQueue.m_mutex);
//		m_threadData.m_commandQueue.m_queue.emplace(command, std::move(params), on_result);
//		m_threadData.m_commandQueue.m_newEvent.notify_one();
//	}

	// Do not wait for a result of a query that was never sent.
	if (!m_connection.sendQueryParams(command.c_str(), params)) {
		return false;
	}

	m_timer.start();
	doStateCallback(State::QuerySend);
	m_asioSock.async_read_some(null_buffers(),
		[this, on_result] (boost::system::error_code ec, std::size_t s)
		{ async_query_handler(ec, s, on_result); }
	);
	return true;
}

/// Completion handler for the read-readiness wait of a running query.
///
/// Consumes the available input and forwards every result that is ready to
/// \p on_result. After the terminating null result the state goes back to
/// Connected. While the command is still busy the wait is re-armed. A failed
/// wait or a broken connection ends the command with NotConnected instead of
/// re-arming forever.
void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t /*s*/, on_result_callback on_result)
{
	if (ec) {
		if (ec != boost::asio::error::operation_aborted) {
			m_timer.invalidate();
			doStateCallback(State::NotConnected);
		}
		return;
	}

	bool finished = false;
	if (m_connection.consumeInput()) {
		while (!finished && !m_connection.isBusy()) {
			auto res = m_connection.getResult();
			qint64 ms = m_timer.restart();
			on_result(res, ms);
			if (res == nullptr) {
				// The null result is the end-of-command marker.
				m_timer.invalidate();
				doStateCallback(State::Connected);
				finished = true;
			}
		}
		// else is still waiting for more data
	}
	else if (m_connection.status() == CONNECTION_BAD) {
		// error during consume and the connection is gone: no more data
		// will arrive, so waiting again would never end.
		m_timer.invalidate();
		doStateCallback(State::NotConnected);
		finished = true;
	}
	// else: error during consume but the connection is still usable,
	// keep waiting like before.

	if (!finished) {
		// wait for more
		m_asioSock.async_read_some(null_buffers(),
			[this, on_result] (boost::system::error_code ec, std::size_t s)
			{ async_query_handler(ec, s, on_result); }
		);
	}
}

/// Cancelling is not implemented in this version; always returns false.
bool ASyncDBConnection::cancel()
{
	return false; //m_threadData.cancel();
}

/// Installs \p state_callback as the receiver of state changes.
void ASyncDBConnection::setStateCallback(on_state_callback state_callback)
{
	std::lock_guard<decltype(m_stateCallback.m_mutex)> lg(m_stateCallback.m_mutex);
	m_stateCallback.m_func = state_callback;
}

/// Installs \p notice_callback as the receiver of server notices.
void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback)
{
	std::lock_guard<decltype(m_noticeCallback.m_mutex)> lg(m_noticeCallback.m_mutex);
	m_noticeCallback.m_func = notice_callback;
}

/// Converts \p result into error details and passes them to the notice
/// callback, if one is installed. The callback runs while the notice
/// callback mutex is held.
void ASyncDBConnection::processNotice(const PGresult *result)
{
	std::lock_guard<decltype(m_noticeCallback.m_mutex)> lg(m_noticeCallback.m_mutex);
	if (m_noticeCallback.m_func) {
		Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result);
		m_noticeCallback.m_func(details);
	}
}

// ASyncDBConnection::Thread::Thread()
// {}
//
// void ASyncDBConnection::Thread::run()
// {
//     m_terminated = false;
//     SCOPE_EXIT {
//         m_state = State::NotConnected;
//         m_terminated = true;
//     };
//     while (!terminateRequested) {
//
//         // make or recover connection
//         if (makeConnection()) {
//             m_connection.setNoticeReceiver(
//                 [this](const PGresult *result) { processNotice(result); });
//             m_canceller = m_connection.getCancel();
//
//
//             // send commands and receive results
//             communicate();
//         }
//         else {
//             // It is not possible to determine the source of the problem.
//             // Except for PQconnectionNeedsPassword
//
//             // Pass problem to main thread and stop this thread
//
//             // Main thread needs to know it has to restart connecting if it wants to.
//             // TODO: add status functions to help main thread so it doesn't have to remember
//             // everything reported through callbacks.
//
//             break;
//         }
//     }
//     // close connection
// }
//
// bool ASyncDBConnection::Thread::cancel()
// {
//     return m_canceller.cancel(nullptr);
// }
// bool ASyncDBConnection::Thread::makeConnection()
// {
//     //using namespace std::literals::chrono_literals;
//
//     // start connecting
//     auto keywords = m_config.getKeywords();
//     auto values = m_config.getValues();
// #if true
//     bool result = m_connection.connect(keywords, values, 0);
//     if (result) {
//         doStateCallback(State::Connected);
//     }
//     return result;
// #else
//     while (!terminateRequested) {
//
//         bool ok = m_connection.connectStart(keywords, values);
//         auto start = std::chrono::steady_clock::now();
//         if (ok && m_connection.status() != CONNECTION_BAD) {
//             int sock = m_connection.socket();
//             Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear);
//
//             long fd = FD_WRITE;
//             while (true) {
//                 // poll till complete or failed (we can get an abort command)
//                 WSAEventSelect(sock, socket_event.handle(), fd);
//                 WaitHandleList whl;
//                 auto wait_result_socket_event = whl.add(socket_event);
//                 //auto wait_result_stop = whl.add(m_stopEvent);
//
//                 auto nu = std::chrono::steady_clock::now();
//                 std::chrono::duration diff = -(nu-start);
//                 diff += 30s;
//                 DWORD timeout = diff.count();
//
//                 DWORD res = MsgWaitForMultipleObjectsEx(
//                     whl.count(), // _In_ DWORD nCount,
//                     whl, // _In_ const HANDLE *pHandles,
//                     timeout, // _In_ DWORD dwMilliseconds,
//                     0, // _In_ DWORD dwWakeMask,
//                     0 // _In_ DWORD dwFlags
//                 );
//                 if (res == wait_result_socket_event) {
//                     auto poll_state = m_connection.connectPoll();
//                     if (poll_state == PGRES_POLLING_OK) {
//                         // if connected return true
//                         doStateCallback(State::Connected);
//                         return true;
//                     }
//                     else if (poll_state == PGRES_POLLING_FAILED) {
//                         doStateCallback(State::NotConnected);
//                         return false;
//                     }
//                     else if (poll_state
//                              == PGRES_POLLING_READING) {
//                         doStateCallback(State::Connecting);
//                         fd = FD_READ;
//                     }
//                     else if (poll_state == PGRES_POLLING_WRITING) {
//                         doStateCallback(State::Connecting);
//                         fd = FD_WRITE;
//                     }
//                 }
//                 else if (res == wait_result_stop) {
//
//                 }
//             } // end while (true)
//         }
//     }
//     return false;
// #endif
// }
//
// void ASyncDBConnection::Thread::communicate()
// {
//     while (!terminateRequested) {
//         // wait for something to do:
//         // - command to send to server
//         // - wait for results and (notifies can also come in)
//         //   - pass each result on to the completion routine
//         // - notify coming in from the server
//         //   - pass to notify callback
//         // - connection raises an error
//         //   - return
//         // - stop signal
//         //   - return
//
//
//         if (m_state == State::Connected) {
//             waitForAndSendCommand();
//         }
//         else if (m_state == State::QuerySend || m_state == State::CancelSend) {
//             // Wait for result, even after a cancel we should wait, for all results
//             // New commands are not accepted once one has been sent
//             waitForResult();
//         }
//
//     }
// }
//
// void ASyncDBConnection::Thread::stop()
// {
//     terminateRequested = true;
//     //m_stopEvent.set();
// }
// void ASyncDBConnection::Thread::doStateCallback(State state)
// {
//     m_state = state;
//     std::lock_guard lg(m_stateCallback.m_mutex);
//     if (m_stateCallback.m_func) {
//         m_stateCallback.m_func(state);
//     }
// }
// void ASyncDBConnection::Thread::waitForAndSendCommand()
// {
//     using namespace std::chrono_literals;
//     // lock the data
//     std::unique_lock lk(m_commandQueue.m_mutex);
//     if (m_commandQueue.m_queue.empty()) {
//         // no data wait till there is data
//         m_commandQueue.m_newEvent.wait_for(lk, 1000ms);
//         // can we use the predicate to reimplement the stop function???, []{return ready;});
//
//     }
//     doNewCommand();
//
// #if false
//     WaitHandleList whl;
//     auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent);
//     //auto wait_result_stop =
//     //whl.add(m_stopEvent);
//
//     DWORD res =
//         MsgWaitForMultipleObjectsEx(
//             whl.count(), // _In_ DWORD nCount,
//             whl, // _In_ const HANDLE *pHandles,
//             INFINITE, // _In_ DWORD dwMilliseconds,
//             0, // _In_ DWORD dwWakeMask,
//             0 // _In_ DWORD dwFlags
//         );
//     if (res == wait_result_new_command) {
//         doNewCommand();
//     }
//     // if (res == wait_result_stop) return;
// #endif
// }
// void ASyncDBConnection::Thread::doNewCommand()
// {
//     // get command from top of queue (but leave it in the queue, we need the callback)
//     if (! m_commandQueue.m_queue.empty()) {
//         const Command &command = m_commandQueue.m_queue.front();
//         if (!command.command.empty()) {
//             bool query_send = false;
//             if (command.params.empty())
//                 query_send = m_connection.sendQuery(command.command.c_str());
//             else
//                 query_send = m_connection.sendQueryParams(command.command.c_str(), command.params);
//
//             if (query_send) {
//                 m_timer.start();
//                 doStateCallback(State::QuerySend);
//             }
//             else {
//                 std::string error = m_connection.getErrorMessage();
//                 // todo: need to report the error
//             }
//         }
//     }
//
// }
// bool ASyncDBConnection::Thread::consumeResultInput()
// {
//     bool finished = false;
//     if (m_connection.consumeInput()) {
//         while ( ! finished && !
//                 m_connection.isBusy()) {
//             auto res(m_connection.getResult());
//             {
//                 qint64 ms = m_timer.restart();
//                 std::lock_guard lg(m_commandQueue.m_mutex);
//                 m_commandQueue.m_queue.front().on_result(res, ms);
//                 if (res == nullptr) {
//                     m_timer.invalidate();
//                     m_commandQueue.m_queue.pop();
//                     doStateCallback(State::Connected);
//                     finished = true;
//                 }
//             }
//
//         }
//         // else is still waiting for more data
//
//     }
//     else {
//         // error during consume
//
//     }
//     return finished;
// }
// void ASyncDBConnection::Thread::waitForResult()
// {
//     int sock = m_connection.socket();
//
//     fd_set readfds;
//     timeval timeout;
//     bool finished = false;
//     while (!finished && !terminateRequested) {
//         FD_ZERO(&readfds);
//         FD_SET(sock, &readfds);
//
//         timeout.tv_sec = 5;
//         timeout.tv_usec = 0;
//
//         int select_result = select(sock + 1, &readfds, nullptr, nullptr, &timeout);
//         if (select_result > 0) {
//             if (FD_ISSET(sock, &readfds)) {
//                 if (consumeResultInput()) {
//                     finished = true;
//                 }
//             }
//         }
//
//     }
// }
/* */