Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

一个c实现的channel #17

Open
ICKelin opened this issue Jan 4, 2019 · 0 comments
Open

一个c实现的channel #17

ICKelin opened this issue Jan 4, 2019 · 0 comments

Comments

@ICKelin
Copy link
Owner

ICKelin commented Jan 4, 2019

最近看到一段用c实现的channel,感觉挺小巧精妙的,就好好研究了下。

包含两个基本操作:1、往channel发送数据;2、从channel中读取数据

本质上是用一段内存来实现一个环形队列。用两个游标来指向队头和队尾。每次要发送时,往队尾加数据,要读取时,从队头游标获取数据,当内存块不足时进行扩容,扩容机制采用的是原来内存块*2的方式。针对并发操作,用锁来保证同一时刻环形队列只有一个线程操作。

下面是具体代码,代码量很少。

channel.h:
定义基本数据结构以及接口

#ifndef _CHANNEL_H_
#define _CHANNEL_H_

#include "pv.h"

typedef struct
{
    int count;   // allocate count
    int used;    // used count
    int cursor;  // current position to rcv
    int last;    // current position to send
    int valsize; // each element size
    void *data;  // data memory ptr

    mutex_t lock; // mutex lock

} channel_t;

channel_t *new_chan(int valsize);

// send val to channel
// if channel is full, expand it, the expand strategy is old_size * 2
int chan_send(channel_t *chan, void *val);

// recv value from channel
// return !0 if channel empty
// otherwise return 0 and update the element params
int chan_rcv(channel_t *chan, void *ele);

// free queue data
// free channel
void free_chan(channel_t *chan);

#endif

channel.c:
接口实现


#include <stdlib.h>
#include <string.h>

#include "channel.h"

channel_t *new_chan(int valsize)
{
    channel_t *ch = malloc(sizeof(channel_t));

    if (!ch) {
        return NULL;
    }

    ch->count = 1;
    ch->valsize = valsize;
    ch->cursor = 0;
    ch->last = 0;
    ch->used = 0;
    ch->data = malloc(valsize);

    mutex_init(ch->lock);
    return ch;
}

// send val to channel
int chan_send(channel_t *chan, void *val)
{
    if (!chan) {
        return -1;
    }

    P(chan->lock);

    if (chan->used == chan->count)
    {
        int old = chan->count;
        int newcount = old * 2;

        chan->data = realloc(chan->data, chan->valsize * newcount);
        if (!chan->data)
        {
            V(chan->lock);
            return -2;
        }

        // reconstruct queue
        // supporse that the old channel is data[1,2,3,4,5]
        // the cursor pointer is 3, the last pointer is 2.
        // after expand, the channel is data[1,2,3,4,5,0,0,0,0...]
        // it should be data[0,0,3,4,5,1,2,0,0,0...]
        // the last pointer should be cursor + old_queue_size
        memcpy(chan->data + old * chan->valsize, chan->data, chan->cursor * chan->valsize);
        chan->count = newcount;
        chan->last = chan->cursor + old;
    }

    memcpy(chan->data + chan->last * chan->valsize, val, chan->valsize);
    chan->last = (chan->last + 1);
    if (chan->last == chan->count)
        chan->last = 0;
    chan->used += 1;

    V(chan->lock);
    return 0;
}

// recv value from channel
int chan_rcv(channel_t *chan, void *ele)
{
    P(chan->lock);
    if (chan->used == 0)
    {
        V(chan->lock);
        return -1;
    }

    memcpy(ele, chan->data + chan->cursor * chan->valsize, chan->valsize);
    chan->used -= 1;
    chan->cursor += 1;

    if (chan->cursor == chan->count)
    { // cursor move the begin of the queue
        chan->cursor = 0;
    }

    V(chan->lock);
    return 0;
}

// free channel
void free_chan(channel_t *chan)
{
    mutex_destroy(chan->lock);
    free(chan->data);
    free(chan);
}


pv.h:
锁操作的包裹函数

#ifndef _PV_H_
#define _PV_H_

#include <pthread.h>

#define mutex_t pthread_mutex_t

#define mutex_init(mu) pthread_mutex_init(&mu, NULL)
#define mutex_destroy(mu) pthread_mutex_destroy(&mu)
#define P(mu) pthread_mutex_lock(&mu)
#define V(mu) pthread_mutex_unlock(&mu)

#endif

接下来用图片显示具体每个操作之后内存状态。

  • 初始化
    分配一个元素的空间
    image

  • chan_send p1
    不需要扩容,直接附加到last即可
    image

  • chan_send p2
    send p1之后,data指向的内存块已被占用完。需要进行扩容,然后再附加到last
    image

接下来如果继续send,仅仅是重复上面两个过程,容量充足,则附加到last,不足,扩容再附加。

但是通常不会出现一直send的情况,会在send和rcv之间交替进行。
假设 send p1和send p2之后执行recv操作。

image

稍稍有点奇怪,但是也还算正常,这时候如果再 send p3, send p4。

image

这里挺巧妙的,需要琢磨一下,为了保证先进先出,需要将cursor之前的数据,也就是后进的数据移动到后面。代码当中也写了很长一段注释说明。

        // reconstruct queue
        // supporse that the old channel is data[1,2,3,4,5]
        // the cursor pointer is 3, the last pointer is 2.
        // after expand, the channel is data[1,2,3,4,5,0,0,0,0...]
        // it should be data[0,0,3,4,5,1,2,0,0,0...]
        // the last pointer should be cursor + old_queue_size
        memcpy(chan->data + old * chan->valsize, chan->data, chan->cursor * chan->valsize);
        chan->count = newcount;
        chan->last = chan->cursor + old;

总的来说很小巧,不难,又有意思,适合无聊逛github的时候看看。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant