pipe.c 11.6 KB
Newer Older
1
/*
2
 * Copyright (c) 2006-2021, RT-Thread Development Team
3
 *
4
 * SPDX-License-Identifier: Apache-2.0
5 6 7 8
 *
 * Change Logs:
 * Date           Author       Notes
 * 2012-09-30     Bernard      first version.
9
 * 2017-11-08     JasonJiaJie  fix memory leak issue when close a pipe.
10 11 12
 */
#include <rthw.h>
#include <rtdevice.h>
13
#include <stdint.h>
14
#include <sys/errno.h>
B
bernard 已提交
15

16
#ifdef RT_USING_POSIX
B
bernard 已提交
17 18
#include <dfs_file.h>
#include <dfs_posix.h>
B
bernard 已提交
19
#include <dfs_poll.h>
20

B
bernard 已提交
21
static int pipe_fops_open(struct dfs_fd *fd)
22
{
B
Bernard Xiong 已提交
23
    int rc = 0;
B
bernard 已提交
24 25
    rt_device_t device;
    rt_pipe_t *pipe;
26

B
bernard 已提交
27 28
    pipe = (rt_pipe_t *)fd->data;
    if (!pipe) return -1;
29

B
bernard 已提交
30 31
    device = &(pipe->parent);
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
32

B
bernard 已提交
33 34
    if (device->ref_count == 0)
    {
35
        pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
36 37 38 39 40
        if (pipe->fifo == RT_NULL)
        {
            rc = -RT_ENOMEM;
            goto __exit;
        }
B
bernard 已提交
41
    }
42

B
bernard 已提交
43 44
    switch (fd->flags & O_ACCMODE)
    {
mysterywolf's avatar
mysterywolf 已提交
45 46 47 48 49 50 51 52 53 54
        case O_RDONLY:
            pipe->readers ++;
            break;
        case O_WRONLY:
            pipe->writers ++;
            break;
        case O_RDWR:
            pipe->readers ++;
            pipe->writers ++;
            break;
55
    }
B
bernard 已提交
56 57
    device->ref_count ++;

58
__exit:
B
bernard 已提交
59 60
    rt_mutex_release(&(pipe->lock));

61
    return rc;
62 63
}

B
bernard 已提交
64
static int pipe_fops_close(struct dfs_fd *fd)
65
{
B
bernard 已提交
66 67
    rt_device_t device;
    rt_pipe_t *pipe;
68

B
bernard 已提交
69 70
    pipe = (rt_pipe_t *)fd->data;
    if (!pipe) return -1;
71

B
bernard 已提交
72 73 74 75
    device = &(pipe->parent);
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    switch (fd->flags & O_ACCMODE)
76
    {
mysterywolf's avatar
mysterywolf 已提交
77 78 79 80 81 82 83 84 85 86
        case O_RDONLY:
            pipe->readers --;
            break;
        case O_WRONLY:
            pipe->writers --;
            break;
        case O_RDWR:
            pipe->readers --;
            pipe->writers --;
            break;
B
bernard 已提交
87
    }
88

B
bernard 已提交
89 90 91 92
    if (pipe->writers == 0)
    {
        rt_wqueue_wakeup(&(pipe->reader_queue), (void*)(POLLIN | POLLERR | POLLHUP));
    }
93

B
bernard 已提交
94 95 96 97
    if (pipe->readers == 0)
    {
        rt_wqueue_wakeup(&(pipe->writer_queue), (void*)(POLLOUT | POLLERR | POLLHUP));
    }
98

B
bernard 已提交
99 100
    if (device->ref_count == 1)
    {
101 102
        if (pipe->fifo != RT_NULL)
            rt_ringbuffer_destroy(pipe->fifo);
B
bernard 已提交
103
        pipe->fifo = RT_NULL;
104
    }
B
bernard 已提交
105 106 107 108
    device->ref_count --;

    rt_mutex_release(&(pipe->lock));

109 110 111 112 113 114
    if (device->ref_count == 0 && pipe->is_named == RT_FALSE)
    {
        /* delete the unamed pipe */
        rt_pipe_delete(device->parent.name);
    }

B
bernard 已提交
115 116 117
    return 0;
}

