pgLab/asyncdbconnection.cpp

330 lines
8.7 KiB
C++

#include "asyncdbconnection.h"
#include "waithandlelist.h"
#include "scopeguard.h"
#include <chrono>
ASyncDBConnection::ASyncDBConnection()
{
}
ASyncDBConnection::State ASyncDBConnection::state() const
{
return m_threadData.m_state;
}
//void ASyncDBConnection::setupConnection(const std::string &connstring)
//{
// if (m_thread.joinable()) {
// m_threadData.stop();
// m_thread.join();
// }
// m_threadData.m_initString = connstring;
// m_thread = std::thread([this] () { m_threadData.run(); });
//}
void ASyncDBConnection::setupConnection(const ConnectionConfig &config)
{
if (m_thread.joinable()) {
m_threadData.stop();
m_thread.join();
}
m_threadData.m_config = config;
m_thread = std::thread([this] () { m_threadData.run(); });
}
void ASyncDBConnection::closeConnection()
{
m_threadData.stop();
if (m_thread.joinable()) {
m_thread.join();
}
}
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
{
{
std::lock_guard<std::mutex> lg(m_threadData.m_commandQueue.m_mutex);
m_threadData.m_commandQueue.m_queue.emplace(command, on_result);
m_threadData.m_commandQueue.m_newEvent.set();
}
return true;
}
bool ASyncDBConnection::cancel()
{
return m_threadData.cancel();
}
void ASyncDBConnection::setStateCallback(on_state_callback state_callback)
{
std::lock_guard<std::mutex> lg(m_threadData.m_stateCallback.m_mutex);
m_threadData.m_stateCallback.m_func = state_callback;
}
void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback)
{
std::lock_guard<std::mutex> lg(m_threadData.m_noticeCallback.m_mutex);
m_threadData.m_noticeCallback.m_func = notice_callback;
}
ASyncDBConnection::Thread::Thread()
: m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear)
{}
void ASyncDBConnection::Thread::run()
{
m_terminated = false;
SCOPE_EXIT {
m_state = State::NotConnected;
m_terminated = true;
};
while (!terminateRequested) {
// make or recover connection
if (makeConnection()) {
m_connection.setNoticeReceiver(
[this](const PGresult *result) { processNotice(result); });
m_canceller = m_connection.getCancel();
// send commands and receive results
communicate();
}
else {
// It is not possible to determine the source of the problem.
// Accept for PQconnectionNeedsPassword
// Pass problem to main thread and stop this thread
// Main thread needs to know it has to restart connecting if it want's to.
// TODO: add status functions to help main thread so it doesn't have to remember
// everything reported through callbacks.
break;
}
}
// close connection
}
bool ASyncDBConnection::Thread::cancel()
{
return m_canceller.cancel(nullptr);
}
bool ASyncDBConnection::Thread::makeConnection()
{
using namespace std::chrono_literals;
while (!terminateRequested) {
// start connecting
//bool ok = m_connection.connectStart(m_initString + " client_encoding=utf8");
auto keywords = m_config.getKeywords();
auto values = m_config.getValues();
bool ok = m_connection.connectStart(keywords, values);
auto start = std::chrono::steady_clock::now();
if (ok && m_connection.status() != CONNECTION_BAD) {
int sock = m_connection.socket();
Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear);
long fd = FD_WRITE;
while (true) {
// poll till complete or failed (we can get an abort command)
WSAEventSelect(sock, socket_event.handle(), fd);
WaitHandleList whl;
auto wait_result_socket_event = whl.add(socket_event);
auto wait_result_stop = whl.add(m_stopEvent);
auto nu = std::chrono::steady_clock::now();
std::chrono::duration<float, std::milli> diff = -(nu-start);
diff += 30s;
DWORD timeout = diff.count();
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
whl, // _In_ const HANDLE *pHandles,
timeout, // _In_ DWORD dwMilliseconds,
0, // _In_ DWORD dwWakeMask,
0 // _In_ DWORD dwFlags
);
if (res == wait_result_socket_event) {
auto poll_state = m_connection.connectPoll();
if (poll_state == PGRES_POLLING_OK) {
// if connected return true
doStateCallback(State::Connected);
return true;
}
else if (poll_state == PGRES_POLLING_FAILED) {
doStateCallback(State::NotConnected);
return false;
}
else if (poll_state == PGRES_POLLING_READING) {
doStateCallback(State::Connecting);
fd = FD_READ;
}
else if (poll_state == PGRES_POLLING_WRITING) {
doStateCallback(State::Connecting);
fd = FD_WRITE;
}
}
else if (res == wait_result_stop) {
}
} // end while (true)
}
}
return false;
}
void ASyncDBConnection::Thread::communicate()
{
while (!terminateRequested) {
// wait for something to do:
// - command to send to server
// - wait for results and (notifies can also come in)
// - pass each result on to the completion routine
// - notify comming in from the server
// - pass to notify callback
// - connection raises an error
// - return
// - stop signal
// - return
if (m_state == State::Connected) {
waitForAndSendCommand();
}
else if (m_state == State::QuerySend || m_state == State::CancelSend) {
// Wait for result, even after a cancel we should wait, for all results
// New command's are not excepted when one has been send
waitForResult();
}
}
}
void ASyncDBConnection::Thread::stop()
{
terminateRequested = true;
m_stopEvent.set();
}
void ASyncDBConnection::Thread::doStateCallback(State state)
{
m_state = state;
std::lock_guard<std::mutex> lg(m_stateCallback.m_mutex);
if (m_stateCallback.m_func) {
m_stateCallback.m_func(state);
}
}
void ASyncDBConnection::Thread::waitForAndSendCommand()
{
WaitHandleList whl;
auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent);
auto wait_result_stop = whl.add(m_stopEvent);
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
whl, // _In_ const HANDLE *pHandles,
INFINITE, // _In_ DWORD dwMilliseconds,
0, // _In_ DWORD dwWakeMask,
0 // _In_ DWORD dwFlags
);
if (res == wait_result_new_command) {
doNewCommand();
}
// if (res == wait_result_stop) return;
}
void ASyncDBConnection::Thread::doNewCommand()
{
// todo: send command
// get command from top of queue (but leave it in the queue, we need the callback)
std::string command;
{
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
if (! m_commandQueue.m_queue.empty()) {
command = m_commandQueue.m_queue.front().command;
}
}
if (!command.empty() && m_connection.sendQuery(command)) {
doStateCallback(State::QuerySend);
}
else {
std::string error = m_connection.getErrorMessage();
// todo: need to report the error
}
}
void ASyncDBConnection::Thread::waitForResult()
{
int sock = m_connection.socket();
Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear);
long fd = FD_READ | FD_CLOSE;
bool finished = false;
while ( ! finished) {
WSAEventSelect(sock, socket_event.handle(), fd);
WaitHandleList whl;
auto wait_result_socket = whl.add(socket_event);
auto wait_result_stop = whl.add(m_stopEvent);
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
whl, // _In_ const HANDLE *pHandles,
INFINITE, // _In_ DWORD dwMilliseconds,
0, // _In_ DWORD dwWakeMask,
0 // _In_ DWORD dwFlags
);
if (res == wait_result_socket) {
WSANETWORKEVENTS net_events;
WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events);
if (net_events.lNetworkEvents & FD_READ) {
if (m_connection.consumeInput()) {
while ( ! finished && ! m_connection.isBusy()) {
auto res(m_connection.getResult());
{
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
m_commandQueue.m_queue.front().on_result(res);
if (res == nullptr) {
m_commandQueue.m_queue.pop();
doStateCallback(State::Connected);
finished = true;
}
}
}
// else is still waiting for more data
}
else {
// error during consume
}
}
}
if (res == wait_result_stop) {
// Send cancel, close connection and terminate thread
cancel();
doStateCallback(State::Terminating);
finished = true;
}
} // end while
// When last result received, remove command from queue
}
void ASyncDBConnection::Thread::processNotice(const PGresult *result)
{
// Pgsql::Result res(result);
std::lock_guard<std::mutex> lg(m_noticeCallback.m_mutex);
if (m_noticeCallback.m_func) {
Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result);
m_noticeCallback.m_func(details);
}
}