Query, Explain and Cancel are going throught the asyncdbconnection now.
Todo: Notice processing and error reporting.
This commit is contained in:
parent
fce51a7b7e
commit
a36bf5f7f4
11 changed files with 335 additions and 217 deletions
|
|
@ -19,10 +19,25 @@ void ASyncDBConnection::closeConnection()
|
|||
m_thread.join();
|
||||
}
|
||||
|
||||
bool ASyncDBConnection::send(const std::string &command, on_result_callback on_result)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(m_threadData.m_commandQueue.m_mutex);
|
||||
m_threadData.m_commandQueue.m_queue.emplace(command, on_result);
|
||||
m_threadData.m_commandQueue.m_newEvent.set();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ASyncDBConnection::cancel()
|
||||
{
|
||||
return m_threadData.cancel();
|
||||
}
|
||||
|
||||
void ASyncDBConnection::setStateCallback(on_state_callback state_callback)
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(m_threadData.m_stateCallbackMutex);
|
||||
m_threadData.m_stateCallback = state_callback;
|
||||
std::lock_guard<std::mutex> lg(m_threadData.m_stateCallback.m_mutex);
|
||||
m_threadData.m_stateCallback.m_func = state_callback;
|
||||
}
|
||||
|
||||
ASyncDBConnection::Thread::Thread()
|
||||
|
|
@ -31,21 +46,39 @@ ASyncDBConnection::Thread::Thread()
|
|||
|
||||
void ASyncDBConnection::Thread::run()
|
||||
{
|
||||
m_terminated = false;
|
||||
while (!terminateRequested) {
|
||||
|
||||
// make or recover connection
|
||||
if (makeConnection()) {
|
||||
m_canceller = m_connection.getCancel();
|
||||
|
||||
|
||||
// send commands and receive results
|
||||
communicate();
|
||||
}
|
||||
else {
|
||||
// It is not possible to determine the source of the problem.
|
||||
// Accept for PQconnectionNeedsPassword
|
||||
|
||||
// Pass problem to main thread and stop this thread
|
||||
|
||||
// Main thread needs to know it has to restart connecting if it want's to.
|
||||
// TODO: add status functions to help main thread so it doesn't have to remember
|
||||
// everything reported through callbacks.
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
m_terminated = true;
|
||||
// close connection
|
||||
}
|
||||
|
||||
bool ASyncDBConnection::Thread::cancel()
|
||||
{
|
||||
return m_canceller.cancel(nullptr);
|
||||
}
|
||||
|
||||
bool ASyncDBConnection::Thread::makeConnection()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
|
|
@ -122,18 +155,16 @@ void ASyncDBConnection::Thread::communicate()
|
|||
// - stop signal
|
||||
// - return
|
||||
|
||||
// WSAEventSelect(sock, socket_event.handle(), fd);
|
||||
WaitHandleList whl;
|
||||
// auto wait_result_socket_event = whl.add(socket_event);
|
||||
auto wait_result_stop = whl.add(m_stopEvent);
|
||||
|
||||
DWORD res = MsgWaitForMultipleObjectsEx(
|
||||
whl.count(), // _In_ DWORD nCount,
|
||||
whl, // _In_ const HANDLE *pHandles,
|
||||
INFINITE, // _In_ DWORD dwMilliseconds,
|
||||
0, // _In_ DWORD dwWakeMask,
|
||||
0 // _In_ DWORD dwFlags
|
||||
);
|
||||
if (m_state == State::Connected) {
|
||||
waitForAndSendCommand();
|
||||
}
|
||||
else if (m_state == State::QuerySend || m_state == State::CancelSend) {
|
||||
// Wait for result, even after a cancel we should wait, for all results
|
||||
// New command's are not excepted when one has been send
|
||||
waitForResult();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -145,8 +176,105 @@ void ASyncDBConnection::Thread::stop()
|
|||
|
||||
void ASyncDBConnection::Thread::doStateCallback(State state)
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(m_stateCallbackMutex);
|
||||
if (m_stateCallback) {
|
||||
m_stateCallback(state);
|
||||
m_state = state;
|
||||
std::lock_guard<std::mutex> lg(m_stateCallback.m_mutex);
|
||||
if (m_stateCallback.m_func) {
|
||||
m_stateCallback.m_func(state);
|
||||
}
|
||||
}
|
||||
|
||||
void ASyncDBConnection::Thread::waitForAndSendCommand()
|
||||
{
|
||||
WaitHandleList whl;
|
||||
auto wait_result_new_command = whl.add(m_commandQueue.m_newEvent);
|
||||
auto wait_result_stop = whl.add(m_stopEvent);
|
||||
|
||||
DWORD res = MsgWaitForMultipleObjectsEx(
|
||||
whl.count(), // _In_ DWORD nCount,
|
||||
whl, // _In_ const HANDLE *pHandles,
|
||||
INFINITE, // _In_ DWORD dwMilliseconds,
|
||||
0, // _In_ DWORD dwWakeMask,
|
||||
0 // _In_ DWORD dwFlags
|
||||
);
|
||||
if (res == wait_result_new_command) {
|
||||
doNewCommand();
|
||||
}
|
||||
// if (res == wait_result_stop) return;
|
||||
}
|
||||
|
||||
void ASyncDBConnection::Thread::doNewCommand()
|
||||
{
|
||||
// todo: send command
|
||||
// get command from top of queue (but leave it in the queue, we need the callback)
|
||||
std::string command;
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
|
||||
if (! m_commandQueue.m_queue.empty()) {
|
||||
command = m_commandQueue.m_queue.front().command;
|
||||
}
|
||||
}
|
||||
if (!command.empty() && m_connection.sendQuery(command)) {
|
||||
doStateCallback(State::QuerySend);
|
||||
}
|
||||
else {
|
||||
std::string error = m_connection.getErrorMessage();
|
||||
// todo: need to report the error
|
||||
}
|
||||
}
|
||||
|
||||
void ASyncDBConnection::Thread::waitForResult()
|
||||
{
|
||||
|
||||
int sock = m_connection.socket();
|
||||
Win32Event socket_event(Win32Event::Reset::Auto, Win32Event::Initial::Clear);
|
||||
|
||||
long fd = FD_WRITE;
|
||||
|
||||
while (true) {
|
||||
WSAEventSelect(sock, socket_event.handle(), fd);
|
||||
|
||||
WaitHandleList whl;
|
||||
auto wait_result_socket = whl.add(socket_event);
|
||||
auto wait_result_stop = whl.add(m_stopEvent);
|
||||
|
||||
DWORD res = MsgWaitForMultipleObjectsEx(
|
||||
whl.count(), // _In_ DWORD nCount,
|
||||
whl, // _In_ const HANDLE *pHandles,
|
||||
INFINITE, // _In_ DWORD dwMilliseconds,
|
||||
0, // _In_ DWORD dwWakeMask,
|
||||
0 // _In_ DWORD dwFlags
|
||||
);
|
||||
if (res == wait_result_socket) {
|
||||
if (m_connection.consumeInput()) {
|
||||
if ( ! m_connection.isBusy()) {
|
||||
auto res(m_connection.getResult());
|
||||
{
|
||||
std::lock_guard<std::mutex> lg(m_commandQueue.m_mutex);
|
||||
m_commandQueue.m_queue.front().on_result(res);
|
||||
if (res == nullptr) {
|
||||
m_commandQueue.m_queue.pop();
|
||||
doStateCallback(State::Connected);
|
||||
break; // leave the while loop
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
// else is still waiting for more data
|
||||
|
||||
}
|
||||
else {
|
||||
// error during consume
|
||||
|
||||
}
|
||||
}
|
||||
if (res == wait_result_stop) {
|
||||
// Send cancel, close connection and terminate thread
|
||||
cancel();
|
||||
doStateCallback(State::Terminating);
|
||||
break;
|
||||
}
|
||||
} // end while
|
||||
// When last result received, remove command from queue
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue