Most seems to work. Multi threading is not optimal

however some points use a timeout with select or wait_for
to poll a condition at the same time.
This commit is contained in:
Eelke Klein 2017-08-23 17:41:10 +02:00
parent c3d604e7b4
commit 4167c483f5
16 changed files with 76 additions and 67 deletions

View file

@ -130,7 +130,11 @@ bool ASyncDBConnection::Thread::makeConnection()
auto keywords = m_config.getKeywords();
auto values = m_config.getValues();
#if true
return m_connection.connect(keywords, values, 0);
bool result = m_connection.connect(keywords, values, 0);
if (result) {
doStateCallback(State::Connected);
}
return result;
#else
while (!terminateRequested) {
@ -234,11 +238,12 @@ void ASyncDBConnection::Thread::doStateCallback(State state)
void ASyncDBConnection::Thread::waitForAndSendCommand()
{
using namespace std::chrono_literals;
// lock the data
std::unique_lock<std::mutex> lk(m_commandQueue.m_mutex);
if (m_commandQueue.m_queue.empty()) {
// no data wait till there is data
m_commandQueue.m_newEvent.wait(lk);
m_commandQueue.m_newEvent.wait_for(lk, 1000ms);
// can we use the predicate to reimplement the stop function???, []{return ready;});
}
@ -266,30 +271,26 @@ void ASyncDBConnection::Thread::waitForAndSendCommand()
void ASyncDBConnection::Thread::doNewCommand()
{
// todo: send command
// get command from top of queue (but leave it in the queue, we need the callback)
{
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
if (! m_commandQueue.m_queue.empty()) {
const Command &command = m_commandQueue.m_queue.front();
if (!command.command.empty()) {
bool query_send = false;
if (command.params.empty())
query_send = m_connection.sendQuery(command.command.c_str());
else
query_send = m_connection.sendQueryParams(command.command.c_str(), command.params);
if (! m_commandQueue.m_queue.empty()) {
const Command &command = m_commandQueue.m_queue.front();
if (!command.command.empty()) {
bool query_send = false;
if (command.params.empty())
query_send = m_connection.sendQuery(command.command.c_str());
else
query_send = m_connection.sendQueryParams(command.command.c_str(), command.params);
if (query_send) {
m_timer.start();
doStateCallback(State::QuerySend);
}
else {
std::string error = m_connection.getErrorMessage();
// todo: need to report the error
}
}
}
}
if (query_send) {
m_timer.start();
doStateCallback(State::QuerySend);
}
else {
std::string error = m_connection.getErrorMessage();
// todo: need to report the error
}
}
}
}
@ -336,7 +337,7 @@ void ASyncDBConnection::Thread::waitForResult()
timeout.tv_sec = 5;
timeout.tv_usec = 0;
int select_result = select(1, &readfds, nullptr, nullptr, &timeout);
int select_result = select(sock + 1, &readfds, nullptr, nullptr, &timeout);
if (select_result > 0) {
if (FD_ISSET(sock, &readfds)) {
if (consumeResultInput()) {