B
bernard 已提交
118
static int pipe_fops_ioctl(struct dfs_fd *fd, int cmd, void *args)
B
bernard 已提交
119 120 121 122 123 124 125 126
{
    rt_pipe_t *pipe;
    int ret = 0;

    pipe = (rt_pipe_t *)fd->data;

    switch (cmd)
    {
mysterywolf's avatar
mysterywolf 已提交
127 128 129 130 131 132 133 134 135
        case FIONREAD:
            *((int*)args) = rt_ringbuffer_data_len(pipe->fifo);
            break;
        case FIONWRITE:
            *((int*)args) = rt_ringbuffer_space_len(pipe->fifo);
            break;
        default:
            ret = -EINVAL;
            break;
B
bernard 已提交
136 137 138 139 140
    }

    return ret;
}

B
bernard 已提交
141
static int pipe_fops_read(struct dfs_fd *fd, void *buf, size_t count)
B
bernard 已提交
142 143 144
{
    int len = 0;
    rt_pipe_t *pipe;
145

B
bernard 已提交
146
    pipe = (rt_pipe_t *)fd->data;
147

B
bernard 已提交
148 149 150
    /* no process has the pipe open for writing, return end-of-file */
    if (pipe->writers == 0)
        return 0;
151

B
bernard 已提交
152 153 154 155 156
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    while (1)
    {
        if (pipe->writers == 0)
157
        {
B
bernard 已提交
158 159 160 161
            goto out;
        }

        len = rt_ringbuffer_get(pipe->fifo, buf, count);
162

B
bernard 已提交
163 164 165
        if (len > 0)
        {
            break;
166 167 168
        }
        else
        {
B
bernard 已提交
169 170 171 172 173 174 175 176 177 178
            if (fd->flags & O_NONBLOCK)
            {
                len = -EAGAIN;
                goto out;
            }

            rt_mutex_release(&pipe->lock);
            rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
            rt_wqueue_wait(&(pipe->reader_queue), 0, -1);
            rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);
179
        }
B
bernard 已提交
180 181 182 183
    }

    /* wakeup writer */
    rt_wqueue_wakeup(&(pipe->writer_queue), (void*)POLLOUT);
184

B
bernard 已提交
185 186 187
out:
    rt_mutex_release(&pipe->lock);
    return len;
188 189
}

B
bernard 已提交
190
static int pipe_fops_write(struct dfs_fd *fd, const void *buf, size_t count)
191
{
B
bernard 已提交
192 193 194 195 196 197 198
    int len;
    rt_pipe_t *pipe;
    int wakeup = 0;
    int ret = 0;
    uint8_t *pbuf;

    pipe = (rt_pipe_t *)fd->data;
199

B
bernard 已提交
200
    if (pipe->readers == 0)
201
    {
B
bernard 已提交
202 203 204
        ret = -EPIPE;
        goto out;
    }
205

B
bernard 已提交
206 207
    if (count == 0)
        return 0;
208

B
bernard 已提交
209 210
    pbuf = (uint8_t*)buf;
    rt_mutex_take(&pipe->lock, -1);
211

B
bernard 已提交
212 213 214 215 216 217 218 219
    while (1)
    {
        if (pipe->readers == 0)
        {
            if (ret == 0)
                ret = -EPIPE;
            break;
        }
220

B
bernard 已提交
221 222 223 224
        len = rt_ringbuffer_put(pipe->fifo, pbuf, count - ret);
        ret +=  len;
        pbuf += len;
        wakeup = 1;
225

B
bernard 已提交
226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
        if (ret == count)
        {
            break;
        }
        else
        {
            if (fd->flags & O_NONBLOCK)
            {
                if (ret == 0)
                {
                    ret = -EAGAIN;
                }

                break;
            }
        }
242

B
bernard 已提交
243 244 245 246 247 248 249
        rt_mutex_release(&pipe->lock);
        rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
        /* pipe full, waiting on suspended write list */
        rt_wqueue_wait(&(pipe->writer_queue), 0, -1);
        rt_mutex_take(&pipe->lock, -1);
    }
    rt_mutex_release(&pipe->lock);
250

B
bernard 已提交
251
    if (wakeup)
252
    {
B
bernard 已提交
253 254
        rt_wqueue_wakeup(&(pipe->reader_queue), (void*)POLLIN);
    }
255

B
bernard 已提交
256 257 258
out:
    return ret;
}
259

B
bernard 已提交
260
static int pipe_fops_poll(struct dfs_fd *fd, rt_pollreq_t *req)
B
bernard 已提交
261 262 263 264 265
{
    int mask = 0;
    rt_pipe_t *pipe;
    int mode = 0;
    pipe = (rt_pipe_t *)fd->data;
266

B
bernard 已提交
267 268
    rt_poll_add(&(pipe->reader_queue), req);
    rt_poll_add(&(pipe->writer_queue), req);
269

B
bernard 已提交
270 271 272 273 274 275 276 277 278 279 280 281
    switch (fd->flags & O_ACCMODE)
    {
    case O_RDONLY:
        mode = 1;
        break;
    case O_WRONLY:
        mode = 2;
        break;
    case O_RDWR:
        mode = 3;
        break;
    }
282

B
bernard 已提交
283 284 285 286 287 288 289 290 291 292 293
    if (mode & 1)
    {
        if (rt_ringbuffer_data_len(pipe->fifo) != 0)
        {
            mask |= POLLIN;
        }
        if (pipe->writers == 0)
        {
            mask |= POLLHUP;
        }
    }
294

B
bernard 已提交
295 296 297
    if (mode & 2)
    {
        if (rt_ringbuffer_space_len(pipe->fifo) != 0)
298
        {
B
bernard 已提交
299
            mask |= POLLOUT;
300
        }
B
bernard 已提交
301
        if (pipe->readers == 0)
302
        {
B
bernard 已提交
303
            mask |= POLLERR;
304
        }
B
bernard 已提交
305
    }
306

B
bernard 已提交
307
    return mask;
308 309
}

B
bernard 已提交
310
static const struct dfs_file_ops pipe_fops =
311
{
B
bernard 已提交
312 313 314 315 316
    pipe_fops_open,
    pipe_fops_close,
    pipe_fops_ioctl,
    pipe_fops_read,
    pipe_fops_write,
B
bernard 已提交
317 318 319
    RT_NULL,
    RT_NULL,
    RT_NULL,
B
bernard 已提交
320
    pipe_fops_poll,
B
bernard 已提交
321
};
322 323
#endif /* end of RT_USING_POSIX */

whj123999's avatar
whj123999 已提交
324
rt_err_t  rt_pipe_open(rt_device_t device, rt_uint16_t oflag)
325 326
{
    rt_pipe_t *pipe = (rt_pipe_t *)device;
D
David Lin 已提交
327
    rt_err_t ret = RT_EOK;
328

D
David Lin 已提交
329 330 331 332 333
    if (device == RT_NULL)
    {
        ret = -RT_EINVAL;
        goto __exit;
    }
334

335 336 337 338 339
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    if (pipe->fifo == RT_NULL)
    {
        pipe->fifo = rt_ringbuffer_create(pipe->bufsz);
D
David Lin 已提交
340 341
        if (pipe->fifo == RT_NULL)
        {
D
David Lin 已提交
342
            ret = -RT_ENOMEM;
D
David Lin 已提交
343
        }
344 345 346 347
    }

    rt_mutex_release(&(pipe->lock));

D
David Lin 已提交
348 349
__exit:
    return ret;
350 351
}

whj123999's avatar
whj123999 已提交
352
rt_err_t  rt_pipe_close(rt_device_t device)
353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
{
    rt_pipe_t *pipe = (rt_pipe_t *)device;

    if (device == RT_NULL) return -RT_EINVAL;
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    if (device->ref_count == 1)
    {
        rt_ringbuffer_destroy(pipe->fifo);
        pipe->fifo = RT_NULL;
    }

    rt_mutex_release(&(pipe->lock));

    return RT_EOK;
}
B
bernard 已提交
369

whj123999's avatar
whj123999 已提交
370
rt_size_t rt_pipe_read(rt_device_t device, rt_off_t pos, void *buffer, rt_size_t count)
371 372
{
    uint8_t *pbuf;
373
    rt_size_t read_bytes = 0;
374 375
    rt_pipe_t *pipe = (rt_pipe_t *)device;

376 377
    if (device == RT_NULL)
    {
378
        rt_set_errno(EINVAL);
379 380
        return 0;
    }
381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397
    if (count == 0) return 0;

    pbuf = (uint8_t*)buffer;
    rt_mutex_take(&(pipe->lock), RT_WAITING_FOREVER);

    while (read_bytes < count)
    {
        int len = rt_ringbuffer_get(pipe->fifo, &pbuf[read_bytes], count - read_bytes);
        if (len <= 0) break;

        read_bytes += len;
    }
    rt_mutex_release(&pipe->lock);

    return read_bytes;
}

whj123999's avatar
whj123999 已提交
398
rt_size_t rt_pipe_write(rt_device_t device, rt_off_t pos, const void *buffer, rt_size_t count)
399 400
{
    uint8_t *pbuf;
401
    rt_size_t write_bytes = 0;
402 403
    rt_pipe_t *pipe = (rt_pipe_t *)device;

404 405
    if (device == RT_NULL)
    {
406
        rt_set_errno(EINVAL);
407 408
        return 0;
    }
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
    if (count == 0) return 0;

    pbuf = (uint8_t*)buffer;
    rt_mutex_take(&pipe->lock, -1);

    while (write_bytes < count)
    {
        int len = rt_ringbuffer_put(pipe->fifo, &pbuf[write_bytes], count - write_bytes);
        if (len <= 0) break;

        write_bytes += len;
    }
    rt_mutex_release(&pipe->lock);

    return write_bytes;
}

rt_err_t  rt_pipe_control(rt_device_t dev, int cmd, void *args)
{
    return RT_EOK;
}

B
Bernard Xiong 已提交
431
#ifdef RT_USING_DEVICE_OPS
432
const static struct rt_device_ops pipe_ops =
B
Bernard Xiong 已提交
433 434 435 436 437 438 439 440 441 442
{
    RT_NULL,
    rt_pipe_open,
    rt_pipe_close,
    rt_pipe_read,
    rt_pipe_write,
    rt_pipe_control,
};
#endif

443
rt_pipe_t *rt_pipe_create(const char *name, int bufsz)
444
{
B
bernard 已提交
445 446
    rt_pipe_t *pipe;
    rt_device_t dev;
447

448
    pipe = (rt_pipe_t *)rt_malloc(sizeof(rt_pipe_t));
B
bernard 已提交
449
    if (pipe == RT_NULL) return RT_NULL;
450

B
bernard 已提交
451
    rt_memset(pipe, 0, sizeof(rt_pipe_t));
452
    pipe->is_named = RT_TRUE; /* initialize as a named pipe */
B
bernard 已提交
453
    rt_mutex_init(&(pipe->lock), name, RT_IPC_FLAG_FIFO);
454 455
    rt_wqueue_init(&(pipe->reader_queue));
    rt_wqueue_init(&(pipe->writer_queue));
456

457 458 459
    RT_ASSERT(bufsz < 0xFFFF);
    pipe->bufsz = bufsz;

B
bernard 已提交
460 461
    dev = &(pipe->parent);
    dev->type = RT_Device_Class_Pipe;
B
Bernard Xiong 已提交
462 463 464
#ifdef RT_USING_DEVICE_OPS
    dev->ops         = &pipe_ops;
#else
465 466 467 468 469 470
    dev->init        = RT_NULL;
    dev->open        = rt_pipe_open;
    dev->read        = rt_pipe_read;
    dev->write       = rt_pipe_write;
    dev->close       = rt_pipe_close;
    dev->control     = rt_pipe_control;
B
Bernard Xiong 已提交
471
#endif
472 473 474

    dev->rx_indicate = RT_NULL;
    dev->tx_complete = RT_NULL;
475

B
bernard 已提交
476 477 478 479 480
    if (rt_device_register(&(pipe->parent), name, RT_DEVICE_FLAG_RDWR | RT_DEVICE_FLAG_REMOVABLE) != 0)
    {
        rt_free(pipe);
        return RT_NULL;
    }
481
#ifdef RT_USING_POSIX
B
bernard 已提交
482
    dev->fops = (void*)&pipe_fops;
483
#endif
484

B
bernard 已提交
485
    return pipe;
486 487
}

B
bernard 已提交
488
int rt_pipe_delete(const char *name)
489
{
B
bernard 已提交
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
    int result = 0;
    rt_device_t device;

    device = rt_device_find(name);
    if (device)
    {
        if (device->type == RT_Device_Class_Pipe)
        {
            rt_pipe_t *pipe;

            if (device->ref_count != 0)
            {
                return -RT_EBUSY;
            }

            pipe = (rt_pipe_t *)device;

            rt_mutex_detach(&(pipe->lock));
            rt_device_unregister(device);

510
            /* close fifo ringbuffer */
511
            if (pipe->fifo)
512 513 514 515
            {
                rt_ringbuffer_destroy(pipe->fifo);
                pipe->fifo = RT_NULL;
            }
B
bernard 已提交
516 517 518 519
            rt_free(pipe);
        }
        else
        {
520
            result = -RT_EINVAL;
B
bernard 已提交
521 522 523 524
        }
    }
    else
    {
525
        result = -RT_EINVAL;
B
bernard 已提交
526 527 528
    }

    return result;
529 530
}

531
#ifdef RT_USING_POSIX
B
bernard 已提交
532
int pipe(int fildes[2])
533
{
B
bernard 已提交
534
    rt_pipe_t *pipe;
whj123999's avatar
whj123999 已提交
535 536
    char dname[RT_NAME_MAX];
    char dev_name[RT_NAME_MAX * 4];
B
bernard 已提交
537
    static int pipeno = 0;
538

B
bernard 已提交
539 540
    rt_snprintf(dname, sizeof(dname), "pipe%d", pipeno++);

541
    pipe = rt_pipe_create(dname, PIPE_BUFSZ);
542
    if (pipe == RT_NULL)
B
bernard 已提交
543 544 545
    {
        return -1;
    }
546

547
    pipe->is_named = RT_FALSE; /* unamed pipe */
B
bernard 已提交
548 549 550
    rt_snprintf(dev_name, sizeof(dev_name), "/dev/%s", dname);
    fildes[0] = open(dev_name, O_RDONLY, 0);
    if (fildes[0] < 0)
551
    {
B
bernard 已提交
552
        return -1;
553
    }
554

B
bernard 已提交
555 556 557
    fildes[1] = open(dev_name, O_WRONLY, 0);
    if (fildes[1] < 0)
    {
558
        close(fildes[0]);
B
bernard 已提交
559 560 561 562
        return -1;
    }

    return 0;
563 564
}

B
bernard 已提交
565
int mkfifo(const char *path, mode_t mode)
566
{
B
bernard 已提交
567
    rt_pipe_t *pipe;
568

569
    pipe = rt_pipe_create(path, PIPE_BUFSZ);
570
    if (pipe == RT_NULL)
B
bernard 已提交
571 572 573
    {
        return -1;
    }
574

B
bernard 已提交
575
    return 0;
576
}
B
bernard 已提交
577
#endif