diff --git a/components/drivers/include/ipc/dataqueue.h b/components/drivers/include/ipc/dataqueue.h index d90be76ffb8b786a30392493c233be4d0c7d3c3f..0262f4702be20b57ae7afb31dcf7db39099ade6f 100644 --- a/components/drivers/include/ipc/dataqueue.h +++ b/components/drivers/include/ipc/dataqueue.h @@ -17,8 +17,7 @@ #define RT_DATAQUEUE_EVENT_LWM 0x03 struct rt_data_item; -#define RT_DATAQUEUE_SIZE(dq) ((dq)->put_index - (dq)->get_index) -#define RT_DATAQUEUE_EMPTY(dq) ((dq)->size - RT_DATAQUEUE_SIZE(dq)) + /* data queue implementation */ struct rt_data_queue { @@ -26,10 +25,11 @@ struct rt_data_queue rt_uint16_t size; rt_uint16_t lwm; - rt_bool_t waiting_lwm; - rt_uint16_t get_index; - rt_uint16_t put_index; + rt_uint16_t get_index : 15; + rt_uint16_t is_empty : 1; + rt_uint16_t put_index : 15; + rt_uint16_t is_full : 1; struct rt_data_item *queue; @@ -60,5 +60,6 @@ rt_err_t rt_data_queue_peak(struct rt_data_queue *queue, rt_size_t *size); void rt_data_queue_reset(struct rt_data_queue *queue); rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue); +rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue); #endif diff --git a/components/drivers/src/dataqueue.c b/components/drivers/src/dataqueue.c index ba4a921da0bb11a9b051f322443d3b197b218db1..bfe8e1257394da38b000e7758ecfc94c30e39f91 100644 --- a/components/drivers/src/dataqueue.c +++ b/components/drivers/src/dataqueue.c @@ -28,7 +28,7 @@ rt_data_queue_init(struct rt_data_queue *queue, void (*evt_notify)(struct rt_data_queue *queue, rt_uint32_t event)) { RT_ASSERT(queue != RT_NULL); - RT_ASSERT((0x10000 % size) == 0); + RT_ASSERT(size > 0); queue->evt_notify = evt_notify; @@ -38,6 +38,8 @@ rt_data_queue_init(struct rt_data_queue *queue, queue->get_index = 0; queue->put_index = 0; + queue->is_empty = 1; + queue->is_full = 0; rt_list_init(&(queue->suspended_push_list)); rt_list_init(&(queue->suspended_pop_list)); @@ -61,14 +63,14 @@ rt_err_t rt_data_queue_push(struct rt_data_queue *queue, rt_thread_t thread; rt_err_t result; - RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); RT_ASSERT(queue != RT_NULL); + RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); result = RT_EOK; thread = rt_thread_self(); level = rt_hw_interrupt_disable(); - while (queue->put_index - queue->get_index == queue->size) + while (queue->is_full) { /* queue is full */ if (timeout == 0) @@ -109,9 +111,18 @@ rt_err_t rt_data_queue_push(struct rt_data_queue *queue, if (result != RT_EOK) goto __exit; } - queue->queue[queue->put_index % queue->size].data_ptr = data_ptr; - queue->queue[queue->put_index % queue->size].data_size = data_size; + queue->queue[queue->put_index].data_ptr = data_ptr; + queue->queue[queue->put_index].data_size = data_size; queue->put_index += 1; + if (queue->put_index == queue->size) + { + queue->put_index = 0; + } + queue->is_empty = 0; + if (queue->put_index == queue->get_index) + { + queue->is_full = 1; + } /* there is at least one thread in suspended list */ if (!rt_list_isempty(&(queue->suspended_pop_list))) @@ -151,8 +162,8 @@ rt_err_t rt_data_queue_pop(struct rt_data_queue *queue, rt_thread_t thread; rt_err_t result; - RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); RT_ASSERT(queue != RT_NULL); + RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); RT_ASSERT(data_ptr != RT_NULL); RT_ASSERT(size != RT_NULL); @@ -160,7 +171,7 @@ rt_err_t rt_data_queue_pop(struct rt_data_queue *queue, thread = rt_thread_self(); level = rt_hw_interrupt_disable(); - while (queue->get_index == queue->put_index) + while (queue->is_empty) { /* queue is empty */ if (timeout == 0) @@ -201,12 +212,20 @@ rt_err_t rt_data_queue_pop(struct rt_data_queue *queue, goto __exit; } - *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr; - *size = queue->queue[queue->get_index % queue->size].data_size; - + *data_ptr = queue->queue[queue->get_index].data_ptr; + *size = queue->queue[queue->get_index].data_size; queue->get_index += 1; + if (queue->get_index == queue->size) + { + queue->get_index = 0; + } + queue->is_full = 0; + if (queue->put_index == queue->get_index) + { + queue->is_empty = 1; + } - if ((queue->put_index - queue->get_index) <= queue->lwm) + if (rt_data_queue_len(queue) <= queue->lwm) { /* there is at least one thread in suspended list */ if (!rt_list_isempty(&(queue->suspended_push_list))) @@ -251,20 +270,18 @@ rt_err_t rt_data_queue_peak(struct rt_data_queue *queue, { rt_ubase_t level; - RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); RT_ASSERT(queue != RT_NULL); + RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); - level = rt_hw_interrupt_disable(); - - if (queue->get_index == queue->put_index) + if (queue->is_empty) { - rt_hw_interrupt_enable(level); - return -RT_EEMPTY; } - *data_ptr = queue->queue[queue->get_index % queue->size].data_ptr; - *size = queue->queue[queue->get_index % queue->size].data_size; + level = rt_hw_interrupt_disable(); + + *data_ptr = queue->queue[queue->get_index].data_ptr; + *size = queue->queue[queue->get_index].data_size; rt_hw_interrupt_enable(level); @@ -274,10 +291,20 @@ RTM_EXPORT(rt_data_queue_peak); void rt_data_queue_reset(struct rt_data_queue *queue) { + rt_ubase_t level; struct rt_thread *thread; - register rt_ubase_t temp; + RT_ASSERT(queue != RT_NULL); RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); + + level = rt_hw_interrupt_disable(); + + queue->get_index = 0; + queue->put_index = 0; + queue->is_empty = 1; + queue->is_full = 0; + + rt_hw_interrupt_enable(level); rt_enter_critical(); /* wakeup all suspend threads */ @@ -286,7 +313,7 @@ void rt_data_queue_reset(struct rt_data_queue *queue) while (!rt_list_isempty(&(queue->suspended_pop_list))) { /* disable interrupt */ - temp = rt_hw_interrupt_disable(); + level = rt_hw_interrupt_disable(); /* get next suspend thread */ thread = rt_list_entry(queue->suspended_pop_list.next, @@ -303,14 +330,14 @@ void rt_data_queue_reset(struct rt_data_queue *queue) rt_thread_resume(thread); /* enable interrupt */ - rt_hw_interrupt_enable(temp); + rt_hw_interrupt_enable(level); } /* resume on push list */ while (!rt_list_isempty(&(queue->suspended_push_list))) { /* disable interrupt */ - temp = rt_hw_interrupt_disable(); + level = rt_hw_interrupt_disable(); /* get next suspend thread */ thread = rt_list_entry(queue->suspended_push_list.next, @@ -327,7 +354,7 @@ void rt_data_queue_reset(struct rt_data_queue *queue) rt_thread_resume(thread); /* enable interrupt */ - rt_hw_interrupt_enable(temp); + rt_hw_interrupt_enable(level); } rt_exit_critical(); @@ -339,19 +366,49 @@ rt_err_t rt_data_queue_deinit(struct rt_data_queue *queue) { rt_ubase_t level; - RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); RT_ASSERT(queue != RT_NULL); - - level = rt_hw_interrupt_disable(); + RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); /* wakeup all suspend threads */ rt_data_queue_reset(queue); + level = rt_hw_interrupt_disable(); queue->magic = 0; - rt_free(queue->queue); - rt_hw_interrupt_enable(level); + + rt_free(queue->queue); return RT_EOK; } RTM_EXPORT(rt_data_queue_deinit); + +rt_uint16_t rt_data_queue_len(struct rt_data_queue *queue) +{ + rt_ubase_t level; + rt_int16_t len; + + RT_ASSERT(queue != RT_NULL); + RT_ASSERT(queue->magic == DATAQUEUE_MAGIC); + + if (queue->is_empty) + { + return 0; + } + + level = rt_hw_interrupt_disable(); + + if (queue->put_index > queue->get_index) + { + len = queue->put_index - queue->get_index; + } + else + { + len = queue->size + queue->put_index - queue->get_index; + } + + rt_hw_interrupt_enable(level); + + return len; +} +RTM_EXPORT(rt_data_queue_len); +