Home > Erlang探索, 源码分析 > gen_tcp接收缓冲区易混淆概念纠正

gen_tcp接收缓冲区易混淆概念纠正

May 14th, 2013

原创文章,转载请注明: 转载自系统技术非业余研究

本文链接地址: gen_tcp接收缓冲区易混淆概念纠正

Erlang的每个TCP网络链接是由相应的gen_tcp对象来表示的,说白了就是个port, 实现Erlang网络相关的逻辑,其实现代码位于erts/emulator/drivers/common/inet_drv.c

参照inet:setopts文档,它有三个buffer相关的选项,非常让人费解:

{buffer, Size}
Determines the size of the user-level software buffer used by the driver. Not to be confused with sndbuf and recbuf options which correspond to the kernel socket buffers. It is recommended to have val(buffer) >= max(val(sndbuf),val(recbuf)). In fact, the val(buffer) is automatically set to the above maximum when sndbuf or recbuf values are set.

{recbuf, Size}
Gives the size of the receive buffer to use for the socket.

{sndbuf, Size}
Gives the size of the send buffer to use for the socket.

其中sndbuf, recbuf选项比较好理解, 就是设置gen_tcp所拥有的socket句柄的内核的发送和接收缓冲区,从代码可以验证:

/* inet_drv.c */
#define INET_OPT_SNDBUF     6   /* set send buffer size */
#define INET_OPT_RCVBUF     7   /* set receive buffer size */
static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_OPT_SNDBUF:    type = SO_SNDBUF;
            DEBUGF(("inet_set_opts(%ld): s=%d, SO_SNDBUF=%d\r\n",
                    (long)desc->port, desc->s, ival));
            break;
        case INET_OPT_RCVBUF:    type = SO_RCVBUF;
            DEBUGF(("inet_set_opts(%ld): s=%d, SO_RCVBUF=%d\r\n",
                    (long)desc->port, desc->s, ival));
            break;
...
        res = sock_setopt           (desc->s, proto, type, arg_ptr, arg_sz);
...
}

那buffer是什么呢,他们三者之间的关系? 从文档的描述来看:
It is recommended to have val(buffer) >= max(val(sndbuf),val(recbuf)). In fact, the val(buffer) is automatically set to the above maximum when sndbuf or recbuf values are set.

再对照源码:

/* inet_drv.c */

#define INET_DEF_BUFFER     1460        /* default buffer size */
#define INET_MIN_BUFFER     1           /* internal min buffer */
#define INET_LOPT_BUFFER      20  /* min buffer size hint */

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
       case INET_LOPT_BUFFER:
            DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\r\n",
                    (long)desc->port, desc->s, ival));
            if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER;
            desc->bufsz = ival;
            continue;

        DEBUGF(("inet_set_opts(%ld): s=%d returned %d\r\n",
                (long)desc->port, desc->s, res));
        if (type == SO_RCVBUF) {
            /* make sure we have desc->bufsz >= SO_RCVBUF */
            if (ival > desc->bufsz)
                desc->bufsz = ival;
        }
...
}

/* Allocate descriptor */
static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
{
...
    desc->bufsz = INET_DEF_BUFFER;
...
}

我们从源码看到在实现上inet:setopts的二点要素:
1. make sure we have desc->bufsz >= SO_RCVBUF
2. desc->bufsz min buffer size hint
3. 接收缓冲区默认长度 1460, 刚好是一个mtu长度
4. 最小的缓冲区大小为1
5. bufsz不继承

但是关系还是没搞明白。

好吧,通读inet_drv源码,我们可以看出gen_tcp接收包的流程:

1. 当socket上面有数据的时候,epoll会通知到port,最终导致tcp_inet_drv_input被调用。
2. tcp_inet_drv_input 发现如果连接已建立,就会调用tcp_recv来处理网络封包。
3. tcp_recv在调用sock_recv真正准备接收数据包前:
a. 如果发现接收缓冲区是空的话,会分配一个缓冲区。如果包大小已知,缓冲区大小就是包大小,否则的话为desc->bufsz。
b. 如果缓冲区非空,这时候看看是否已经收了一个以上完整的包,如果是就通过tcp_deliver往上层投递包,投递后如果缓冲区里面除了完整包以外,没有其他数据的话,就会调用tcp_clear_input把输入缓冲区释放掉。
c. 如果缓冲区非空,而且缓冲区的大小无法容纳包的话,就会调用tcp_expand_buffer来把缓冲区扩大到包大小。
4. 接收好网络封包后,根据packet类型进行进一步整理,投递给上层,同时释放接收缓存区。

