Restructured locations of source.

This commit is contained in:
Eelke Klein 2017-08-27 07:34:42 +02:00
parent 78a4c6d730
commit 7c4e8e95e8
151 changed files with 1 additions and 0 deletions

View file

@ -1,175 +0,0 @@
#include "ASyncDBConnection.h"
#include "ScopeGuard.h"
#include <chrono>
using namespace boost::asio;
ASyncDBConnection::ASyncDBConnection(boost::asio::io_service &ios)
: m_asioSock(ios)
{}
ASyncDBConnection::~ASyncDBConnection() = default;
ASyncDBConnection::State ASyncDBConnection::state() const
{
return m_state;
}
void ASyncDBConnection::setupConnection(const ConnectionConfig &config)
{
m_config = config;
auto keywords = m_config.getKeywords();
auto values = m_config.getValues();
bool ok = m_connection.connectStart(keywords, values);
// auto start = std::chrono::steady_clock::now();
if (ok && m_connection.status() != CONNECTION_BAD) {
auto sock_handle = m_connection.socket();
m_asioSock.assign(ip::tcp::v4(), sock_handle);
m_asioSock.non_blocking(true);
m_asioSock.async_write_some(null_buffers(),
[this] (boost::system::error_code ec, std::size_t s)
{ async_connect_handler(ec, s); }
);
}
}
void ASyncDBConnection::async_connect_handler(boost::system::error_code ec, std::size_t s)
{
// boost::asio::error::operation_aborted
if (ec == boost::system::errc::success) {
auto poll_state = m_connection.connectPoll();
if (poll_state == PGRES_POLLING_OK) {
// if connected return true
doStateCallback(State::Connected);
}
else if (poll_state == PGRES_POLLING_FAILED) {
doStateCallback(State::NotConnected);
}
else if (poll_state == PGRES_POLLING_READING) {
doStateCallback(State::Connecting);
m_asioSock.async_read_some(null_buffers(),
[this] (boost::system::error_code ec, std::size_t s)
{ async_connect_handler(ec, s); }
);
}
else if (poll_state == PGRES_POLLING_WRITING) {
doStateCallback(State::Connecting);
m_asioSock.async_write_some(null_buffers(),
[this] (boost::system::error_code ec, std::size_t s)
{ async_connect_handler(ec, s); }
);
}
}
}
void ASyncDBConnection::doStateCallback(State state)
{
m_state = state;
if (state == State::Connected) {
m_canceller = m_connection.getCancel();
m_connection.setNoticeReceiver(
[this](const PGresult *result) { processNotice(result); });
}
std::lock_guard<std::mutex> lg(m_stateCallback.m_mutex);
if (m_stateCallback.m_func) {
m_stateCallback.m_func(state);
}
}
void ASyncDBConnection::closeConnection()
{
// SHould this be async too????
if (m_state == State::QuerySend) {
m_canceller.cancel(nullptr);
}
m_asioSock.close();
m_connection.close();
doStateCallback(State::NotConnected);
}
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
{
m_connection.sendQuery(command);
m_timer.start();
doStateCallback(State::QuerySend);
m_asioSock.async_read_some(null_buffers(),
[this, on_result] (boost::system::error_code ec, std::size_t s)
{ async_query_handler(ec, s, on_result); }
);
return true;
}
bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result)
{
m_connection.sendQueryParams(command.c_str(), params);
m_timer.start();
doStateCallback(State::QuerySend);
m_asioSock.async_read_some(null_buffers(),
[this, on_result] (boost::system::error_code ec, std::size_t s)
{ async_query_handler(ec, s, on_result); }
);
return true;
}
void ASyncDBConnection::async_query_handler(boost::system::error_code ec, std::size_t s, on_result_callback on_result)
{
if (ec == boost::system::errc::success) {
bool finished = false;
if (m_connection.consumeInput()) {
while ( ! finished && ! m_connection.isBusy()) {
auto res = m_connection.getResult();
qint64 ms = m_timer.restart();
on_result(res, ms);
if (res == nullptr) {
m_timer.invalidate();
doStateCallback(State::Connected);
finished = true;
}
}
// else is still waiting for more data
}
else {
// error during consume
}
//return finished;
if (!finished) {
// wait for more
m_asioSock.async_read_some(null_buffers(),
[this, on_result] (boost::system::error_code ec, std::size_t s)
{ async_query_handler(ec, s, on_result); }
);
}
}
}
bool ASyncDBConnection::cancel()
{
return m_canceller.cancel(nullptr);
}
void ASyncDBConnection::setStateCallback(on_state_callback state_callback)
{
std::lock_guard<std::mutex> lg(m_stateCallback.m_mutex);
m_stateCallback.m_func = state_callback;
}
void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback)
{
std::lock_guard<std::mutex> lg(m_noticeCallback.m_mutex);
m_noticeCallback.m_func = notice_callback;
}
void ASyncDBConnection::processNotice(const PGresult *result)
{
std::lock_guard<std::mutex> lg(m_noticeCallback.m_mutex);
if (m_noticeCallback.m_func) {
Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result);
m_noticeCallback.m_func(details);
}
}