diff --git a/pglab/ASyncDBConnection.cpp b/pglab/ASyncDBConnection.cpp index 7166502..633a034 100644 --- a/pglab/ASyncDBConnection.cpp +++ b/pglab/ASyncDBConnection.cpp @@ -4,396 +4,458 @@ #include #include -ASyncDBConnection::ASyncDBConnection() +using namespace boost::asio; + +ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios) + : m_asioSock(ios) { } ASyncDBConnection::State ASyncDBConnection::state() const { - return m_threadData.m_state; + return m_state; } -//void ASyncDBConnection::setupConnection(const std::string &connstring) -//{ -// if (m_thread.joinable()) { -// m_threadData.stop(); -// m_thread.join(); -// } -// m_threadData.m_initString = connstring; -// m_thread = std::thread([this] () { m_threadData.run(); }); -//} - void ASyncDBConnection::setupConnection(const ConnectionConfig &config) { - if (m_thread.joinable()) { - m_threadData.stop(); - m_thread.join(); - } - m_threadData.m_config = config; - m_thread = std::thread([this] () { m_threadData.run(); }); +// if (m_thread.joinable()) { +// m_threadData.stop(); +// m_thread.join(); +// } +// m_threadData.m_config = config; +// m_thread = std::thread([this] () { m_threadData.run(); }); + + m_config = config; + auto keywords = m_config.getKeywords(); + auto values = m_config.getValues(); + + bool ok = m_connection.connectStart(keywords, values); + // auto start = std::chrono::steady_clock::now(); + if (ok && m_connection.status() != CONNECTION_BAD) { + auto sock_handle = m_connection.socket(); + m_asioSock.assign(ip::tcp::v4(), sock_handle); + m_asioSock.non_blocking(true); + + m_asioSock.async_write_some(null_buffers(), + [this] (boost::system::error_code ec, std::size_t s) + { async_connect_handler(ec, s); } + ); + } } +void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t s) +{ + auto poll_state = m_connection.connectPoll(); + if (poll_state == PGRES_POLLING_OK) { + // if connected return true + doStateCallback(State::Connected); + } + else if (poll_state == PGRES_POLLING_FAILED) { + doStateCallback(State::NotConnected); + } + else if (poll_state == PGRES_POLLING_READING) { + doStateCallback(State::Connecting); + m_asioSock.async_read_some(null_buffers(), + [this] (boost::system::error_code ec, std::size_t s) + { async_connect_handler(ec, s); } + ); + } + else if (poll_state == PGRES_POLLING_WRITING) { + doStateCallback(State::Connecting); + m_asioSock.async_write_some(null_buffers(), + [this] (boost::system::error_code ec, std::size_t s) + { async_connect_handler(ec, s); } + ); + } +} + +void ASyncDBConnection::doStateCallback(State state) +{ + m_state = state; + if (state == State::Connected) { + m_connection.setNoticeReceiver( + [this](const PGresult *result) { processNotice(result); }); + } + + std::lock_guard lg(m_stateCallback.m_mutex); + if (m_stateCallback.m_func) { + m_stateCallback.m_func(state); + } + +} + + void ASyncDBConnection::closeConnection() { - m_threadData.stop(); - if (m_thread.joinable()) { - m_thread.join(); - } +// m_threadData.stop(); +// if (m_thread.joinable()) { +// m_thread.join(); +// } + // SHould this be async too???? + m_connection.close(); } bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) { - { - std::lock_guard lg(m_threadData.m_commandQueue.m_mutex); - m_threadData.m_commandQueue.m_queue.emplace(command, on_result); - m_threadData.m_commandQueue.m_newEvent.notify_one(); - } +// { +// std::lock_guard lg(m_threadData.m_commandQueue.m_mutex); +// m_threadData.m_commandQueue.m_queue.emplace(command, on_result); +// m_threadData.m_commandQueue.m_newEvent.notify_one(); +// } + + m_connection.sendQuery(command); + m_timer.start(); + doStateCallback(State::QuerySend); + m_asioSock.async_read_some(null_buffers(), + [this, on_result] (boost::system::error_code ec, std::size_t s) + { async_query_handler(ec, s, on_result); } + ); return true; } bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result) { - { - std::lock_guard lg(m_threadData.m_commandQueue.m_mutex); - m_threadData.m_commandQueue.m_queue.emplace(command, std::move(params), on_result); - m_threadData.m_commandQueue.m_newEvent.notify_one(); - } +// { +// std::lock_guard lg(m_threadData.m_commandQueue.m_mutex); +// m_threadData.m_commandQueue.m_queue.emplace(command, std::move(params), on_result); +// m_threadData.m_commandQueue.m_newEvent.notify_one(); +// } + m_connection.sendQueryParams(command.c_str(), params); + m_timer.start(); + doStateCallback(State::QuerySend); + m_asioSock.async_read_some(null_buffers(), + [this, on_result] (boost::system::error_code ec, std::size_t s) + { async_query_handler(ec, s, on_result); } + ); return true; } -bool ASyncDBConnection::cancel() -{ - return m_threadData.cancel(); -} - -void ASyncDBConnection::setStateCallback(on_state_callback state_callback) -{ - std::lock_guard lg(m_threadData.m_stateCallback.m_mutex); - m_threadData.m_stateCallback.m_func = state_callback; -} - -void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback) -{ - std::lock_guard lg(m_threadData.m_noticeCallback.m_mutex); - m_threadData.m_noticeCallback.m_func = notice_callback; -} - -ASyncDBConnection::Thread::Thread() -{} - -void ASyncDBConnection::Thread::run() -{ - m_terminated = false; - SCOPE_EXIT { - m_state = State::NotConnected; - m_terminated = true; - }; - while (!terminateRequested) { - - // make or recover connection - if (makeConnection()) { - m_connection.setNoticeReceiver( - [this](const PGresult *result) { processNotice(result); }); - m_canceller = m_connection.getCancel(); - - - // send commands and receive results - communicate(); - } - else { - // It is not possible to determine the source of the problem. - // Accept for PQconnectionNeedsPassword - - // Pass problem to main thread and stop this thread - - // Main thread needs to know it has to restart connecting if it want's to. - // TODO: add status functions to help main thread so it doesn't have to remember - // everything reported through callbacks. - - break; - } - } - // close connection -} - -bool ASyncDBConnection::Thread::cancel() -{ - return m_canceller.cancel(nullptr); -} - -bool ASyncDBConnection::Thread::makeConnection() -{ - //using namespace std::literals::chrono_literals; - - // start connecting - auto keywords = m_config.getKeywords(); - auto values = m_config.getValues(); -#if true - bool result = m_connection.connect(keywords, values, 0); - if (result) { - doStateCallback(State::Connected); - } - return result; -#else - while (!terminateRequested) { - - bool ok = m_connection.connectStart(keywords, values); - auto start = std::chrono::steady_clock::now(); - if (ok && m_connection.status() != CONNECTION_BAD) { - int sock = m_connection.socket(); - Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear); - - long fd = FD_WRITE; - while (true) { - // poll till complete or failed (we can get an abort command) - WSAEventSelect(sock, socket_event.handle(), fd); - WaitHandleList whl; - auto wait_result_socket_event = whl.add(socket_event); - //auto wait_result_stop = whl.add(m_stopEvent); - - auto nu = std::chrono::steady_clock::now(); - std::chrono::duration diff = -(nu-start); - diff += 30s; - DWORD timeout = diff.count(); - - DWORD res = MsgWaitForMultipleObjectsEx( - whl.count(), // _In_ DWORD nCount, - whl, // _In_ const HANDLE *pHandles, - timeout, // _In_ DWORD dwMilliseconds, - 0, // _In_ DWORD dwWakeMask, - 0 // _In_ DWORD dwFlags - ); - if (res == wait_result_socket_event) { - auto poll_state = m_connection.connectPoll(); - if (poll_state == PGRES_POLLING_OK) { - // if connected return true - doStateCallback(State::Connected); - return true; - } - else if (poll_state == PGRES_POLLING_FAILED) { - doStateCallback(State::NotConnected); - return false; - } - else if (poll_state == PGRES_POLLING_READING) { - doStateCallback(State::Connecting); - fd = FD_READ; - } - else if (poll_state == PGRES_POLLING_WRITING) { - doStateCallback(State::Connecting); - fd = FD_WRITE; - } - } - else if (res == wait_result_stop) { - - } - } // end while (true) - } - } - return false; -#endif -} - -void ASyncDBConnection::Thread::communicate() -{ - while (!terminateRequested) { - // wait for something to do: - // - command to send to server - // - wait for results and (notifies can also come in) - // - pass each result on to the completion routine - // - notify comming in from the server - // - pass to notify callback - // - connection raises an error - // - return - // - stop signal - // - return - - - if (m_state == State::Connected) { - waitForAndSendCommand(); - } - else if (m_state == State::QuerySend || m_state == State::CancelSend) { - // Wait for result, even after a cancel we should wait, for all results - // New command's are not excepted when one has been send - waitForResult(); - } - - } -} - -void ASyncDBConnection::Thread::stop() -{ - terminateRequested = true; - //m_stopEvent.set(); -} - -void ASyncDBConnection::Thread::doStateCallback(State state) -{ - m_state = state; - std::lock_guard lg(m_stateCallback.m_mutex); - if (m_stateCallback.m_func) { - m_stateCallback.m_func(state); - } -} - -void ASyncDBConnection::Thread::waitForAndSendCommand() -{ - using namespace std::chrono_literals; - // lock the data - std::unique_lock lk(m_commandQueue.m_mutex); - if (m_commandQueue.m_queue.empty()) { - // no data wait till there is data - m_commandQueue.m_newEvent.wait_for(lk, 1000ms); - // can we use the predicate to reimplement the stop function???, []{return ready;}); - - } - doNewCommand(); - -#if false - WaitHandleList whl; - auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent); - //auto wait_result_stop = - //whl.add(m_stopEvent); - - DWORD res = MsgWaitForMultipleObjectsEx( - whl.count(), // _In_ DWORD nCount, - whl, // _In_ const HANDLE *pHandles, - INFINITE, // _In_ DWORD dwMilliseconds, - 0, // _In_ DWORD dwWakeMask, - 0 // _In_ DWORD dwFlags - ); - if (res == wait_result_new_command) { - doNewCommand(); - } - // if (res == wait_result_stop) return; -#endif -} - -void ASyncDBConnection::Thread::doNewCommand() -{ - // get command from top of queue (but leave it in the queue, we need the callback) - if (! m_commandQueue.m_queue.empty()) { - const Command &command = m_commandQueue.m_queue.front(); - if (!command.command.empty()) { - bool query_send = false; - if (command.params.empty()) - query_send = m_connection.sendQuery(command.command.c_str()); - else - query_send = m_connection.sendQueryParams(command.command.c_str(), command.params); - - if (query_send) { - m_timer.start(); - doStateCallback(State::QuerySend); - } - else { - std::string error = m_connection.getErrorMessage(); - // todo: need to report the error - } - } - } - -} - -bool ASyncDBConnection::Thread::consumeResultInput() +void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result) { bool finished = false; if (m_connection.consumeInput()) { while ( ! finished && ! m_connection.isBusy()) { - auto res(m_connection.getResult()); - { - qint64 ms = m_timer.restart(); - std::lock_guard lg(m_commandQueue.m_mutex); - m_commandQueue.m_queue.front().on_result(res, ms); - if (res == nullptr) { - m_timer.invalidate(); - m_commandQueue.m_queue.pop(); - doStateCallback(State::Connected); - finished = true; - } + auto res = m_connection.getResult(); + qint64 ms = m_timer.restart(); + on_result(res, ms); + if (res == nullptr) { + m_timer.invalidate(); + doStateCallback(State::Connected); + finished = true; } - } // else is still waiting for more data - } else { // error during consume } - return finished; + //return finished; + if (!finished) { + // wait for more + m_asioSock.async_read_some(null_buffers(), + [this, on_result] (boost::system::error_code ec, std::size_t s) + { async_query_handler(ec, s, on_result); } + ); + } } -void ASyncDBConnection::Thread::waitForResult() +bool ASyncDBConnection::cancel() { - int sock = m_connection.socket(); - - fd_set readfds; - timeval timeout; - bool finished = false; - while (!finished && !terminateRequested) { - FD_ZERO(&readfds); - FD_SET(sock, &readfds); - - timeout.tv_sec = 5; - timeout.tv_usec = 0; - - int select_result = select(sock + 1, &readfds, nullptr, nullptr, &timeout); - if (select_result > 0) { - if (FD_ISSET(sock, &readfds)) { - if (consumeResultInput()) { - finished = true; - } - } - } - - } -#if false - Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear); - - long fd = FD_READ | FD_CLOSE; - - bool finished = false; - while ( ! finished) { - WSAEventSelect(sock, socket_event.handle(), fd); - - WaitHandleList whl; - auto wait_result_socket = whl.add(socket_event); - //auto wait_result_stop = whl.add(m_stopEvent); - - DWORD res = MsgWaitForMultipleObjectsEx( - whl.count(), // _In_ DWORD nCount, - whl, // _In_ const HANDLE *pHandles, - INFINITE, // _In_ DWORD dwMilliseconds, - 0, // _In_ DWORD dwWakeMask, - 0 // _In_ DWORD dwFlags - ); - if (res == wait_result_socket) { - WSANETWORKEVENTS net_events; - WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events); - - if (net_events.lNetworkEvents & FD_READ) { - if (consumeResultInput()) { - finished = true; - } - } - } - if (res == wait_result_stop) { - // Send cancel, close connection and terminate thread - cancel(); - doStateCallback(State::Terminating); - finished = true; - } - } // end while - // When last result received, remove command from queue -#endif + return false; //m_threadData.cancel(); } -void ASyncDBConnection::Thread::processNotice(const PGresult *result) +void ASyncDBConnection::setStateCallback(on_state_callback state_callback) +{ + std::lock_guard lg(m_stateCallback.m_mutex); + m_stateCallback.m_func = state_callback; +} + +void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback) +{ + std::lock_guard lg(m_noticeCallback.m_mutex); + m_noticeCallback.m_func = notice_callback; +} + +void ASyncDBConnection::processNotice(const PGresult *result) { -// Pgsql::Result res(result); std::lock_guard lg(m_noticeCallback.m_mutex); if (m_noticeCallback.m_func) { Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); m_noticeCallback.m_func(details); } } + +// ASyncDBConnection::Thread::Thread() +// {} +// +// void ASyncDBConnection::Thread::run() +// { +// m_terminated = false; +// SCOPE_EXIT { +// m_state = State::NotConnected; +// m_terminated = true; +// }; +// while (!terminateRequested) { +// +// // make or recover connection +// if (makeConnection()) { +// m_connection.setNoticeReceiver( +// [this](const PGresult *result) { processNotice(result); }); +// m_canceller = m_connection.getCancel(); +// +// +// // send commands and receive results +// communicate(); +// } +// else { +// // It is not possible to determine the source of the problem. +// // Accept for PQconnectionNeedsPassword +// +// // Pass problem to main thread and stop this thread +// +// // Main thread needs to know it has to restart connecting if it want's to. +// // TODO: add status functions to help main thread so it doesn't have to remember +// // everything reported through callbacks. +// +// break; +// } +// } +// // close connection +// } +// +// bool ASyncDBConnection::Thread::cancel() +// { +// return m_canceller.cancel(nullptr); +// } + +// bool ASyncDBConnection::Thread::makeConnection() +// { +// //using namespace std::literals::chrono_literals; +// +// // start connecting +// auto keywords = m_config.getKeywords(); +// auto values = m_config.getValues(); +// #if true +// bool result = m_connection.connect(keywords, values, 0); +// if (result) { +// doStateCallback(State::Connected); +// } +// return result; +// #else +// while (!terminateRequested) { +// +// bool ok = m_connection.connectStart(keywords, values); +// auto start = std::chrono::steady_clock::now(); +// if (ok && m_connection.status() != CONNECTION_BAD) { +// int sock = m_connection.socket(); +// Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear); +// +// long fd = FD_WRITE; +// while (true) { +// // poll till complete or failed (we can get an abort command) +// WSAEventSelect(sock, socket_event.handle(), fd); +// WaitHandleList whl; +// auto wait_result_socket_event = whl.add(socket_event); +// //auto wait_result_stop = whl.add(m_stopEvent); +// +// auto nu = std::chrono::steady_clock::now(); +// std::chrono::duration diff = -(nu-start); +// diff += 30s; +// DWORD timeout = diff.count(); +// +// DWORD res = MsgWaitForMultipleObjectsEx( +// whl.count(), // _In_ DWORD nCount, +// whl, // _In_ const HANDLE *pHandles, +// timeout, // _In_ DWORD dwMilliseconds, +// 0, // _In_ DWORD dwWakeMask, +// 0 // _In_ DWORD dwFlags +// ); +// if (res == wait_result_socket_event) { +// auto poll_state = m_connection.connectPoll(); +// if (poll_state == PGRES_POLLING_OK) { +// // if connected return true +// doStateCallback(State::Connected); +// return true; +// } +// else if (poll_state == PGRES_POLLING_FAILED) { +// doStateCallback(State::NotConnected); +// return false; +// } +// else if (poll_state == PGRES_POLLING_READING) { +// doStateCallback(State::Connecting); +// fd = FD_READ; +// } +// else if (poll_state == PGRES_POLLING_WRITING) { +// doStateCallback(State::Connecting); +// fd = FD_WRITE; +// } +// } +// else if (res == wait_result_stop) { +// +// } +// } // end while (true) +// } +// } +// return false; +// #endif +// } +// +// void ASyncDBConnection::Thread::communicate() +// { +// while (!terminateRequested) { +// // wait for something to do: +// // - command to send to server +// // - wait for results and (notifies can also come in) +// // - pass each result on to the completion routine +// // - notify comming in from the server +// // - pass to notify callback +// // - connection raises an error +// // - return +// // - stop signal +// // - return +// +// +// if (m_state == State::Connected) { +// waitForAndSendCommand(); +// } +// else if (m_state == State::QuerySend || m_state == State::CancelSend) { +// // Wait for result, even after a cancel we should wait, for all results +// // New command's are not excepted when one has been send +// waitForResult(); +// } +// +// } +// } +// +// void ASyncDBConnection::Thread::stop() +// { +// terminateRequested = true; +// //m_stopEvent.set(); +// } + +// void ASyncDBConnection::Thread::doStateCallback(State state) +// { +// m_state = state; +// std::lock_guard lg(m_stateCallback.m_mutex); +// if (m_stateCallback.m_func) { +// m_stateCallback.m_func(state); +// } +// } + +// void ASyncDBConnection::Thread::waitForAndSendCommand() +// { +// using namespace std::chrono_literals; +// // lock the data +// std::unique_lock lk(m_commandQueue.m_mutex); +// if (m_commandQueue.m_queue.empty()) { +// // no data wait till there is data +// m_commandQueue.m_newEvent.wait_for(lk, 1000ms); +// // can we use the predicate to reimplement the stop function???, []{return ready;}); +// +// } +// doNewCommand(); +// +// #if false +// WaitHandleList whl; +// auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent); +// //auto wait_result_stop = +// //whl.add(m_stopEvent); +// +// DWORD res = MsgWaitForMultipleObjectsEx( +// whl.count(), // _In_ DWORD nCount, +// whl, // _In_ const HANDLE *pHandles, +// INFINITE, // _In_ DWORD dwMilliseconds, +// 0, // _In_ DWORD dwWakeMask, +// 0 // _In_ DWORD dwFlags +// ); +// if (res == wait_result_new_command) { +// doNewCommand(); +// } +// // if (res == wait_result_stop) return; +// #endif +// } + +// void ASyncDBConnection::Thread::doNewCommand() +// { +// // get command from top of queue (but leave it in the queue, we need the callback) +// if (! m_commandQueue.m_queue.empty()) { +// const Command &command = m_commandQueue.m_queue.front(); +// if (!command.command.empty()) { +// bool query_send = false; +// if (command.params.empty()) +// query_send = m_connection.sendQuery(command.command.c_str()); +// else +// query_send = m_connection.sendQueryParams(command.command.c_str(), command.params); +// +// if (query_send) { +// m_timer.start(); +// doStateCallback(State::QuerySend); +// } +// else { +// std::string error = m_connection.getErrorMessage(); +// // todo: need to report the error +// } +// } +// } +// +// } + +// bool ASyncDBConnection::Thread::consumeResultInput() +// { +// bool finished = false; +// if (m_connection.consumeInput()) { +// while ( ! finished && ! m_connection.isBusy()) { +// auto res(m_connection.getResult()); +// { +// qint64 ms = m_timer.restart(); +// std::lock_guard lg(m_commandQueue.m_mutex); +// m_commandQueue.m_queue.front().on_result(res, ms); +// if (res == nullptr) { +// m_timer.invalidate(); +// m_commandQueue.m_queue.pop(); +// doStateCallback(State::Connected); +// finished = true; +// } +// } +// +// } +// // else is still waiting for more data +// +// } +// else { +// // error during consume +// +// } +// return finished; +// } + +// void ASyncDBConnection::Thread::waitForResult() +// { +// int sock = m_connection.socket(); +// +// fd_set readfds; +// timeval timeout; +// bool finished = false; +// while (!finished && !terminateRequested) { +// FD_ZERO(&readfds); +// FD_SET(sock, &readfds); +// +// timeout.tv_sec = 5; +// timeout.tv_usec = 0; +// +// int select_result = select(sock + 1, &readfds, nullptr, nullptr, &timeout); +// if (select_result > 0) { +// if (FD_ISSET(sock, &readfds)) { +// if (consumeResultInput()) { +// finished = true; +// } +// } +// } +// +// } +// } + +/* +*/ diff --git a/pglab/ASyncDBConnection.h b/pglab/ASyncDBConnection.h index cfc5dae..99dab70 100644 --- a/pglab/ASyncDBConnection.h +++ b/pglab/ASyncDBConnection.h @@ -5,12 +5,14 @@ #include "Pgsql_Params.h" #include "ConnectionConfig.h" #include -#include -#include +// #include +// #include #include -#include -#include -#include +// #include +// #include +// #include +#include +#include /** \brief Class that handles asynchronous execution of queries. * @@ -32,7 +34,7 @@ public: using on_state_callback = std::function; using on_notice_callback = std::function; - ASyncDBConnection(); + ASyncDBConnection(boost::asio::io_service &ios); State state() const; // void setupConnection(const std::string &connstring); @@ -54,88 +56,106 @@ public: bool cancel(); private: + Pgsql::Connection m_connection; + boost::asio::ip::tcp::socket m_asioSock; + ConnectionConfig m_config; + State m_state = State::NotConnected; + struct { + std::mutex m_mutex; + on_state_callback m_func; + } m_stateCallback; + struct { + std::mutex m_mutex; + on_notice_callback m_func; + } m_noticeCallback; + QElapsedTimer m_timer; + + void async_connect_handler(boost::system::error_code ec, std::size_t s); + void async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result); + void doStateCallback(State state); + void processNotice(const PGresult *result); - class Command { - public: - std::string command; - Pgsql::Params params; - on_result_callback on_result; - - Command() = default; - Command(const std::string &cmd, on_result_callback cb) - : command(cmd), on_result(cb) - {} - Command(const std::string &cmd, Pgsql::Params &&p, on_result_callback cb) - : command(cmd), params(p), on_result(cb) - {} - }; - - /// Contains all the members accessed by the thread - class Thread { - public: - using t_CommandQueue = std::queue; - struct { - std::mutex m_mutex; - on_state_callback m_func; - } m_stateCallback; - struct { - std::mutex m_mutex; - on_notice_callback m_func; - } m_noticeCallback; - - struct t_Command { - std::mutex m_mutex; - t_CommandQueue m_queue; - std::condition_variable m_newEvent; - t_Command() - {} - } m_commandQueue; - -// std::string m_initString; - ConnectionConfig m_config; - State m_state = State::NotConnected; - - Thread(); - - /// Is started as a seperate thread by ASyncDBConnection - void run(); - - /// Sends a cancel request to the DB server - bool cancel(); - - void stop(); - - private: - - /// \todo Implement new method to stop the thread - //Win32Event m_stopEvent; - Pgsql::Connection m_connection; - bool terminateRequested = false; ///< is set when the thread should stop - bool m_terminated = true; - Pgsql::Canceller m_canceller; - QElapsedTimer m_timer; - - - bool makeConnection(); - void communicate(); - - void doStateCallback(State state); - /// Wait's for a command to come in and send's it to the server - void waitForAndSendCommand(); - void doNewCommand(); - void waitForResult(); - - - void processNotice(const PGresult *result); - /** Function to call when after sending a command the socket is ready for reading. - * - * It might take several consumes before all data is read. - */ - bool consumeResultInput(); - }; - - Thread m_threadData; - std::thread m_thread; +// class Command { +// public: +// std::string command; +// Pgsql::Params params; +// on_result_callback on_result; +// +// Command() = default; +// Command(const std::string &cmd, on_result_callback cb) +// : command(cmd), on_result(cb) +// {} +// Command(const std::string &cmd, Pgsql::Params &&p, on_result_callback cb) +// : command(cmd), params(p), on_result(cb) +// {} +// }; +// +// /// Contains all the members accessed by the thread +// class Thread { +// public: +// using t_CommandQueue = std::queue; +// struct { +// std::mutex m_mutex; +// on_state_callback m_func; +// } m_stateCallback; +// struct { +// std::mutex m_mutex; +// on_notice_callback m_func; +// } m_noticeCallback; +// +// struct t_Command { +// std::mutex m_mutex; +// t_CommandQueue m_queue; +// std::condition_variable m_newEvent; +// t_Command() +// {} +// } m_commandQueue; +// +// // std::string m_initString; +// ConnectionConfig m_config; +// State m_state = State::NotConnected; +// +// Thread(); +// +// /// Is started as a seperate thread by ASyncDBConnection +// void run(); +// +// /// Sends a cancel request to the DB server +// bool cancel(); +// +// void stop(); +// +// private: +// +// /// \todo Implement new method to stop the thread +// //Win32Event m_stopEvent; +// Pgsql::Connection m_connection; +// bool terminateRequested = false; ///< is set when the thread should stop +// bool m_terminated = true; +// Pgsql::Canceller m_canceller; +// QElapsedTimer m_timer; +// +// +// bool makeConnection(); +// void communicate(); +// +// void doStateCallback(State state); +// /// Wait's for a command to come in and send's it to the server +// void waitForAndSendCommand(); +// void doNewCommand(); +// void waitForResult(); +// +// +// void processNotice(const PGresult *result); +// /** Function to call when after sending a command the socket is ready for reading. +// * +// * It might take several consumes before all data is read. +// */ +// bool consumeResultInput(); +// }; +// +// Thread m_threadData; +// std::thread m_thread; }; #endif // ASYNCDBCONNECTION_H diff --git a/pglab/DatabaseWindow.cpp b/pglab/DatabaseWindow.cpp index db0653e..da890b3 100644 --- a/pglab/DatabaseWindow.cpp +++ b/pglab/DatabaseWindow.cpp @@ -1,10 +1,13 @@ #include "DatabaseWindow.h" #include "ui_DatabaseWindow.h" #include +#include "GlobalIoService.h" + DatabaseWindow::DatabaseWindow(QWidget *parent) : QMainWindow(parent), - ui(new Ui::DatabaseWindow) + ui(new Ui::DatabaseWindow), + m_dbConnection(*getGlobalAsioIoService()) { ui->setupUi(this); diff --git a/pglab/QueryTab.cpp b/pglab/QueryTab.cpp index bcb7bd6..7c5780a 100644 --- a/pglab/QueryTab.cpp +++ b/pglab/QueryTab.cpp @@ -19,6 +19,7 @@ #include "PgTypeContainer.h" #include "PgsqlDatabaseCatalogue.h" #include "util.h" +#include "GlobalIoService.h" QueryParamListController::QueryParamListController(QTableView *tv, OpenDatabase *opendb, QWidget *parent) @@ -60,7 +61,8 @@ void QueryParamListController::on_removeParam() QueryTab::QueryTab(MainWindow *win, QWidget *parent) : QWidget(parent), ui(new Ui::QueryTab), - m_win(win) + m_win(win), + m_dbConnection(*getGlobalAsioIoService()) { ui->setupUi(this);