pgLab/pglablib/ASyncDBConnection.cpp

463 lines
13 KiB
C++
Raw Normal View History

2017-12-16 21:41:46 +01:00
#include "ASyncDBConnection.h"
#include "ScopeGuard.h"
#include "util.h"
#include "Pgsql_PgException.h"
#include "Win32Event.h"
#include "WaitHandleList.h"
#include <chrono>
#include <queue>
#include <WinSock2.h>
#include <QDebug>
namespace {
class registerMetaTypes {
public:
registerMetaTypes()
{
qRegisterMetaType<ASyncDBConnection::State>();
qRegisterMetaType<Pgsql::ErrorDetails>();
qRegisterMetaType<std::shared_ptr<Pgsql::Result>>();
}
} registerMetaTypes_instance;
class Command {
public:
std::string command;
Pgsql::Params params;
ASyncDBConnection::on_result_callback on_result;
Command() = default;
Command(const std::string &cmd, ASyncDBConnection::on_result_callback cb)
: command(cmd), on_result(cb)
{}
Command(const std::string &cmd, Pgsql::Params &&p, ASyncDBConnection::on_result_callback cb)
: command(cmd), params(p), on_result(cb)
{}
};
}
class ASyncDBConnectionThread {
public:
using t_CommandQueue = std::queue<Command>;
struct t_Command {
std::mutex m_mutex;
t_CommandQueue m_queue;
Win32Event m_newEvent;
t_Command()
: m_newEvent(Win32Event::Reset::Auto, Win32Event::Initial::Clear)
{}
} m_commandQueue;
ConnectionConfig m_config;
ASyncDBConnection::State m_state = ASyncDBConnection::State::NotConnected;
ASyncDBConnectionThread(ASyncDBConnection *asco);
/// Is started as a seperate thread by ASyncDBConnection
void run();
/// Sends a cancel request to the DB server
bool cancel();
void stop();
private:
ASyncDBConnection *asyncConnObject = nullptr;
/// \todo Implement new method to stop the thread
Win32Event m_stopEvent;
Pgsql::Connection m_connection;
bool terminateRequested = false; ///< is set when the thread should stop
bool m_terminated = true;
Pgsql::Canceller m_canceller;
QElapsedTimer m_timer;
bool makeConnection();
void communicate();
2022-08-14 08:04:21 +02:00
void doStateCallback(ASyncDBConnection::StateData state);
/// Wait's for a command to come in and send's it to the server
void waitForAndSendCommand();
2022-08-14 08:04:21 +02:00
void doNewCommand();
void waitForResult();
void processNotice(const PGresult *result);
/** Function to call when after sending a command the socket is ready for reading.
*
* It might take several consumes before all data is read.
*/
bool consumeResultInput();
};
ASyncDBConnectionThread::ASyncDBConnectionThread(ASyncDBConnection *asco)
: asyncConnObject(asco)
, m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear)
{}
void ASyncDBConnectionThread::run()
{
m_terminated = false;
SCOPE_EXIT {
m_state = ASyncDBConnection::State::NotConnected;
m_terminated = true;
2022-08-14 08:04:21 +02:00
m_connection.close();
// doStateCallback(ASyncDBConnection::State::NotConnected);
};
2022-08-14 08:04:21 +02:00
try {
while (!terminateRequested) {
// make or recover connection
if (makeConnection()) {
m_connection.setNoticeReceiver(
[this](const PGresult *result) { processNotice(result); });
m_canceller = m_connection.getCancel();
// send commands and receive results
communicate();
}
else {
// It is not possible to determine the source of the problem.
// Accept for PQconnectionNeedsPassword
// Pass problem to main thread and stop this thread
// Main thread needs to know it has to restart connecting if it want's to.
// TODO: add status functions to help main thread so it doesn't have to remember
// everything reported through callbacks.
break;
}
}
doStateCallback({ ASyncDBConnection::State::NotConnected, QString("terminating") });
}
catch (const std::exception &ex)
{
doStateCallback({ ASyncDBConnection::State::NotConnected, QString::fromUtf8(ex.what()) });
}
catch (...)
{
doStateCallback({ ASyncDBConnection::State::NotConnected, QString::fromUtf8("Unknown error") });
}
}
bool ASyncDBConnectionThread::cancel()
{
return m_canceller.cancel(nullptr);
}
bool ASyncDBConnectionThread::makeConnection()
{
using namespace std::literals::chrono_literals;
// start connecting
QString conn_string = m_config.connectionString();
while (!terminateRequested) {
bool ok = m_connection.connectStart(conn_string); //keywords, values);
auto start = std::chrono::steady_clock::now();
if (ok && m_connection.status() != CONNECTION_BAD) {
Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear);
long fd = FD_WRITE;
while (true) {
// poll till complete or failed (we can get an abort command)
int sock = m_connection.socket();
WSAEventSelect(sock, socket_event.handle(), fd);
WaitHandleList whl;
auto wait_result_socket_event = whl.add(socket_event);
auto wait_result_stop = whl.add(m_stopEvent);
auto nu = std::chrono::steady_clock::now();
std::chrono::duration<float, std::milli> diff = -(nu-start);
diff += 30s;
DWORD timeout = diff.count();
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
whl, // _In_ const HANDLE *pHandles,
timeout, // _In_ DWORD dwMilliseconds,
0, // _In_ DWORD dwWakeMask,
0 // _In_ DWORD dwFlags
);
if (res == wait_result_socket_event) {
auto poll_state = m_connection.connectPoll();
if (poll_state == PGRES_POLLING_OK) {
2022-08-14 08:04:21 +02:00
qDebug() << "ASyncDBConnection Connected";
// if connected return true
2022-08-14 08:04:21 +02:00
doStateCallback({ ASyncDBConnection::State::Connected, "Success" });
return true;
}
else if (poll_state == PGRES_POLLING_FAILED) {
2022-08-14 08:04:21 +02:00
doStateCallback({ ASyncDBConnection::State::NotConnected, "Failed to connect" });
return false;
}
else if (poll_state == PGRES_POLLING_READING) {
2022-08-14 08:04:21 +02:00
doStateCallback({ ASyncDBConnection::State::Connecting, "Negotiating" });
fd = FD_READ;
}
else if (poll_state == PGRES_POLLING_WRITING) {
2022-08-14 08:04:21 +02:00
doStateCallback({ ASyncDBConnection::State::Connecting, "Negotiating" });
fd = FD_WRITE;
}
}
else if (res == wait_result_stop) {
m_connection.close();
return false;
}
} // end while (true)
}
}
return false;
}
void ASyncDBConnectionThread::communicate()
{
while (!terminateRequested) {
// wait for something to do:
// - command to send to server
// - wait for results and (notifies can also come in)
// - pass each result on to the completion routine
// - notify comming in from the server
// - pass to notify callback
// - connection raises an error
// - return
// - stop signal
// - return
if (m_state == ASyncDBConnection::State::Connected) {
waitForAndSendCommand();
}
else if (m_state == ASyncDBConnection::State::QuerySend || m_state == ASyncDBConnection::State::CancelSend) {
// Wait for result, even after a cancel we should wait, for all results
// New command's are not excepted when one has been send
waitForResult();
}
}
}
void ASyncDBConnectionThread::stop()
{
terminateRequested = true;
m_stopEvent.set();
}
2022-08-14 08:04:21 +02:00
void ASyncDBConnectionThread::doStateCallback(ASyncDBConnection::StateData state)
{
2022-08-14 08:04:21 +02:00
m_state = state.State;
Q_EMIT asyncConnObject->onStateChanged(state);
}
void ASyncDBConnectionThread::waitForAndSendCommand()
{
2022-05-26 08:25:31 +02:00
WaitHandleList whl;
auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent);
2025-02-23 16:52:39 +01:00
whl.add(m_stopEvent);
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
whl, // _In_ const HANDLE *pHandles,
INFINITE, // _In_ DWORD dwMilliseconds,
0, // _In_ DWORD dwWakeMask,
0 // _In_ DWORD dwFlags
);
if (res == wait_result_new_command) {
doNewCommand();
}
2022-05-26 08:25:31 +02:00
// Note if it was stop we can just return and function
// above will stop looping because terminateRequested has been set too by stop
}
void ASyncDBConnectionThread::doNewCommand()
{
// get command from top of queue (but leave it in the queue, we need the callback)
if (! m_commandQueue.m_queue.empty()) {
const Command &command = m_commandQueue.m_queue.front();
2019-12-18 19:35:17 +01:00
if (command.params.empty())
2022-08-14 08:04:21 +02:00
m_connection.sendQuery(command.command.c_str());
2019-12-18 19:35:17 +01:00
else
2022-08-14 08:04:21 +02:00
m_connection.sendQueryParams(command.command.c_str(), command.params);
2019-12-18 19:35:17 +01:00
2022-08-14 08:04:21 +02:00
m_timer.start();
doStateCallback(ASyncDBConnection::State::QuerySend);
}
}
bool ASyncDBConnectionThread::consumeResultInput()
{
bool finished = false;
if (m_connection.consumeInput()) {
while ( ! finished && ! m_connection.isBusy()) {
auto res(m_connection.getResultNoThrow());
{
qint64 ms = m_timer.restart();
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
m_commandQueue.m_queue.front().on_result(res, ms);
if (res == nullptr) {
m_timer.invalidate();
m_commandQueue.m_queue.pop();
doStateCallback(ASyncDBConnection::State::Connected);
finished = true;
}
}
}
// else is still waiting for more data
}
else {
// error during consume
auto error_msg = m_connection.getErrorMessage();
qDebug() << "error while communicating with server " << QString::fromStdString(error_msg);
2022-08-14 08:04:21 +02:00
doStateCallback({ASyncDBConnection::State::NotConnected, QString::fromStdString(error_msg)});
finished = true;
stop();
}
return finished;
}
void ASyncDBConnectionThread::waitForResult()
{
SOCKET sock = static_cast<SOCKET>(m_connection.socket());
Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear);
long fd = FD_READ | FD_CLOSE;
bool finished = false;
while ( ! finished) {
WSAEventSelect(sock, socket_event.handle(), fd);
WaitHandleList whl;
auto wait_result_socket = whl.add(socket_event);
auto wait_result_stop = whl.add(m_stopEvent);
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
whl, // _In_ const HANDLE *pHandles,
INFINITE, // _In_ DWORD dwMilliseconds,
0, // _In_ DWORD dwWakeMask,
0 // _In_ DWORD dwFlags
);
if (res == wait_result_socket) {
WSANETWORKEVENTS net_events;
WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events);
if (net_events.lNetworkEvents & FD_READ) {
if (consumeResultInput()) {
finished = true;
}
}
}
if (res == wait_result_stop) {
// Send cancel, close connection and terminate thread
cancel();
doStateCallback(ASyncDBConnection::State::Terminating);
finished = true;
}
} // end while
}
void ASyncDBConnectionThread::processNotice(const PGresult *result)
{
Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result);
Q_EMIT asyncConnObject->onNotice(details);
}
ASyncDBConnection::ASyncDBConnection()
: m_threadData(std::make_unique<ASyncDBConnectionThread>(this))
{}
ASyncDBConnection::~ASyncDBConnection()
{
closeConnection();
}
ASyncDBConnection::State ASyncDBConnection::state() const
{
return m_threadData->m_state;
}
void ASyncDBConnection::setupConnection(const ConnectionConfig &config)
{
if (m_thread.joinable()) {
m_threadData->stop();
m_thread.join();
}
m_threadData.reset(new ASyncDBConnectionThread(this));
m_threadData->m_config = config;
m_thread = std::thread([this] () { m_threadData->run(); });
}
void ASyncDBConnection::doStateCallback(State state)
{
m_state = state;
if (state == State::Connected) {
m_canceller = m_connection.getCancel();
m_connection.setNoticeReceiver(
[this](const PGresult *result) { processNotice(result); });
}
Q_EMIT onStateChanged(state);
}
void ASyncDBConnection::closeConnection()
{
m_threadData->stop();
if (m_thread.joinable()) {
m_thread.join();
}
}
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
{
2022-08-17 18:18:10 +02:00
if (command.empty() || state() == State::NotConnected)
2019-12-18 19:35:17 +01:00
return false;
{
std::lock_guard<std::mutex> lg(m_threadData->m_commandQueue.m_mutex);
m_threadData->m_commandQueue.m_queue.emplace(command, on_result);
m_threadData->m_commandQueue.m_newEvent.set();
}
return true;
}
bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, on_result_callback on_result)
{
2022-08-17 18:18:10 +02:00
if (command.empty() || state() == State::NotConnected)
return false;
{
std::lock_guard<std::mutex> lg(m_threadData->m_commandQueue.m_mutex);
m_threadData->m_commandQueue.m_queue.emplace(command, std::move(params), on_result);
m_threadData->m_commandQueue.m_newEvent.set();
}
return true;
}
bool ASyncDBConnection::cancel()
{
return m_threadData->cancel();
}
void ASyncDBConnection::processNotice(const PGresult *result)
{
Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result);
Q_EMIT onNotice(details);
}