/* clear CURRENT input buffer */
static void tcp_clear_input(tcp_descriptor* desc)
{
    if (desc->i_buf != NULL)
        free_buffer(desc->i_buf);
    desc->i_buf = NULL;
    desc->i_remain    = 0;
    desc->i_ptr       = NULL;
    desc->i_ptr_start = NULL;
    desc->i_bufsz     = 0;
}
/*                                                                                                                        
** Set new size on buffer, used when packet size is determined                                                            
** and the buffer is to small.                                                                                            
** buffer must have a size of at least len bytes (counting from ptr_start!)                                               
*/
static int tcp_expand_buffer(tcp_descriptor* desc, int len)
{
   ...
    if (desc->i_bufsz >= ulen) /* packet will fit */
        return 0;
    else if (desc->i_buf->orig_size >= ulen) { /* buffer is large enough */
        desc->i_bufsz = ulen;  /* set "virtual" size */
        return 0;
    }

    offs1 = desc->i_ptr_start - desc->i_buf->orig_bytes;
    offs2 = desc->i_ptr - desc->i_ptr_start;

    if ((bin = driver_realloc_binary(desc->i_buf, ulen)) == NULL)
        return -1;

    desc->i_buf = bin;
    desc->i_ptr_start = bin->orig_bytes + offs1;
    desc->i_ptr       = desc->i_ptr_start + offs2;
...
}
/*                                                                                                                        
** Deliver all packets ready                                                                                              
** if len == 0 then check start with a check for ready packet                                                             
*/
static int tcp_deliver(tcp_descriptor* desc, int len)
{
...
    while (len > 0) {
        int code;

        inet_input_count(INETP(desc), len);

        /* deliver binary? */
        if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
            code = tcp_reply_binary_data(desc, desc->i_buf,
                                         (desc->i_ptr_start -
                                          desc->i_buf->orig_bytes),
                                         len);
            if (code < 0)
                return code;

            /* something after? */
            if (desc->i_ptr_start + len == desc->i_ptr) { /* no */
                tcp_clear_input(desc);
            }
            else { /* move trail to beginning of a new buffer */
                ErlDrvBinary* bin = alloc_buffer(desc->i_bufsz);
                char* ptr_end = desc->i_ptr_start + len;
                int sz = desc->i_ptr - ptr_end;

                memcpy(bin->orig_bytes, ptr_end, sz);
                free_buffer(desc->i_buf);
                desc->i_buf = bin;
                desc->i_ptr_start = desc->i_buf->orig_bytes;
                desc->i_ptr = desc->i_ptr_start + sz;
                desc->i_remain = 0;
...
}
}

除了上面的逻辑外,这里需要强调几点:
1. 默认情况下gen_tcp建立的时候,接收缓冲区是空的。
2. 接收完整的包投递后,释放接收缓冲区。
3. 接收缓冲区大小由包的大小决定,如果包未知,由desc->bufsz决定。
4. INET_LOPT_BUFFER仅仅影响接收缓冲区,发送无需缓冲区,因为发送的时候,sendv可以直接发送队列里面的数据。
5. INET_LOPT_BUFFER只是给个缓冲区大小的hint, 而非强制。

分析到这里为止,我们可以把这三个缓冲区的概念搞清楚了。接下来就是如何用好这些缓冲区的实践了:

1. INET_LOPT_BUFFER由于指示的是inet_drv这个层面接收缓冲区的默认大小,所以这个缓冲区最好是比操作内核SO_RCVBUF指示的接收缓冲区要大。
2. INET_LOPT_BUFFER只是个hint, 在包大小未知的情况下,影响接收缓冲区的大小,而如果要接收的包大于接收缓冲区的时候,就要扩展缓冲区,通过realloc来实现的。所以通过统计包的平均大小,设置一个比较合理的hint, 减少expand缓冲区的发生。inets:getstat(Socket, [recv_avg]). 可以帮我们统计到平均包大小。

这里还需要指出个问题,通过前面的分析,我们知道接收缓冲区不停的分配,释放,这对内存分配器造成很大的压力。 所以inet_drv实现了一套小型的内存分配池。为了减少冲突,每个CPU一个分配池. 每个池维护最近使用的buffer, 达到最快分配到buffer的目的。

参看代码如下:

static ErlDrvBinary* alloc_buffer(ErlDrvSizeT minsz)
{
    InetDrvBufStk *bs = get_bufstk();
    if (bs && bs->buf.pos > 0) {
        long size;
        ErlDrvBinary* buf = bs->buf.stk[--bs->buf.pos];
        size = buf->orig_size;
        bs->buf.mem_size -= size;

        if (size >= minsz)
            return buf;

        driver_free_binary(buf);
    }

    return driver_alloc_binary(minsz);
}

static void release_buffer(ErlDrvBinary* buf)
{
...
    bs = get_bufstk();
    if (!bs
        || (bs->buf.mem_size + size > BUFFER_STACK_MAX_MEM_SIZE)
        || (bs->buf.pos >= BUFFER_STACK_SIZE)) {
    free_binary:
        driver_free_binary(buf);
    }
    else {
        bs->buf.mem_size += size;
        bs->buf.stk[bs->buf.pos++] = buf;
    }
...
}

有了高速的内存分配器,gen_tcp的接收缓冲区的管理的代价就不算太大。gen_tcp这样设计接收缓冲区的目的是为了能够在大量网络链接的情况下,尽可能的节约内存,典型的用时间换空间的设计。

小结: 源码是最好的答案,文档不是。
祝玩得开心!

Post Footer automatically generated by wp-posturl plugin for wordpress.

  1. piboyeliu
    June 22nd, 2013 at 10:52 | #1

    2. tcp_inet_drv_input 发现如果连接已建立,就会调用tcp_recv来处理网络封包。
    这是针对 {active, true} 模式吗?
    如果是 {active, false} 是不是要等用户调用 gen_tcp:recv 的时候才接收包呢?

Comments are closed.