Compiles, links and runs (functionality not tested)

This commit is contained in:
Eelke Klein 2017-08-23 13:27:23 +02:00
parent 04723a289b
commit 6a97c0447a
48 changed files with 224 additions and 149 deletions

View file

@ -2,6 +2,7 @@
#include "waithandlelist.h"
#include "ScopeGuard.h"
#include <chrono>
#include <sys/select.h>
ASyncDBConnection::ASyncDBConnection()
{
@ -46,7 +47,7 @@ bool ASyncDBConnection::send(const std::string &command, on_result_callback on_r
{
std::lock_guard<std::mutex> lg(m_threadData.m_commandQueue.m_mutex);
m_threadData.m_commandQueue.m_queue.emplace(command, on_result);
m_threadData.m_commandQueue.m_newEvent.set();
m_threadData.m_commandQueue.m_newEvent.notify_one();
}
return true;
}
@ -56,7 +57,7 @@ bool ASyncDBConnection::send(const std::string &command, Pgsql::Params params, o
{
std::lock_guard<std::mutex> lg(m_threadData.m_commandQueue.m_mutex);
m_threadData.m_commandQueue.m_queue.emplace(command, std::move(params), on_result);
m_threadData.m_commandQueue.m_newEvent.set();
m_threadData.m_commandQueue.m_newEvent.notify_one();
}
return true;
}
@ -79,7 +80,6 @@ void ASyncDBConnection::setNoticeCallback(on_notice_callback notice_callback)
}
ASyncDBConnection::Thread::Thread()
: m_stopEvent(Win32Event::Reset::Manual, Win32Event::Initial::Clear)
{}
void ASyncDBConnection::Thread::run()
@ -124,14 +124,16 @@ bool ASyncDBConnection::Thread::cancel()
bool ASyncDBConnection::Thread::makeConnection()
{
using namespace std::chrono_literals;
//using namespace std::literals::chrono_literals;
// start connecting
auto keywords = m_config.getKeywords();
auto values = m_config.getValues();
#if true
return m_connection.connect(keywords, values, 0);
#else
while (!terminateRequested) {
// start connecting
//bool ok = m_connection.connectStart(m_initString + " client_encoding=utf8");
auto keywords = m_config.getKeywords();
auto values = m_config.getValues();
bool ok = m_connection.connectStart(keywords, values);
auto start = std::chrono::steady_clock::now();
if (ok && m_connection.status() != CONNECTION_BAD) {
@ -144,7 +146,7 @@ bool ASyncDBConnection::Thread::makeConnection()
WSAEventSelect(sock, socket_event.handle(), fd);
WaitHandleList whl;
auto wait_result_socket_event = whl.add(socket_event);
auto wait_result_stop = whl.add(m_stopEvent);
//auto wait_result_stop = whl.add(m_stopEvent);
auto nu = std::chrono::steady_clock::now();
std::chrono::duration<float, std::milli> diff = -(nu-start);
@ -185,6 +187,7 @@ bool ASyncDBConnection::Thread::makeConnection()
}
}
return false;
#endif
}
void ASyncDBConnection::Thread::communicate()
@ -217,7 +220,7 @@ void ASyncDBConnection::Thread::communicate()
void ASyncDBConnection::Thread::stop()
{
terminateRequested = true;
m_stopEvent.set();
//m_stopEvent.set();
}
void ASyncDBConnection::Thread::doStateCallback(State state)
@ -231,10 +234,21 @@ void ASyncDBConnection::Thread::doStateCallback(State state)
void ASyncDBConnection::Thread::waitForAndSendCommand()
{
// lock the data
std::unique_lock<std::mutex> lk(m_commandQueue.m_mutex);
if (m_commandQueue.m_queue.empty()) {
// no data wait till there is data
m_commandQueue.m_newEvent.wait(lk);
// can we use the predicate to reimplement the stop function???, []{return ready;});
}
doNewCommand();
#if false
WaitHandleList whl;
auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent);
//auto wait_result_stop =
whl.add(m_stopEvent);
//whl.add(m_stopEvent);
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
@ -247,6 +261,7 @@ void ASyncDBConnection::Thread::waitForAndSendCommand()
doNewCommand();
}
// if (res == wait_result_stop) return;
#endif
}
void ASyncDBConnection::Thread::doNewCommand()
@ -278,10 +293,60 @@ void ASyncDBConnection::Thread::doNewCommand()
}
bool ASyncDBConnection::Thread::consumeResultInput()
{
bool finished = false;
if (m_connection.consumeInput()) {
while ( ! finished && ! m_connection.isBusy()) {
auto res(m_connection.getResult());
{
qint64 ms = m_timer.restart();
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
m_commandQueue.m_queue.front().on_result(res, ms);
if (res == nullptr) {
m_timer.invalidate();
m_commandQueue.m_queue.pop();
doStateCallback(State::Connected);
finished = true;
}
}
}
// else is still waiting for more data
}
else {
// error during consume
}
return finished;
}
void ASyncDBConnection::Thread::waitForResult()
{
int sock = m_connection.socket();
fd_set readfds;
timeval timeout;
bool finished = false;
while (!finished && !terminateRequested) {
FD_ZERO(&readfds);
FD_SET(sock, &readfds);
timeout.tv_sec = 5;
timeout.tv_usec = 0;
int select_result = select(1, &readfds, nullptr, nullptr, &timeout);
if (select_result > 0) {
if (FD_ISSET(sock, &readfds)) {
if (consumeResultInput()) {
finished = true;
}
}
}
}
#if false
Win32Event socket_event(Win32Event::Reset::Manual, Win32Event::Initial::Clear);
long fd = FD_READ | FD_CLOSE;
@ -292,7 +357,7 @@ void ASyncDBConnection::Thread::waitForResult()
WaitHandleList whl;
auto wait_result_socket = whl.add(socket_event);
auto wait_result_stop = whl.add(m_stopEvent);
//auto wait_result_stop = whl.add(m_stopEvent);
DWORD res = MsgWaitForMultipleObjectsEx(
whl.count(), // _In_ DWORD nCount,
@ -306,29 +371,9 @@ void ASyncDBConnection::Thread::waitForResult()
WSAEnumNetworkEvents(sock, socket_event.handle(), &net_events);
if (net_events.lNetworkEvents & FD_READ) {
if (m_connection.consumeInput()) {
while ( ! finished && ! m_connection.isBusy()) {
auto res(m_connection.getResult());
{
qint64 ms = m_timer.restart();
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
m_commandQueue.m_queue.front().on_result(res, ms);
if (res == nullptr) {
m_timer.invalidate();
m_commandQueue.m_queue.pop();
doStateCallback(State::Connected);
finished = true;
}
}
}
// else is still waiting for more data
}
else {
// error during consume
}
if (consumeResultInput()) {
finished = true;
}
}
}
if (res == wait_result_stop) {
@ -339,6 +384,7 @@ void ASyncDBConnection::Thread::waitForResult()
}
} // end while
// When last result received, remove command from queue
#endif
}
void ASyncDBConnection::Thread::processNotice(const PGresult *result)