Cancel functionality works again.

Also async operations are now cancelled when the dbconnection is closed and if needed also a cancel is
send before closing the connection.
This commit is contained in:
Eelke Klein 2017-08-25 08:37:18 +02:00
parent f11f9545ac
commit 16676aa910
2 changed files with 65 additions and 51 deletions

View file

@ -8,9 +8,9 @@ using namespace boost::asio;
ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios) ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios)
: m_asioSock(ios) : m_asioSock(ios)
{ {}
} ASyncDBConnection::~ASyncDBConnection() = default;
ASyncDBConnection::State ASyncDBConnection::state() const ASyncDBConnection::State ASyncDBConnection::state() const
{ {
@ -46,27 +46,30 @@ void ASyncDBConnection::setupConnection(const ConnectionConfig &config)
void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t s) void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t s)
{ {
auto poll_state = m_connection.connectPoll(); // boost::asio::error::operation_aborted
if (poll_state == PGRES_POLLING_OK) { if (ec == boost::system::errc::success) {
// if connected return true auto poll_state = m_connection.connectPoll();
doStateCallback(State::Connected); if (poll_state == PGRES_POLLING_OK) {
} // if connected return true
else if (poll_state == PGRES_POLLING_FAILED) { doStateCallback(State::Connected);
doStateCallback(State::NotConnected); }
} else if (poll_state == PGRES_POLLING_FAILED) {
else if (poll_state == PGRES_POLLING_READING) { doStateCallback(State::NotConnected);
doStateCallback(State::Connecting); }
m_asioSock.async_read_some(null_buffers(), else if (poll_state == PGRES_POLLING_READING) {
[this] (boost::system::error_code ec, std::size_t s) doStateCallback(State::Connecting);
{ async_connect_handler(ec, s); } m_asioSock.async_read_some(null_buffers(),
); [this] (boost::system::error_code ec, std::size_t s)
} { async_connect_handler(ec, s); }
else if (poll_state == PGRES_POLLING_WRITING) { );
doStateCallback(State::Connecting); }
m_asioSock.async_write_some(null_buffers(), else if (poll_state == PGRES_POLLING_WRITING) {
[this] (boost::system::error_code ec, std::size_t s) doStateCallback(State::Connecting);
{ async_connect_handler(ec, s); } m_asioSock.async_write_some(null_buffers(),
); [this] (boost::system::error_code ec, std::size_t s)
{ async_connect_handler(ec, s); }
);
}
} }
} }
@ -74,6 +77,7 @@ void ASyncDBConnection::doStateCallback(State state)
{ {
m_state = state; m_state = state;
if (state == State::Connected) { if (state == State::Connected) {
m_canceller = m_connection.getCancel();
m_connection.setNoticeReceiver( m_connection.setNoticeReceiver(
[this](const PGresult *result) { processNotice(result); }); [this](const PGresult *result) { processNotice(result); });
} }
@ -93,7 +97,12 @@ void ASyncDBConnection::closeConnection()
// m_thread.join(); // m_thread.join();
// } // }
// SHould this be async too???? // SHould this be async too????
if (m_state == State::QuerySend) {
m_canceller.cancel(nullptr);
}
m_asioSock.close();
m_connection.close(); m_connection.close();
doStateCallback(State::NotConnected);
} }
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
@ -133,37 +142,39 @@ bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, o
void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result) void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result)
{ {
bool finished = false; if (ec == boost::system::errc::success) {
if (m_connection.consumeInput()) { bool finished = false;
while ( ! finished && ! m_connection.isBusy()) { if (m_connection.consumeInput()) {
auto res = m_connection.getResult(); while ( ! finished && ! m_connection.isBusy()) {
qint64 ms = m_timer.restart(); auto res = m_connection.getResult();
on_result(res, ms); qint64 ms = m_timer.restart();
if (res == nullptr) { on_result(res, ms);
m_timer.invalidate(); if (res == nullptr) {
doStateCallback(State::Connected); m_timer.invalidate();
finished = true; doStateCallback(State::Connected);
finished = true;
}
} }
// else is still waiting for more data
} }
// else is still waiting for more data else {
} // error during consume
else {
// error during consume
} }
//return finished; //return finished;
if (!finished) { if (!finished) {
// wait for more // wait for more
m_asioSock.async_read_some(null_buffers(), m_asioSock.async_read_some(null_buffers(),
[this, on_result] (boost::system::error_code ec, std::size_t s) [this, on_result] (boost::system::error_code ec, std::size_t s)
{ async_query_handler(ec, s, on_result); } { async_query_handler(ec, s, on_result); }
); );
}
} }
} }
bool ASyncDBConnection::cancel() bool ASyncDBConnection::cancel()
{ {
return false; //m_threadData.cancel(); return m_canceller.cancel(nullptr);
} }
void ASyncDBConnection::setStateCallback(on_state_callback state_callback) void ASyncDBConnection::setStateCallback(on_state_callback state_callback)

View file

@ -24,17 +24,18 @@ public:
enum class State { enum class State {
NotConnected, NotConnected,
Connecting, Connecting,
Connected, Connected, ///< connected and idle
QuerySend, QuerySend, ///< connected query send expecting result
CancelSend, CancelSend, ///< cancel send expecting result
Terminating Terminating ///< shutting down
}; };
using on_result_callback = std::function<void(std::shared_ptr<Pgsql::Result>, qint64)>; using on_result_callback = std::function<void(std::shared_ptr<Pgsql::Result>, qint64)>;
using on_state_callback = std::function<void(State)>; using on_state_callback = std::function<void(State)>;
using on_notice_callback = std::function<void(Pgsql::ErrorDetails)>; using on_notice_callback = std::function<void(Pgsql::ErrorDetails)>;
ASyncDBConnection(boost::asio::io_service &ios); explicit ASyncDBConnection(boost::asio::io_service &ios);
~ASyncDBConnection();
State state() const; State state() const;
// void setupConnection(const std::string &connstring); // void setupConnection(const std::string &connstring);
@ -60,10 +61,12 @@ private:
boost::asio::ip::tcp::socket m_asioSock; boost::asio::ip::tcp::socket m_asioSock;
ConnectionConfig m_config; ConnectionConfig m_config;
State m_state = State::NotConnected; State m_state = State::NotConnected;
Pgsql::Canceller m_canceller;
struct { struct {
std::mutex m_mutex; std::mutex m_mutex;
on_state_callback m_func; on_state_callback m_func;
} m_stateCallback; } m_stateCallback;
struct { struct {
std::mutex m_mutex; std::mutex m_mutex;
on_notice_callback m_func; on_notice_callback m_func;