The query window now has buttons with icons made in the designer for a better look. Depending on the responses received from the database, the tab control with the message, data and explain tabs now switches to the appropriate tab.
321 lines
8.5 KiB
C++
321 lines
8.5 KiB
C++
#include "asyncdbconnection.h"
|
|
#include "waithandlelist.h"
|
|
#include <chrono>
|
|
|
|
/**
 * Constructs the connection object. The constructor performs no work of its
 * own; the worker thread is only launched by setupConnection().
 */
ASyncDBConnection::ASyncDBConnection() = default;
|
|
|
|
//void ASyncDBConnection::setupConnection(const std::string &connstring)
|
|
//{
|
|
// if (m_thread.joinable()) {
|
|
// m_threadData.stop();
|
|
// m_thread.join();
|
|
// }
|
|
// m_threadData.m_initString = connstring;
|
|
// m_thread = std::thread([this] () { m_threadData.run(); });
|
|
//}
|
|
|
|
void ASyncDBConnection::setupConnection(const ConnectionConfig &config)
|
|
{
|
|
if (m_thread.joinable()) {
|
|
m_threadData.stop();
|
|
m_thread.join();
|
|
}
|
|
m_threadData.m_config = config;
|
|
m_thread = std::thread([this] () { m_threadData.run(); });
|
|
}
|
|
|
|
void ASyncDBConnection::closeConnection()
|
|
{
|
|
m_threadData.stop();
|
|
if (m_thread.joinable()) {
|
|
m_thread.join();
|
|
}
|
|
}
|
|
|
|
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
|
|
{
|
|
{
|
|
std::lock_guard<std::mutex> lg(m_threadData.m_commandQueue.m_mutex);
|
|
m_threadData.m_commandQueue.m_queue.emplace(command, on_result);
|
|
m_threadData.m_commandQueue.m_newEvent.set();
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool ASyncDBConnection::cancel()
|
|
{
|
|
return m_threadData.cancel();
|
|
}
|
|
|
|
/**
 * Installs the callback that is invoked on connection state changes.
 *
 * The assignment happens under the state-callback mutex, the same mutex the
 * worker holds while invoking the callback.
 *
 * @param state_callback  New callback; replaces any previous one.
 */
void ASyncDBConnection::setStateCallback(on_state_callback state_callback)
{
    auto &slot = m_threadData.m_stateCallback;

    std::lock_guard<std::mutex> guard(slot.m_mutex);
    slot.m_func = state_callback;
}
|
|
|
|
/**
 * Installs the callback that receives notices reported by the connection.
 *
 * The assignment happens under the notice-callback mutex, the same mutex the
 * worker holds while invoking the callback.
 *
 * @param notice_callback  New callback; replaces any previous one.
 */
void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback)
{
    auto &slot = m_threadData.m_noticeCallback;

    std::lock_guard<std::mutex> guard(slot.m_mutex);
    slot.m_func = notice_callback;
}
|
|
|
|
/**
 * Creates the worker state with its stop event configured as manual-reset and
 * initially clear (not signalled).
 */
ASyncDBConnection::Thread::Thread()
    : m_stopEvent(Win32Event::Reset::Manual,
                  Win32Event::Initial::Clear)
{
}
|
|
|
|
void ASyncDBConnection::Thread::run()
|
|
{
|
|
m_terminated = false;
|
|
while (!terminateRequested) {
|
|
|
|
// make or recover connection
|
|
if (makeConnection()) {
|
|
m_connection.setNoticeReceiver(
|
|
[this](const PGresult *result) { processNotice(result); });
|
|
m_canceller = m_connection.getCancel();
|
|
|
|
|
|
// send commands and receive results
|
|
communicate();
|
|
}
|
|
else {
|
|
// It is not possible to determine the source of the problem.
|
|
// Accept for PQconnectionNeedsPassword
|
|
|
|
// Pass problem to main thread and stop this thread
|
|
|
|
// Main thread needs to know it has to restart connecting if it want's to.
|
|
// TODO: add status functions to help main thread so it doesn't have to remember
|
|
// everything reported through callbacks.
|
|
|
|
break;
|
|
}
|
|
}
|
|
m_terminated = true;
|
|
// close connection
|
|
}
|
|
|
|
bool ASyncDBConnection::Thread::cancel()
|
|
{
|
|
return m_canceller.cancel(nullptr);
|
|
}
|
|
|
|
bool ASyncDBConnection::Thread::makeConnection()
|
|
{
|
|
using namespace std::chrono_literals;
|
|
|
|
while (!terminateRequested) {
|
|
|
|
// start connecting
|
|
//bool ok = m_connection.connectStart(m_initString + " client_encoding=utf8");
|
|
auto keywords = m_config.getKeywords();
|
|
auto values = m_config.getValues();
|
|
bool ok = m_connection.connectStart(keywords, values);
|
|
auto start = std::chrono::steady_clock::now();
|
|
if (ok && m_connection.status() != CONNECTION_BAD) {
|
|
int sock = m_connection.socket();
|
|
Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear);
|
|
|
|
long fd = FD_WRITE;
|
|
while (true) {
|
|
// poll till complete or failed (we can get an abort command)
|
|
WSAEventSelect(sock, socket_event.handle(), fd);
|
|
WaitHandleList whl;
|
|
auto wait_result_socket_event = whl.add(socket_event);
|
|
auto wait_result_stop = whl.add(m_stopEvent);
|
|
|
|
auto nu = std::chrono::steady_clock::now();
|
|
std::chrono::duration<float, std::milli> diff = -(nu-start);
|
|
diff += 30s;
|
|
DWORD timeout = diff.count();
|
|
|
|
DWORD res = MsgWaitForMultipleObjectsEx(
|
|
whl.count(), // _In_ DWORD nCount,
|
|
whl, // _In_ const HANDLE *pHandles,
|
|
timeout, // _In_ DWORD dwMilliseconds,
|
|
0, // _In_ DWORD dwWakeMask,
|
|
0 // _In_ DWORD dwFlags
|
|
);
|
|
if (res == wait_result_socket_event) {
|
|
auto poll_state = m_connection.connectPoll();
|
|
if (poll_state == PGRES_POLLING_OK) {
|
|
// if connected return true
|
|
doStateCallback(State::Connected);
|
|
return true;
|
|
}
|
|
else if (poll_state == PGRES_POLLING_FAILED) {
|
|
doStateCallback(State::NotConnected);
|
|
return false;
|
|
}
|
|
else if (poll_state == PGRES_POLLING_READING) {
|
|
doStateCallback(State::Connecting);
|
|
fd = FD_READ;
|
|
}
|
|
else if (poll_state == PGRES_POLLING_WRITING) {
|
|
doStateCallback(State::Connecting);
|
|
fd = FD_WRITE;
|
|
}
|
|
}
|
|
else if (res == wait_result_stop) {
|
|
|
|
}
|
|
} // end while (true)
|
|
}
|
|
}
|
|
return false;
|
|
}
|
|
|
|
void ASyncDBConnection::Thread::communicate()
|
|
{
|
|
while (!terminateRequested) {
|
|
// wait for something to do:
|
|
// - command to send to server
|
|
// - wait for results and (notifies can also come in)
|
|
// - pass each result on to the completion routine
|
|
// - notify comming in from the server
|
|
// - pass to notify callback
|
|
// - connection raises an error
|
|
// - return
|
|
// - stop signal
|
|
// - return
|
|
|
|
|
|
if (m_state == State::Connected) {
|
|
waitForAndSendCommand();
|
|
}
|
|
else if (m_state == State::QuerySend || m_state == State::CancelSend) {
|
|
// Wait for result, even after a cancel we should wait, for all results
|
|
// New command's are not excepted when one has been send
|
|
waitForResult();
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
void ASyncDBConnection::Thread::stop()
|
|
{
|
|
terminateRequested = true;
|
|
m_stopEvent.set();
|
|
}
|
|
|
|
/**
 * Records the new state and reports it through the state callback, if one is
 * installed.
 *
 * The callback is invoked while the state-callback mutex is held.
 *
 * @param state  State to store in m_state and pass to the callback.
 */
void ASyncDBConnection::Thread::doStateCallback(State state)
{
    m_state = state;

    std::lock_guard<std::mutex> guard(m_stateCallback.m_mutex);
    if (!m_stateCallback.m_func) {
        return;
    }
    m_stateCallback.m_func(state);
}
|
|
|
|
void ASyncDBConnection::Thread::waitForAndSendCommand()
|
|
{
|
|
WaitHandleList whl;
|
|
auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent);
|
|
auto wait_result_stop = whl.add(m_stopEvent);
|
|
|
|
DWORD res = MsgWaitForMultipleObjectsEx(
|
|
whl.count(), // _In_ DWORD nCount,
|
|
whl, // _In_ const HANDLE *pHandles,
|
|
INFINITE, // _In_ DWORD dwMilliseconds,
|
|
0, // _In_ DWORD dwWakeMask,
|
|
0 // _In_ DWORD dwFlags
|
|
);
|
|
if (res == wait_result_new_command) {
|
|
doNewCommand();
|
|
}
|
|
// if (res == wait_result_stop) return;
|
|
}
|
|
|
|
void ASyncDBConnection::Thread::doNewCommand()
|
|
{
|
|
// todo: send command
|
|
// get command from top of queue (but leave it in the queue, we need the callback)
|
|
std::string command;
|
|
{
|
|
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
|
|
if (! m_commandQueue.m_queue.empty()) {
|
|
command = m_commandQueue.m_queue.front().command;
|
|
}
|
|
}
|
|
if (!command.empty() && m_connection.sendQuery(command)) {
|
|
doStateCallback(State::QuerySend);
|
|
}
|
|
else {
|
|
std::string error = m_connection.getErrorMessage();
|
|
// todo: need to report the error
|
|
}
|
|
}
|
|
|
|
void ASyncDBConnection::Thread::waitForResult()
|
|
{
|
|
|
|
int sock = m_connection.socket();
|
|
Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear);
|
|
|
|
long fd = FD_READ | FD_CLOSE;
|
|
|
|
bool finished = false;
|
|
while ( ! finished) {
|
|
WSAEventSelect(sock, socket_event.handle(), fd);
|
|
|
|
WaitHandleList whl;
|
|
auto wait_result_socket = whl.add(socket_event);
|
|
auto wait_result_stop = whl.add(m_stopEvent);
|
|
|
|
DWORD res = MsgWaitForMultipleObjectsEx(
|
|
whl.count(), // _In_ DWORD nCount,
|
|
whl, // _In_ const HANDLE *pHandles,
|
|
INFINITE, // _In_ DWORD dwMilliseconds,
|
|
0, // _In_ DWORD dwWakeMask,
|
|
0 // _In_ DWORD dwFlags
|
|
);
|
|
if (res == wait_result_socket) {
|
|
WSANETWORKEVENTS net_events;
|
|
WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events);
|
|
|
|
if (net_events.lNetworkEvents & FD_READ) {
|
|
if (m_connection.consumeInput()) {
|
|
while ( ! finished && ! m_connection.isBusy()) {
|
|
auto res(m_connection.getResult());
|
|
{
|
|
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
|
|
m_commandQueue.m_queue.front().on_result(res);
|
|
if (res == nullptr) {
|
|
m_commandQueue.m_queue.pop();
|
|
doStateCallback(State::Connected);
|
|
finished = true;
|
|
}
|
|
}
|
|
|
|
}
|
|
// else is still waiting for more data
|
|
|
|
}
|
|
else {
|
|
// error during consume
|
|
|
|
}
|
|
}
|
|
}
|
|
if (res == wait_result_stop) {
|
|
// Send cancel, close connection and terminate thread
|
|
cancel();
|
|
doStateCallback(State::Terminating);
|
|
finished = true;
|
|
}
|
|
} // end while
|
|
// When last result received, remove command from queue
|
|
}
|
|
|
|
void ASyncDBConnection::Thread::processNotice(const PGresult *result)
|
|
{
|
|
// Pgsql::Result res(result);
|
|
std::lock_guard<std::mutex> lg(m_noticeCallback.m_mutex);
|
|
if (m_noticeCallback.m_func) {
|
|
Pgsql::ErrorDetails details = Pgsql::ErrorDetails::createErrorDetailsFromPGresult(result);
|
|
m_noticeCallback.m_func(details);
|
|
}
|
|
}
|