diff --git a/pglab/ASyncDBConnection.cpp b/pglab/ASyncDBConnection.cpp index 633a034..c1e97e9 100644 --- a/pglab/ASyncDBConnection.cpp +++ b/pglab/ASyncDBConnection.cpp @@ -8,9 +8,9 @@ using namespace boost::asio; ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios) : m_asioSock(ios) -{ +{} -} +ASyncDBConnection::~ASyncDBConnection() = default; ASyncDBConnection::State ASyncDBConnection::state() const { @@ -46,27 +46,30 @@ void ASyncDBConnection::setupConnection(const ConnectionConfig &config) void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t s) { - auto poll_state = m_connection.connectPoll(); - if (poll_state == PGRES_POLLING_OK) { - // if connected return true - doStateCallback(State::Connected); - } - else if (poll_state == PGRES_POLLING_FAILED) { - doStateCallback(State::NotConnected); - } - else if (poll_state == PGRES_POLLING_READING) { - doStateCallback(State::Connecting); - m_asioSock.async_read_some(null_buffers(), - [this] (boost::system::error_code ec, std::size_t s) - { async_connect_handler(ec, s); } - ); - } - else if (poll_state == PGRES_POLLING_WRITING) { - doStateCallback(State::Connecting); - m_asioSock.async_write_some(null_buffers(), - [this] (boost::system::error_code ec, std::size_t s) - { async_connect_handler(ec, s); } - ); + // boost::asio::error::operation_aborted + if (ec == boost::system::errc::success) { + auto poll_state = m_connection.connectPoll(); + if (poll_state == PGRES_POLLING_OK) { + // if connected return true + doStateCallback(State::Connected); + } + else if (poll_state == PGRES_POLLING_FAILED) { + doStateCallback(State::NotConnected); + } + else if (poll_state == PGRES_POLLING_READING) { + doStateCallback(State::Connecting); + m_asioSock.async_read_some(null_buffers(), + [this] (boost::system::error_code ec, std::size_t s) + { async_connect_handler(ec, s); } + ); + } + else if (poll_state == PGRES_POLLING_WRITING) { + doStateCallback(State::Connecting); + m_asioSock.async_write_some(null_buffers(), + [this] (boost::system::error_code ec, std::size_t s) + { async_connect_handler(ec, s); } + ); + } } } @@ -74,6 +77,7 @@ void ASyncDBConnection::doStateCallback(State state) { m_state = state; if (state == State::Connected) { + m_canceller = m_connection.getCancel(); m_connection.setNoticeReceiver( [this](const PGresult *result) { processNotice(result); }); } @@ -93,7 +97,12 @@ void ASyncDBConnection::closeConnection() // m_thread.join(); // } // SHould this be async too???? + if (m_state == State::QuerySend) { + m_canceller.cancel(nullptr); + } + m_asioSock.close(); m_connection.close(); + doStateCallback(State::NotConnected); } bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) @@ -133,37 +142,39 @@ bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, o void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result) { - bool finished = false; - if (m_connection.consumeInput()) { - while ( ! finished && ! m_connection.isBusy()) { - auto res = m_connection.getResult(); - qint64 ms = m_timer.restart(); - on_result(res, ms); - if (res == nullptr) { - m_timer.invalidate(); - doStateCallback(State::Connected); - finished = true; + if (ec == boost::system::errc::success) { + bool finished = false; + if (m_connection.consumeInput()) { + while ( ! finished && ! m_connection.isBusy()) { + auto res = m_connection.getResult(); + qint64 ms = m_timer.restart(); + on_result(res, ms); + if (res == nullptr) { + m_timer.invalidate(); + doStateCallback(State::Connected); + finished = true; + } } + // else is still waiting for more data } - // else is still waiting for more data - } - else { - // error during consume + else { + // error during consume - } - //return finished; - if (!finished) { - // wait for more - m_asioSock.async_read_some(null_buffers(), - [this, on_result] (boost::system::error_code ec, std::size_t s) - { async_query_handler(ec, s, on_result); } - ); + } + //return finished; + if (!finished) { + // wait for more + m_asioSock.async_read_some(null_buffers(), + [this, on_result] (boost::system::error_code ec, std::size_t s) + { async_query_handler(ec, s, on_result); } + ); + } } } bool ASyncDBConnection::cancel() { - return false; //m_threadData.cancel(); + return m_canceller.cancel(nullptr); } void ASyncDBConnection::setStateCallback(on_state_callback state_callback) diff --git a/pglab/ASyncDBConnection.h b/pglab/ASyncDBConnection.h index 99dab70..622f41e 100644 --- a/pglab/ASyncDBConnection.h +++ b/pglab/ASyncDBConnection.h @@ -24,17 +24,18 @@ public: enum class State { NotConnected, Connecting, - Connected, - QuerySend, - CancelSend, - Terminating + Connected, ///< connected and idle + QuerySend, ///< connected query send expecting result + CancelSend, ///< cancel send expecting result + Terminating ///< shutting down }; using on_result_callback = std::function, qint64)>; using on_state_callback = std::function; using on_notice_callback = std::function; - ASyncDBConnection(boost::asio::io_service &ios); + explicit ASyncDBConnection(boost::asio::io_service &ios); + ~ASyncDBConnection(); State state() const; // void setupConnection(const std::string &connstring); @@ -60,10 +61,12 @@ private: boost::asio::ip::tcp::socket m_asioSock; ConnectionConfig m_config; State m_state = State::NotConnected; + Pgsql::Canceller m_canceller; struct { std::mutex m_mutex; on_state_callback m_func; } m_stateCallback; + struct { std::mutex m_mutex; on_notice_callback m_func;