#include "ASyncDBConnection.h"
#include "ScopeGuard.h"
// NOTE(review): the header name of this directive was missing; <chrono> is
// assumed because of the std::chrono reference in setupConnection() - confirm.
#include <chrono>
#include <mutex>

using namespace boost::asio;

/// Constructs the object with its socket wrapper bound to @p ios.
ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios)
    : m_asioSock(ios)
{}

ASyncDBConnection::~ASyncDBConnection() = default;

/// Returns the current value of m_state.
ASyncDBConnection::State ASyncDBConnection::state() const
{
    return m_state;
}

/// Stores @p config and starts a connection attempt.
///
/// When the attempt could be started, the connection's socket is handed to
/// m_asioSock and async_connect_handler() is scheduled for when the socket
/// becomes writable. When it could not be started, State::NotConnected is
/// reported through the state callback so the caller is not left waiting.
void ASyncDBConnection::setupConnection(const ConnectionConfig &config)
{
    m_config = config;
    auto keywords = m_config.getKeywords();
    auto values = m_config.getValues();

    bool ok = m_connection.connectStart(keywords, values);
    // auto start = std::chrono::steady_clock::now();
    if (ok && m_connection.status() != CONNECTION_BAD) {
        auto sock_handle = m_connection.socket();
        // NOTE(review): the protocol is hard-coded to IPv4 TCP; confirm this
        // is acceptable for IPv6 or unix-domain connections.
        m_asioSock.assign(ip::tcp::v4(), sock_handle);
        m_asioSock.non_blocking(true);

        // null_buffers: only wait for readiness, no data is transferred here.
        m_asioSock.async_write_some(null_buffers(),
            [this] (boost::system::error_code ec, std::size_t s)
            { async_connect_handler(ec, s); }
        );
    }
    else {
        // The attempt never got going; previously this was silently ignored.
        doStateCallback(State::NotConnected);
    }
}

/// Completion handler used while the connection is being established.
///
/// Polls the connection and, depending on the result, reports Connected or
/// NotConnected, or reports Connecting and waits for the socket to become
/// readable/writable again.
///
/// @param ec result of the readiness wait; anything but success is ignored
/// @param s  passed on by the socket operation, not used
void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t s)
{
    // boost::asio::error::operation_aborted
    // NOTE(review): non-success error codes are not reported to anyone.
    if (ec == boost::system::errc::success) {
        auto poll_state = m_connection.connectPoll();
        if (poll_state == PGRES_POLLING_OK) {
            // connection established
            doStateCallback(State::Connected);
        }
        else if (poll_state == PGRES_POLLING_FAILED) {
            doStateCallback(State::NotConnected);
        }
        else if (poll_state == PGRES_POLLING_READING) {
            doStateCallback(State::Connecting);
            m_asioSock.async_read_some(null_buffers(),
                [this] (boost::system::error_code ec, std::size_t s)
                { async_connect_handler(ec, s); }
            );
        }
        else if (poll_state == PGRES_POLLING_WRITING) {
            doStateCallback(State::Connecting);
            m_asioSock.async_write_some(null_buffers(),
                [this] (boost::system::error_code ec, std::size_t s)
                { async_connect_handler(ec, s); }
            );
        }
    }
}

/// Records @p state in m_state and passes it to the registered state callback.
///
/// On the transition to Connected the cancel object is fetched from the
/// connection and processNotice() is installed as notice receiver. The state
/// callback is invoked while m_stateCallback.m_mutex is held.
void ASyncDBConnection::doStateCallback(State state)
{
    m_state = state;
    if (state == State::Connected) {
        m_canceller = m_connection.getCancel();
        m_connection.setNoticeReceiver(
            [this](const PGresult *result) { processNotice(result); });
    }

    std::lock_guard lg(m_stateCallback.m_mutex);
    if (m_stateCallback.m_func) {
        m_stateCallback.m_func(state);
    }
}

/// Cancels a query that is in flight (state QuerySend), closes the socket
/// wrapper and the connection, and reports State::NotConnected.
void ASyncDBConnection::closeConnection()
{
    // Should this be async too?
    if (m_state == State::QuerySend) {
        m_canceller.cancel(nullptr);
    }
    // NOTE(review): m_asioSock was given the connection's socket handle in
    // setupConnection(); closing both may close the same descriptor twice -
    // confirm who owns it.
    m_asioSock.close();
    m_connection.close();
    doStateCallback(State::NotConnected);
}

/// Sends @p command and arranges for @p on_result to receive the results.
///
/// Starts m_timer, reports State::QuerySend and waits for the socket to
/// become readable; the results are delivered by async_query_handler().
///
/// @return always true
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
{
    // NOTE(review): the outcome of sendQuery is not checked - confirm whether
    // it can report failure.
    m_connection.sendQuery(command);
    m_timer.start();
    doStateCallback(State::QuerySend);
    m_asioSock.async_read_some(null_buffers(),
        [this, on_result] (boost::system::error_code ec, std::size_t s)
        { async_query_handler(ec, s, on_result); }
    );
    return true;
}

/// Same as the overload above, but sends @p command together with @p params.
///
/// @return always true
bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result)
{
    // NOTE(review): the outcome of sendQueryParams is not checked - confirm
    // whether it can report failure.
    m_connection.sendQueryParams(command.c_str(), params);
    m_timer.start();
    doStateCallback(State::QuerySend);
    m_asioSock.async_read_some(null_buffers(),
        [this, on_result] (boost::system::error_code ec, std::size_t s)
        { async_query_handler(ec, s, on_result); }
    );
    return true;
}

/// Completion handler used while a query is in progress.
///
/// Consumes the available input and hands every result that is ready to
/// @p on_result together with the milliseconds returned by m_timer.restart().
/// The final call passes a null result; after it the timer is invalidated and
/// State::Connected is reported. While results are still outstanding the
/// handler re-arms itself.
///
/// @param ec        result of the readiness wait; anything but success is ignored
/// @param s         passed on by the socket operation, not used
/// @param on_result receiver of the results
void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result)
{
    if (ec == boost::system::errc::success) {
        bool finished = false;
        if (m_connection.consumeInput()) {
            while ( ! finished && ! m_connection.isBusy()) {
                auto res = m_connection.getResult();
                qint64 ms = m_timer.restart();
                on_result(res, ms);
                if (res == nullptr) {
                    m_timer.invalidate();
                    doStateCallback(State::Connected);
                    finished = true;
                }
            }
            // else is still waiting for more data
        }
        else if (m_connection.status() == CONNECTION_BAD) {
            // Error during consume and the connection is gone. Waiting again
            // would complete immediately and fail again, forever, so stop
            // here and tell the owner.
            m_timer.invalidate();
            doStateCallback(State::NotConnected);
            finished = true;
        }
        // any other consume error: fall through and retry on the next wakeup

        if (!finished) {
            // wait for more
            m_asioSock.async_read_some(null_buffers(),
                [this, on_result] (boost::system::error_code ec, std::size_t s)
                { async_query_handler(ec, s, on_result); }
            );
        }
    }
}

/// Asks the cancel object to cancel and returns what it reports.
bool ASyncDBConnection::cancel()
{
    return m_canceller.cancel(nullptr);
}

/// Replaces the state callback; guarded by m_stateCallback.m_mutex.
void ASyncDBConnection::setStateCallback(on_state_callback state_callback)
{
    std::lock_guard lg(m_stateCallback.m_mutex);
    m_stateCallback.m_func = state_callback;
}

/// Replaces the notice callback; guarded by m_noticeCallback.m_mutex.
void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback)
{
    std::lock_guard lg(m_noticeCallback.m_mutex);
    m_noticeCallback.m_func = notice_callback;
}

/// Converts @p result to Pgsql::ErrorDetails and passes it to the notice
/// callback, if one is registered. The callback runs while
/// m_noticeCallback.m_mutex is held.
void ASyncDBConnection::processNotice(const PGresult *result)
{
    std::lock_guard lg(m_noticeCallback.m_mutex);
    if (m_noticeCallback.m_func) {
        Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result);
        m_noticeCallback.m_func(details);
    }
}