diff --git a/src/dlq.c b/src/dlq.c index f56b864..aa25b84 100644 --- a/src/dlq.c +++ b/src/dlq.c @@ -60,6 +60,8 @@ /* The queue is a linked list of these. */ static struct dlq_item_s *queue_head = NULL; /* Head of linked list for queue. */ +static struct dlq_item_s *queue_tail = NULL; /* Tail of linked list for queue. */ +int queue_length = 0; /* Count of items in queue */ #if __WIN32__ @@ -75,8 +77,6 @@ static pthread_mutex_t dlq_mutex; /* Critical section for updating queues. */ static pthread_cond_t wake_up_cond; /* Notify received packet processing thread when queue not empty. */ -static pthread_mutex_t wake_up_mutex; /* Required by cond_wait. */ - static volatile int recv_thread_is_waiting = 0; #endif @@ -117,7 +117,8 @@ void dlq_init (void) dw_printf ("dlq_init ( )\n"); #endif - queue_head = NULL; + queue_head = queue_tail = NULL; + queue_length = 0; #if DEBUG @@ -129,13 +130,6 @@ void dlq_init (void) InitializeCriticalSection (&dlq_cs); #else int err; - err = pthread_mutex_init (&wake_up_mutex, NULL); - if (err != 0) { - text_color_set(DW_COLOR_ERROR); - dw_printf ("dlq_init: pthread_mutex_init err=%d", err); - perror (""); - exit (EXIT_FAILURE); - } err = pthread_mutex_init (&dlq_mutex, NULL); if (err != 0) { text_color_set(DW_COLOR_ERROR); @@ -314,9 +308,6 @@ void dlq_rec_frame (int chan, int subchan, int slice, packet_t pp, alevel_t alev static void append_to_queue (struct dlq_item_s *pnew) { - struct dlq_item_s *plast; - int queue_length = 0; - if ( ! was_init) { dlq_init (); } @@ -341,30 +332,19 @@ static void append_to_queue (struct dlq_item_s *pnew) #endif if (queue_head == NULL) { - queue_head = pnew; + assert (queue_tail == NULL); + queue_head = queue_tail = pnew; queue_length = 1; + } else { + assert (queue_tail != NULL); + queue_tail->nextp = pnew; + queue_tail = pnew; + queue_length++; + assert (queue_length > 1); } - else { - queue_length = 2; /* head + new one */ - plast = queue_head; - while (plast->nextp != NULL) { - plast = plast->nextp; - queue_length++; - } - plast->nextp = pnew; - } - #if __WIN32__ LeaveCriticalSection (&dlq_cs); -#else - err = pthread_mutex_unlock (&dlq_mutex); - if (err != 0) { - text_color_set(DW_COLOR_ERROR); - dw_printf ("dlq append_to_queue: pthread_mutex_unlock err=%d", err); - perror (""); - exit (1); - } #endif #if DEBUG1 text_color_set(DW_COLOR_DEBUG); @@ -416,7 +396,7 @@ static void append_to_queue (struct dlq_item_s *pnew) * and blocking on a write. */ - if (queue_length > 10) { + if (queue_length > 15) { text_color_set(DW_COLOR_ERROR); dw_printf ("Received frame queue is out of control. Length=%d.\n", queue_length); dw_printf ("Reader thread is probably frozen.\n"); @@ -431,14 +411,6 @@ static void append_to_queue (struct dlq_item_s *pnew) #else if (recv_thread_is_waiting) { - err = pthread_mutex_lock (&wake_up_mutex); - if (err != 0) { - text_color_set(DW_COLOR_ERROR); - dw_printf ("dlq append_to_queue: pthread_mutex_lock wu err=%d", err); - perror (""); - exit (1); - } - err = pthread_cond_signal (&wake_up_cond); if (err != 0) { text_color_set(DW_COLOR_ERROR); @@ -446,14 +418,14 @@ static void append_to_queue (struct dlq_item_s *pnew) perror (""); exit (1); } + } - err = pthread_mutex_unlock (&wake_up_mutex); - if (err != 0) { - text_color_set(DW_COLOR_ERROR); - dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err); - perror (""); - exit (1); - } + err = pthread_mutex_unlock (&dlq_mutex); + if (err != 0) { + text_color_set(DW_COLOR_ERROR); + dw_printf ("dlq append_to_queue: pthread_mutex_unlock wu err=%d", err); + perror (""); + exit (1); } #endif @@ -1011,9 +983,29 @@ int dlq_wait_while_empty (double timeout) dlq_init (); } +#if DEBUG1 + text_color_set(DW_COLOR_DEBUG); + dw_printf ("dlq dlq_wait_while_empty: enter critical section\n"); +#endif +#if __WIN32__ + EnterCriticalSection (&dlq_cs); +#else + int err; + err = pthread_mutex_lock (&dlq_mutex); + if (err != 0) { + text_color_set(DW_COLOR_ERROR); + dw_printf ("dlq append_to_queue: pthread_mutex_lock err=%d", err); + perror (""); + exit (1); + } +#endif if (queue_head == NULL) { +#if __WIN32__ + LeaveCriticalSection (&dlq_cs); +#endif + #if DEBUG text_color_set(DW_COLOR_DEBUG); dw_printf ("dlq_wait_while_empty (): prepare to SLEEP...\n"); @@ -1037,18 +1029,15 @@ int dlq_wait_while_empty (double timeout) else { WaitForSingleObject (wake_up_event, INFINITE); } + } else { +#if __WIN32__ + LeaveCriticalSection (&dlq_cs); +#endif + } #else int err; - err = pthread_mutex_lock (&wake_up_mutex); - if (err != 0) { - text_color_set(DW_COLOR_ERROR); - dw_printf ("dlq_wait_while_empty: pthread_mutex_lock wu err=%d", err); - perror (""); - exit (1); - } - recv_thread_is_waiting = 1; if (timeout != 0.0) { struct timespec abstime; @@ -1056,26 +1045,25 @@ int dlq_wait_while_empty (double timeout) abstime.tv_sec = (time_t)(long)timeout; abstime.tv_nsec = (long)((timeout - (long)abstime.tv_sec) * 1000000000.0); - err = pthread_cond_timedwait (&wake_up_cond, &wake_up_mutex, &abstime); + err = pthread_cond_timedwait (&wake_up_cond, &dlq_mutex, &abstime); if (err == ETIMEDOUT) { timed_out_result = 1; } } else { - err = pthread_cond_wait (&wake_up_cond, &wake_up_mutex); + err = pthread_cond_wait (&wake_up_cond, &dlq_mutex); } recv_thread_is_waiting = 0; - - err = pthread_mutex_unlock (&wake_up_mutex); - if (err != 0) { - text_color_set(DW_COLOR_ERROR); - dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err); - perror (""); - exit (1); - } -#endif } + err = pthread_mutex_unlock (&dlq_mutex); + if (err != 0) { + text_color_set(DW_COLOR_ERROR); + dw_printf ("dlq_wait_while_empty: pthread_mutex_unlock wu err=%d", err); + perror (""); + exit (1); + } +#endif #if DEBUG text_color_set(DW_COLOR_DEBUG); @@ -1133,6 +1121,14 @@ struct dlq_item_s *dlq_remove (void) if (queue_head != NULL) { result = queue_head; queue_head = queue_head->nextp; + queue_length--; + if (queue_head == NULL) { + assert (queue_length == 0); + queue_tail = NULL; + } + if (queue_length == 1) { + assert (queue_head == queue_tail); + } } #if __WIN32__