pipe.c 7.8 KB
Newer Older
1 2 3 4 5
/*
 * File      : pipe.c
 * This file is part of RT-Thread RTOS
 * COPYRIGHT (C) 2012, RT-Thread Development Team
 *
Y
yiyue.fang 已提交
6 7 8 9 10 11 12 13 14 15 16 17 18
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with this program; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 20 21 22 23 24 25 26 27 28
 *
 * Change Logs:
 * Date           Author       Notes
 * 2012-09-30     Bernard      first version.
 */

#include <rthw.h>
#include <rtthread.h>
#include <rtdevice.h>

29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
static void _rt_pipe_resume_writer(struct rt_pipe_device *pipe)
{
    if (!rt_list_isempty(&pipe->suspended_write_list))
    {
        rt_thread_t thread;

        RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_WR);

        /* get suspended thread */
        thread = rt_list_entry(pipe->suspended_write_list.next,
                struct rt_thread,
                tlist);

        /* resume the write thread */
        rt_thread_resume(thread);

        rt_schedule();
    }
}

49 50 51 52 53 54 55 56 57 58 59 60 61
static rt_size_t rt_pipe_read(rt_device_t dev,
                              rt_off_t    pos,
                              void       *buffer,
                              rt_size_t   size)
{
    rt_uint32_t level;
    rt_thread_t thread;
    struct rt_pipe_device *pipe;
    rt_size_t read_nbytes;

    pipe = PIPE_DEVICE(dev);
    RT_ASSERT(pipe != RT_NULL);

62 63 64 65 66 67 68 69 70 71 72 73 74 75
    if (!(pipe->flag & RT_PIPE_FLAG_BLOCK_RD))
    {
        level = rt_hw_interrupt_disable();
        read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);

        /* if the ringbuffer is empty, there won't be any writer waiting */
        if (read_nbytes)
            _rt_pipe_resume_writer(pipe);

        rt_hw_interrupt_enable(level);

        return read_nbytes;
    }

76 77 78 79 80
    thread = rt_thread_self();

    /* current context checking */
    RT_DEBUG_NOT_IN_INTERRUPT;

81
    do {
82 83 84 85 86 87
        level = rt_hw_interrupt_disable();
        read_nbytes = rt_ringbuffer_get(&(pipe->ringbuffer), buffer, size);
        if (read_nbytes == 0)
        {
            rt_thread_suspend(thread);
            /* waiting on suspended read list */
88 89
            rt_list_insert_before(&(pipe->suspended_read_list),
                                  &(thread->tlist));
90 91 92 93 94 95
            rt_hw_interrupt_enable(level);

            rt_schedule();
        }
        else
        {
96 97
            _rt_pipe_resume_writer(pipe);
            rt_hw_interrupt_enable(level);
98 99 100 101 102 103 104
            break;
        }
    } while (read_nbytes == 0);

    return read_nbytes;
}

105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
static void _rt_pipe_resume_reader(struct rt_pipe_device *pipe)
{
    if (!rt_list_isempty(&pipe->suspended_read_list))
    {
        rt_thread_t thread;

        RT_ASSERT(pipe->flag & RT_PIPE_FLAG_BLOCK_RD);

        /* get suspended thread */
        thread = rt_list_entry(pipe->suspended_read_list.next,
                struct rt_thread,
                tlist);

        /* resume the read thread */
        rt_thread_resume(thread);

        rt_schedule();
    }
}

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
struct rt_pipe_device *_pipe = RT_NULL;
static rt_size_t rt_pipe_write(rt_device_t dev,
                               rt_off_t    pos,
                               const void *buffer,
                               rt_size_t   size)
{
    rt_uint32_t level;
    rt_thread_t thread;
    struct rt_pipe_device *pipe;
    rt_size_t write_nbytes;

    pipe = PIPE_DEVICE(dev);
    RT_ASSERT(pipe != RT_NULL);
    if (_pipe == RT_NULL)
        _pipe = pipe;

141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
    if ((pipe->flag & RT_PIPE_FLAG_FORCE_WR) ||
       !(pipe->flag & RT_PIPE_FLAG_BLOCK_WR))
    {
        level = rt_hw_interrupt_disable();

        if (pipe->flag & RT_PIPE_FLAG_FORCE_WR)
            write_nbytes = rt_ringbuffer_put_force(&(pipe->ringbuffer),
                                                   buffer, size);
        else
            write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer),
                                             buffer, size);

        _rt_pipe_resume_reader(pipe);

        rt_hw_interrupt_enable(level);

        return write_nbytes;
    }

160 161 162 163 164
    thread = rt_thread_self();

    /* current context checking */
    RT_DEBUG_NOT_IN_INTERRUPT;

165
    do {
166 167 168 169 170 171 172
        level = rt_hw_interrupt_disable();
        write_nbytes = rt_ringbuffer_put(&(pipe->ringbuffer), buffer, size);
        if (write_nbytes == 0)
        {
            /* pipe full, waiting on suspended write list */
            rt_thread_suspend(thread);
            /* waiting on suspended read list */
173 174
            rt_list_insert_before(&(pipe->suspended_write_list),
                                  &(thread->tlist));
175 176 177 178 179 180
            rt_hw_interrupt_enable(level);

            rt_schedule();
        }
        else
        {
181 182
            _rt_pipe_resume_reader(pipe);
            rt_hw_interrupt_enable(level);
183 184
            break;
        }
185
    } while (write_nbytes == 0);
186 187 188 189 190 191 192 193 194

    return write_nbytes;
}

static rt_err_t rt_pipe_control(rt_device_t dev, rt_uint8_t cmd, void *args)
{
    return RT_EOK;
}

195 196 197 198 199 200
/**
 * This function will initialize a pipe device and put it under control of
 * resource management.
 *
 * @param pipe the pipe device
 * @param name the name of pipe device
201
 * @param flag the attribute of the pipe device
202 203 204 205 206 207 208
 * @param buf  the buffer of pipe device
 * @param size the size of pipe device buffer
 *
 * @return the operation status, RT_EOK on successful
 */
rt_err_t rt_pipe_init(struct rt_pipe_device *pipe,
                      const char *name,
209
                      enum rt_pipe_flag flag,
210 211 212 213 214 215 216 217 218 219 220 221 222
                      rt_uint8_t *buf,
                      rt_size_t size)
{
    RT_ASSERT(pipe);
    RT_ASSERT(buf);

    /* initialize suspended list */
    rt_list_init(&pipe->suspended_read_list);
    rt_list_init(&pipe->suspended_write_list);

    /* initialize ring buffer */
    rt_ringbuffer_init(&pipe->ringbuffer, buf, size);

223 224
    pipe->flag = flag;

225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251
    /* create pipe */
    pipe->parent.type    = RT_Device_Class_Char;
    pipe->parent.init    = RT_NULL;
    pipe->parent.open    = RT_NULL;
    pipe->parent.close   = RT_NULL;
    pipe->parent.read    = rt_pipe_read;
    pipe->parent.write   = rt_pipe_write;
    pipe->parent.control = rt_pipe_control;

    return rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR);
}
RTM_EXPORT(rt_pipe_init);

/**
 * This function will detach a pipe device from resource management
 *
 * @param pipe the pipe device
 *
 * @return the operation status, RT_EOK on successful
 */
rt_err_t rt_pipe_detach(struct rt_pipe_device *pipe)
{
    return rt_device_unregister(&pipe->parent);
}
RTM_EXPORT(rt_pipe_detach);

#ifdef RT_USING_HEAP
252
rt_err_t rt_pipe_create(const char *name, enum rt_pipe_flag flag, rt_size_t size)
253 254 255 256 257 258 259
{
    rt_uint8_t *rb_memptr = RT_NULL;
    struct rt_pipe_device *pipe = RT_NULL;

    /* get aligned size */
    size = RT_ALIGN(size, RT_ALIGN_SIZE);
    pipe = (struct rt_pipe_device *)rt_calloc(1, sizeof(struct rt_pipe_device));
260 261
    if (pipe == RT_NULL)
        return -RT_ENOMEM;
262

263 264 265 266
    /* create ring buffer of pipe */
    rb_memptr = rt_malloc(size);
    if (rb_memptr == RT_NULL)
    {
267
        rt_free(pipe);
268 269
        return -RT_ENOMEM;
    }
270

271
    return rt_pipe_init(pipe, name, flag, rb_memptr, size);
272 273 274 275 276 277 278 279 280
}
RTM_EXPORT(rt_pipe_create);

void rt_pipe_destroy(struct rt_pipe_device *pipe)
{
    if (pipe == RT_NULL)
        return;

    /* un-register pipe device */
281
    rt_pipe_detach(pipe);
282 283 284 285 286 287 288 289

    /* release memory */
    rt_free(pipe->ringbuffer.buffer_ptr);
    rt_free(pipe);

    return;
}
RTM_EXPORT(rt_pipe_destroy);
290
#endif /* RT_USING_HEAP */