Simplify queueing and locking

This commit is contained in:
Brent Petit 2023-10-29 21:05:29 -05:00
parent 5d35780498
commit 336861813a
1 changed files with 63 additions and 67 deletions

130
src/dlq.c
View File

@ -60,6 +60,8 @@
/* The queue is a linked list of these. */ /* The queue is a linked list of these. */
static struct dlq_item_s *queue_head = NULL; /* Head of linked list for queue. */ static struct dlq_item_s *queue_head = NULL; /* Head of linked list for queue. */
static struct dlq_item_s *queue_tail = NULL; /* Tail of linked list for queue. */
int queue_length = 0; /* Count of items in queue */
#if __WIN32__ #if __WIN32__
@ -75,8 +77,6 @@ static pthread_mutex_t dlq_mutex; /* Critical section for updating queues. */
static pthread_cond_t wake_up_cond; /* Notify received packet processing thread when queue not empty. */ static pthread_cond_t wake_up_cond; /* Notify received packet processing thread when queue not empty. */
static pthread_mutex_t wake_up_mutex; /* Required by cond_wait. */
static volatile int recv_thread_is_waiting = 0; static volatile int recv_thread_is_waiting = 0;
#endif #endif
@ -117,7 +117,8 @@ void dlq_init (void)
dw_printf ("dlq_init ( )\n"); dw_printf ("dlq_init ( )\n");
#endif #endif
queue_head = NULL; queue_head = queue_tail = NULL;
queue_length = 0;
#if DEBUG #if DEBUG
@ -129,13 +130,6 @@ void dlq_init (void)
InitializeCriticalSection (&dlq_cs); InitializeCriticalSection (&dlq_cs);
#else #else
int err; int err;
err = pthread_mutex_init (&wake_up_mutex, NULL);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_init: pthread_mutex_init err=%d", err);
perror ("");
exit (EXIT_FAILURE);
}
err = pthread_mutex_init (&dlq_mutex, NULL); err = pthread_mutex_init (&dlq_mutex, NULL);
if (err != 0) { if (err != 0) {
text_color_set(DW_COLOR_ERROR); text_color_set(DW_COLOR_ERROR);
@ -314,9 +308,6 @@ void dlq_rec_frame (int chan, int subchan, int slice, packet_t pp, alevel_t alev
static void append_to_queue (struct dlq_item_s *pnew) static void append_to_queue (struct dlq_item_s *pnew)
{ {
struct dlq_item_s *plast;
int queue_length = 0;
if ( ! was_init) { if ( ! was_init) {
dlq_init (); dlq_init ();
} }
@ -341,30 +332,19 @@ static void append_to_queue (struct dlq_item_s *pnew)
#endif #endif
if (queue_head == NULL) { if (queue_head == NULL) {
queue_head = pnew; assert (queue_tail == NULL);
queue_head = queue_tail = pnew;
queue_length = 1; queue_length = 1;
} else {
assert (queue_tail != NULL);
queue_tail->nextp = pnew;
queue_tail = pnew;
queue_length++;
assert (queue_length > 1);
} }
else {
queue_length = 2; /* head + new one */
plast = queue_head;
while (plast->nextp != NULL) {
plast = plast->nextp;
queue_length++;
}
plast->nextp = pnew;
}
#if __WIN32__ #if __WIN32__
LeaveCriticalSection (&dlq_cs); LeaveCriticalSection (&dlq_cs);
#else
err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock err=%d", err);
perror ("");
exit (1);
}
#endif #endif
#if DEBUG1 #if DEBUG1
text_color_set(DW_COLOR_DEBUG); text_color_set(DW_COLOR_DEBUG);
@ -416,7 +396,7 @@ static void append_to_queue (struct dlq_item_s *pnew)
* and blocking on a write. * and blocking on a write.
*/ */
if (queue_length > 10) { if (queue_length > 15) {
text_color_set(DW_COLOR_ERROR); text_color_set(DW_COLOR_ERROR);
dw_printf ("Received frame queue is out of control. Length=%d.\n", queue_length); dw_printf ("Received frame queue is out of control. Length=%d.\n", queue_length);
dw_printf ("Reader thread is probably frozen.\n"); dw_printf ("Reader thread is probably frozen.\n");
@ -431,14 +411,6 @@ static void append_to_queue (struct dlq_item_s *pnew)
#else #else
if (recv_thread_is_waiting) { if (recv_thread_is_waiting) {
err = pthread_mutex_lock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_lock wu err=%d", err);
perror ("");
exit (1);
}
err = pthread_cond_signal (&wake_up_cond); err = pthread_cond_signal (&wake_up_cond);
if (err != 0) { if (err != 0) {
text_color_set(DW_COLOR_ERROR); text_color_set(DW_COLOR_ERROR);
@ -446,14 +418,14 @@ static void append_to_queue (struct dlq_item_s *pnew)
perror (""); perror ("");
exit (1); exit (1);
} }
}
err = pthread_mutex_unlock (&wake_up_mutex); err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) { if (err != 0) {
text_color_set(DW_COLOR_ERROR); text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err); dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err);
perror (""); perror ("");
exit (1); exit (1);
}
} }
#endif #endif
@ -1011,9 +983,29 @@ int dlq_wait_while_empty (double timeout)
dlq_init (); dlq_init ();
} }
#if DEBUG1
text_color_set(DW_COLOR_DEBUG);
dw_printf ("dlq dlq_wait_while_empty: enter critical section\n");
#endif
#if __WIN32__
EnterCriticalSection (&dlq_cs);
#else
int err;
err = pthread_mutex_lock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq append_to_queue: pthread_mutex_lock err=%d", err);
perror ("");
exit (1);
}
#endif
if (queue_head == NULL) { if (queue_head == NULL) {
#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#endif
#if DEBUG #if DEBUG
text_color_set(DW_COLOR_DEBUG); text_color_set(DW_COLOR_DEBUG);
dw_printf ("dlq_wait_while_empty (): prepare to SLEEP...\n"); dw_printf ("dlq_wait_while_empty (): prepare to SLEEP...\n");
@ -1037,18 +1029,15 @@ int dlq_wait_while_empty (double timeout)
else { else {
WaitForSingleObject (wake_up_event, INFINITE); WaitForSingleObject (wake_up_event, INFINITE);
} }
} else {
#if __WIN32__
LeaveCriticalSection (&dlq_cs);
#endif
}
#else #else
int err; int err;
err = pthread_mutex_lock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_lock wu err=%d", err);
perror ("");
exit (1);
}
recv_thread_is_waiting = 1; recv_thread_is_waiting = 1;
if (timeout != 0.0) { if (timeout != 0.0) {
struct timespec abstime; struct timespec abstime;
@ -1056,26 +1045,25 @@ int dlq_wait_while_empty (double timeout)
abstime.tv_sec = (time_t)(long)timeout; abstime.tv_sec = (time_t)(long)timeout;
abstime.tv_nsec = (long)((timeout - (long)abstime.tv_sec) * 1000000000.0); abstime.tv_nsec = (long)((timeout - (long)abstime.tv_sec) * 1000000000.0);
err = pthread_cond_timedwait (&wake_up_cond, &wake_up_mutex, &abstime); err = pthread_cond_timedwait (&wake_up_cond, &dlq_mutex, &abstime);
if (err == ETIMEDOUT) { if (err == ETIMEDOUT) {
timed_out_result = 1; timed_out_result = 1;
} }
} }
else { else {
err = pthread_cond_wait (&wake_up_cond, &wake_up_mutex); err = pthread_cond_wait (&wake_up_cond, &dlq_mutex);
} }
recv_thread_is_waiting = 0; recv_thread_is_waiting = 0;
err = pthread_mutex_unlock (&wake_up_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif
} }
err = pthread_mutex_unlock (&dlq_mutex);
if (err != 0) {
text_color_set(DW_COLOR_ERROR);
dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err);
perror ("");
exit (1);
}
#endif
#if DEBUG #if DEBUG
text_color_set(DW_COLOR_DEBUG); text_color_set(DW_COLOR_DEBUG);
@ -1133,6 +1121,14 @@ struct dlq_item_s *dlq_remove (void)
if (queue_head != NULL) { if (queue_head != NULL) {
result = queue_head; result = queue_head;
queue_head = queue_head->nextp; queue_head = queue_head->nextp;
queue_length--;
if (queue_head == NULL) {
assert (queue_length == 0);
queue_tail = NULL;
}
if (queue_length == 1) {
assert (queue_head == queue_tail);
}
} }
#if __WIN32__ #if __WIN32__