#include "ASyncDBConnection.h" #include "ScopeGuard.h" #include using namespace boost::asio; namespace { class registerMetaTypes { public: registerMetaTypes() { qRegisterMetaType(); qRegisterMetaType(); } } registerMetaTypes_instance; } ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios) : m_asioSock(ios) {} ASyncDBConnection::~ASyncDBConnection() = default; ASyncDBConnection::State ASyncDBConnection::state() const { return m_state; } void ASyncDBConnection::setupConnection(const ConnectionConfig &config) { m_config = config; auto keywords = m_config.getKeywords(); auto values = m_config.getValues(); bool ok = m_connection.connectStart(keywords, values); // auto start = std::chrono::steady_clock::now(); if (ok && m_connection.status() != CONNECTION_BAD) { auto sock_handle = m_connection.socket(); m_asioSock.assign(ip::tcp::v4(), sock_handle); m_asioSock.non_blocking(true); m_asioSock.async_write_some(null_buffers(), [this] (boost::system::error_code ec, std::size_t s) { async_connect_handler(ec, s); } ); } } void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t /*s*/) { // boost::asio::error::operation_aborted if (ec == boost::system::errc::success) { auto poll_state = m_connection.connectPoll(); if (poll_state == PGRES_POLLING_OK) { // if connected return true doStateCallback(State::Connected); } else if (poll_state == PGRES_POLLING_FAILED) { doStateCallback(State::NotConnected); } else if (poll_state == PGRES_POLLING_READING) { doStateCallback(State::Connecting); m_asioSock.async_read_some(null_buffers(), [this] (boost::system::error_code ec, std::size_t s) { async_connect_handler(ec, s); } ); } else if (poll_state == PGRES_POLLING_WRITING) { doStateCallback(State::Connecting); m_asioSock.async_write_some(null_buffers(), [this] (boost::system::error_code ec, std::size_t s) { async_connect_handler(ec, s); } ); } } } void ASyncDBConnection::doStateCallback(State state) { m_state = state; if (state == State::Connected) { m_canceller = m_connection.getCancel(); m_connection.setNoticeReceiver( [this](const PGresult *result) { processNotice(result); }); } emit onStateChanged(state); } void ASyncDBConnection::closeConnection() { // SHould this be async too???? if (m_state == State::QuerySend) { m_canceller.cancel(nullptr); } if (m_state != State::NotConnected) { m_asioSock.close(); m_connection.close(); } doStateCallback(State::NotConnected); } bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result) { m_connection.sendQuery(command); m_timer.start(); doStateCallback(State::QuerySend); m_asioSock.async_read_some(null_buffers(), [this, on_result] (boost::system::error_code ec, std::size_t s) { async_query_handler(ec, s, on_result); } ); return true; } bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result) { m_connection.sendQueryParams(command.c_str(), params); m_timer.start(); doStateCallback(State::QuerySend); m_asioSock.async_read_some(null_buffers(), [this, on_result] (boost::system::error_code ec, std::size_t s) { async_query_handler(ec, s, on_result); } ); return true; } void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result) { if (ec == boost::system::errc::success) { bool finished = false; if (m_connection.consumeInput()) { while ( ! finished && ! m_connection.isBusy()) { auto res = m_connection.getResult(); qint64 ms = m_timer.restart(); on_result(res, ms); if (res == nullptr) { m_timer.invalidate(); doStateCallback(State::Connected); finished = true; } } // else is still waiting for more data } else { // error during consume auto error_msg = m_connection.getErrorMessage(); } //return finished; if (!finished) { // wait for more m_asioSock.async_read_some(null_buffers(), [this, on_result] (boost::system::error_code ec, std::size_t s) { async_query_handler(ec, s, on_result); } ); } } } bool ASyncDBConnection::cancel() { return m_canceller.cancel(nullptr); } void ASyncDBConnection::processNotice(const PGresult *result) { Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result); emit onNotice(details); }