Added the thread safe TSQueue and using it in mainwindow to replace the adhoc queue implementation.

This commit is contained in:
Eelke Klein 2017-01-03 07:22:36 +01:00
parent c551d982c6
commit 83064ab86b
10 changed files with 253 additions and 27 deletions

1
.gitignore vendored
View file

@ -1,5 +1,6 @@
debug/ debug/
release/ release/
DIST/
Makefile Makefile
Makefile.Debug Makefile.Debug
Makefile.Release Makefile.Release

View file

@ -12,7 +12,7 @@ TARGET = Ivory
TEMPLATE = app TEMPLATE = app
INCLUDEPATH += C:\prog\include INCLUDEPATH += C:\prog\include
DEFINES += WIN32_LEAN_AND_MEAN
LIBS += c:\prog\lib\libpq.lib LIBS += c:\prog\lib\libpq.lib
SOURCES += main.cpp\ SOURCES += main.cpp\
@ -24,7 +24,10 @@ SOURCES += main.cpp\
sqlhighlighter.cpp \ sqlhighlighter.cpp \
jsoncpp.cpp \ jsoncpp.cpp \
queryexplainmodel.cpp \ queryexplainmodel.cpp \
explaintreemodelitem.cpp explaintreemodelitem.cpp \
asyncdbconnection.cpp \
tsqueue.cpp \
win32event.cpp
HEADERS += mainwindow.h \ HEADERS += mainwindow.h \
serverproperties.h \ serverproperties.h \
@ -33,7 +36,10 @@ HEADERS += mainwindow.h \
queryresultmodel.h \ queryresultmodel.h \
sqlhighlighter.h \ sqlhighlighter.h \
queryexplainmodel.h \ queryexplainmodel.h \
explaintreemodelitem.h explaintreemodelitem.h \
asyncdbconnection.h \
tsqueue.h \
win32event.h
FORMS += mainwindow.ui \ FORMS += mainwindow.ui \
serverproperties.ui serverproperties.ui

61
asyncdbconnection.cpp Normal file
View file

@ -0,0 +1,61 @@
#include "asyncdbconnection.h"
ASyncDBConnection::ASyncDBConnection()
{
}
void ASyncDBConnection::setupConnection(const std::string &connstring)
{}
void ASyncDBConnection::closeConnection()
{}
void ASyncDBConnection::setStateCallback(on_state_callback state_callback)
{}
void ASyncDBConnection::Thread::run()
{
while (!terminateRequested) {
// make or recover connection
if (makeConnection()) {
// send commands and receive results
communicate();
}
}
// close connection
}
bool ASyncDBConnection::Thread::makeConnection()
{
while (!terminateRequested) {
// start connecting
// poll till complete or failed (we can get an abort command)
// if connected return true
// else retry (unless we get a command to stop then return false)
}
return false;
}
void ASyncDBConnection::Thread::communicate()
{
while (!terminateRequested) {
// wait for something to do:
// - command to send to server
// - wait for results and (notifies can also come in)
// - pass each result on to the completion routine
// - notify comming in from the server
// - pass to notify callback
// - connection raises an error
// - return
// - stop signal
// - return
}
}

68
asyncdbconnection.h Normal file
View file

@ -0,0 +1,68 @@
#ifndef ASYNCDBCONNECTION_H
#define ASYNCDBCONNECTION_H
#include "PgsqlConn.h"
#include <functional>
#include <vector>
class ASyncDBConnection {
public:
enum class State {
NotConnected,
Connecting,
Connected,
QuerySend,
CancelSend
};
using on_result_callback = std::function<void(Pgsql::Result)>;
using on_state_callback = std::function<void(State)>;
ASyncDBConnection();
void setupConnection(const std::string &connstring);
void closeConnection();
void setStateCallback(on_state_callback state_callback);
/** Sends command to the server.
When the result is in and no result_task_queue was passed then on_result will be
called directly within the thread.
If the command gives multiple results on_result will be called for each result.
*/
bool send(const std::string &command, on_result_callback on_result);
private:
class Command {
public:
std::string command;
on_result_callback on_result;
};
/// Contains all the members accessed by the thread
class Thread {
public:
on_state_callback m_state_callback;
std::string m_init_string;
/// Is started as a seperate thread by ASyncDBConnection
void run();
/// Sends a cancel request to the DB server
void cancel();
private:
Pgsql::Connection m_connection;
bool terminateRequested = false; ///< is set when the thread should stop
bool makeConnection();
void communicate();
};
Thread thread;
};
#endif // ASYNCDBCONNECTION_H

View file

@ -40,7 +40,6 @@ MainWindow::MainWindow(QWidget *parent) :
ui->queryEdit->setFont(font); ui->queryEdit->setFont(font);
highlighter.reset(new SqlHighlighter(ui->queryEdit->document())); highlighter.reset(new SqlHighlighter(ui->queryEdit->document()));
ui->queryEdit->setPlainText(test_query); ui->queryEdit->setPlainText(test_query);
ui->connectionStringEdit->setText("user=postgres dbname=foutrapport password=admin"); ui->connectionStringEdit->setText("user=postgres dbname=foutrapport password=admin");
QAction *action; QAction *action;
@ -60,10 +59,9 @@ MainWindow::MainWindow(QWidget *parent) :
MainWindow::~MainWindow() MainWindow::~MainWindow()
{} {}
void MainWindow::QueueTask(callable c) void MainWindow::QueueTask(TSQueue::t_Callable c)
{ {
std::lock_guard<std::mutex> lg(m_mutexCallableQueue); m_taskQueue.add(c);
m_callableQueue.emplace_back(std::move(c));
// Theoretically this needs to be only called if the queue was empty because otherwise it already would // Theoretically this needs to be only called if the queue was empty because otherwise it already would
// be busy emptying the queue. For now however I think it is safer to call it just to make sure. // be busy emptying the queue. For now however I think it is safer to call it just to make sure.
QMetaObject::invokeMethod(this, "processCallableQueue", Qt::QueuedConnection); // queues on main thread QMetaObject::invokeMethod(this, "processCallableQueue", Qt::QueuedConnection); // queues on main thread
@ -71,21 +69,13 @@ void MainWindow::QueueTask(callable c)
void MainWindow::processCallableQueue() void MainWindow::processCallableQueue()
{ {
bool empty; if (!m_taskQueue.empty()) {
callable c; auto c = m_taskQueue.pop();
{ // narrow scope for lock guard
std::lock_guard<std::mutex> lg(m_mutexCallableQueue);
c = m_callableQueue.back();
m_callableQueue.pop_back();
empty = m_callableQueue.empty();
}
c(); c();
if (!m_taskQueue.empty()) {
if (!empty) {
// This gives other events a chance to be processed to keep the UI snappy.
QTimer::singleShot(0, this, SLOT(processCallableQueue())); QTimer::singleShot(0, this, SLOT(processCallableQueue()));
} }
}
} }
void MainWindow::startConnect() void MainWindow::startConnect()
@ -148,6 +138,7 @@ void MainWindow::performQuery()
queryCancel = std::move(connection->getCancel()); queryCancel = std::move(connection->getCancel());
QString command = ui->queryEdit->toPlainText(); QString command = ui->queryEdit->toPlainText();
std::thread([this,command]() std::thread([this,command]()
{ {
auto res = std::make_shared<Pgsql::Result>(connection->query(command)); auto res = std::make_shared<Pgsql::Result>(connection->query(command));

View file

@ -1,6 +1,7 @@
#ifndef MAINWINDOW_H #ifndef MAINWINDOW_H
#define MAINWINDOW_H #define MAINWINDOW_H
#include "tsqueue.h"
#include <QMainWindow> #include <QMainWindow>
#include <QSocketNotifier> #include <QSocketNotifier>
#include <memory> #include <memory>
@ -36,20 +37,16 @@ class MainWindow : public QMainWindow
Q_OBJECT Q_OBJECT
public: public:
using callable = std::function<void()>;
explicit MainWindow(QWidget *parent = 0); explicit MainWindow(QWidget *parent = 0);
~MainWindow(); ~MainWindow();
/* Meant to be called from other threads to pass a code block /* Meant to be called from other threads to pass a code block
* that has to be executed in the context of the thread of the window. * that has to be executed in the context of the thread of the window.
*/ */
void QueueTask(callable c); void QueueTask(TSQueue::t_Callable c);
private: private:
using t_CallableQueue = std::deque<callable>; TSQueue m_taskQueue;
std::mutex m_mutexCallableQueue;
t_CallableQueue m_callableQueue;
std::unique_ptr<Ui::MainWindow> ui; std::unique_ptr<Ui::MainWindow> ui;

34
tsqueue.cpp Normal file
View file

@ -0,0 +1,34 @@
#include "tsqueue.h"
TSQueue::TSQueue()
: newData(Win32Event::Reset::Manual, Win32Event::Initial::Clear)
{}
void TSQueue::add(t_Callable callable)
{
std::lock_guard<std::mutex> g(m);
futureQueue.push_back(std::move(callable));
newData.Set();
}
bool TSQueue::empty()
{
std::lock_guard<std::mutex> g(m);
return futureQueue.empty();
}
TSQueue::t_Callable TSQueue::pop()
{
std::lock_guard<std::mutex> g(m);
auto f = std::move(futureQueue.front());
futureQueue.pop_front();
if (futureQueue.empty()) {
newData.Reset();
}
return f;
}
HANDLE TSQueue::getNewDataEventHandle()
{
return newData.GetHandle();
}

27
tsqueue.h Normal file
View file

@ -0,0 +1,27 @@
#ifndef TSQUEUE_H
#define TSQUEUE_H
#include "Win32Event.h"
#include <deque>
#include <functional>
#include <mutex>
class TSQueue {
public:
using t_Callable = std::function<void()>;
TSQueue();
void add(t_Callable callable);
bool empty();
t_Callable pop();
HANDLE getNewDataEventHandle();
private:
using t_CallableQueue = std::deque<t_Callable>;
Win32Event newData;
std::mutex m;
t_CallableQueue futureQueue;
};
#endif // TSQUEUE_H

1
win32event.cpp Normal file
View file

@ -0,0 +1 @@
#include "win32event.h"

40
win32event.h Normal file
View file

@ -0,0 +1,40 @@
#ifndef WIN32EVENT_H
#define WIN32EVENT_H
#include <windows.h>
/** Simpel wrapper around a Win32 Event object.
Mostly to make cleanup automatic.*/
class Win32Event {
public:
enum class Reset { Auto=0, Manual=1 };
enum class Initial { Clear=0, Set=1 };
Win32Event(Reset r, Initial is)
: hEvent(CreateEvent(
nullptr, // _In_opt_ LPSECURITY_ATTRIBUTES lpEventAttributes,
BOOL(r), // _In_ BOOL bManualReset,
BOOL(is), // _In_ BOOL bInitialState,
nullptr //_In_opt_ LPCTSTR lpName
))
{}
~Win32Event()
{
CloseHandle(hEvent);
}
Win32Event(const Win32Event &) = delete;
Win32Event &operator=(const Win32Event &) = delete;
void Set() { SetEvent(hEvent); }
void Reset() { ResetEvent(hEvent); }
HANDLE GetHandle() { return hEvent; }
private:
HANDLE hEvent;
};
#endif // WIN32EVENT_H