Home > Erlang探索, 源码分析 > gen_tcp发送缓冲区以及水位线问题分析

gen_tcp发送缓冲区以及水位线问题分析

May 15th, 2013

原创文章,转载请注明: 转载自系统技术非业余研究

本文链接地址: gen_tcp发送缓冲区以及水位线问题分析

前段时间有同学在线上问了个问题:

服务器端我是这样设的:gen_tcp:listen(8000, [{active, false}, {recbuf,1}, {buffer,1}]).
客户端是这样设的:gen_tcp:connect(“localhost”, 8000, [{active, false}, {high_watermark,2}, {low_watermark,1}, {sndbuf,1}, {buffer,1}]).
我客户端每次gen_tcp:send()发送一个字节,前6个字节返回ok,第7个字节阻塞
服务端每次gen_tcp:recv(_,0)接收一个字节,接收三个字节后,客户端的第7次发送返回。
按我的理解的话:应该是 服务器端可以接收2个字节+sndbuf里的一个字节,第4个字节客户端就该阻塞的,可事实不时这样,求分析

这个问题确实还是比较复杂,涉及到gen_tcp的发送缓冲区和接收缓冲区,水位线等问题,其中接收缓冲区的问题在这篇 以及这篇 博文里面讲的比较清楚了,今天我们重点来分析下发送缓冲区和水位线的问题。

在开始分析前,我们需要熟悉几个gen_tcp的选项, 更多参见 这里

{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.

{high_msgq_watermark, Size} (TCP/IP sockets)
The socket message queue will be set into a busy state when the amount of data queued on the message queue reaches this limit. Note that this limit only concerns data that have not yet reached the ERTS internal socket implementation. Default value used is 8 kB.

Senders of data to the socket will be suspended if either the socket message queue is busy, or the socket itself is busy.

For more information see the low_msgq_watermark, high_watermark, and low_watermark options.

Note that distribution sockets will disable the use of high_msgq_watermark and low_msgq_watermark, and will instead use the distribution buffer busy limit which is a similar feature.

{high_watermark, Size} (TCP/IP sockets)
The socket will be set into a busy state when the amount of data queued internally by the ERTS socket implementation reaches this limit. Default value used is 8 kB.

Senders of data to the socket will be suspended if either the socket message queue is busy, or the socket itself is busy.

For more information see the low_watermark, high_msgq_watermark, and low_msqg_watermark options.
{low_msgq_watermark, Size} (TCP/IP sockets)
If the socket message queue is in a busy state, the socket message queue will be set in a not busy state when the amount of data queued in the message queue falls below this limit. Note that this limit only concerns data that have not yet reached the ERTS internal socket implementation. Default value used is 4 kB.

Senders that have been suspended due to either a busy message queue or a busy socket, will be resumed when neither the socket message queue, nor the socket are busy.

For more information see the high_msgq_watermark, high_watermark, and low_watermark options.

Note that distribution sockets will disable the use of high_msgq_watermark and low_msgq_watermark, and will instead use the distribution buffer busy limit which is a similar feature.

{low_watermark, Size} (TCP/IP sockets)
If the socket is in a busy state, the socket will be set in a not busy state when the amount of data queued internally by the ERTS socket implementation falls below this limit. Default value used is 4 kB.

Senders that have been suspended due to either a busy message queue or a busy socket, will be resumed when neither the socket message queue, nor the socket are busy.

For more information see the high_watermark, high_msgq_watermark, and low_msgq_watermark options.

这选项里面两对高低水位线的设置,以及delay_send选项,对发送缓冲区的影响很大。
gen_tcp:send的行为在之前的 博文 中分析的比较到位了,建议同学先看看这篇文章垫底下。

我们知道每个erlang的进程都有个消息队列,其他进程要和他通信就需要透过发消息给他,把通讯的内容在消息里面交代清楚。进程消息队列里面一旦有消息,erlang的VM就会马上准备调度该进程来让进程执行,处理消息。这个进程的消息队列机制每个erlang入门的书籍都写的非常清楚。 那么port呢?在Erlang的早期,Port是和进程一样的地位,接口,使用方式。Port作为Erlang对外的IO的执行单位,也拥有自己的消息队列,当进程把消息发送给port的时候,port通常也是把消息保存在消息队列中,然后VM就会调度这个port。等到port被调度执行的时候,port把队列里面的消息消耗掉,发送到网络或者执行相应IO的操作。port的调度和erlang的进程的调度是一样的,都非常讲究公平调度。

我们来考证下port和进程消息发送的接口。 我们知道!符号是erlang:send的语法糖,当我们给Port!msg 或者Pid!msg,最终都是调用erlang:send来发送消息。后面不知道为什么,erlang的设计者专门为port设计了port_command系列函数专门为port发送消息。

我们来考证下:
erlang:send->BIF_RETTYPE send_3(BIF_ALIST_3)->do_send 源码在bif.c中我们来看看:

Sint
do_send(Process *p, Eterm to, Eterm msg, int suspend, Eterm *refp) {
...
    if (is_internal_pid(to)) {
	...

    } else if (is_external_pid(to)) {
    ...
    return remote_send(p, dep, to, to, msg, suspend);
    } else if (is_atom(to)) {
    ...
    } else if (is_external_port(to)
               && (external_port_dist_entry(to)
                   == erts_this_dist_entry)) {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp,
                      "Discarding message %T from %T to %T in an old "
                      "incarnation (%d) of this node (%d)\n",
                      msg,
                      p->common.id,
                      to,
                      external_port_creation(to),
                      erts_this_node->creation);
        erts_send_error_to_logger(p->group_leader, dsbufp);
        return 0;
    } else if (is_internal_port(to)) {
    ...
        pt = erts_port_lookup(portid, ERTS_PORT_SFLGS_INVALID_LOOKUP);
        ...
            switch (erts_port_command(p, ps_flags, pt, msg, refp)) {
            case ERTS_PORT_OP_CALLER_EXIT:
...
}

诸位看到了吧! 1. erlang:send接受二种对象: port和process 2. 发送到port的消息走的和erts_port_command是一样的路。

喝口水,保存体力,重新温习下二点: 1. port有消息队列。 2. port也是公平调度。

有了上面的知识铺垫,我们其实就比较好明白上面选项中的水位线做什么的。和每个消息队列一样,为了防止发送者和接收者能力的失衡,通常都会设置高低水位线来保护队列不至于太大把
系统撑爆。 上面的{high_watermark, Size},{low_watermark, Size} 就是干这个用的。

那port是如何保护自己的呢?答案是:
当消息量达到高水位线的时候,port进入busy状态,这时候会把发送进程suspend起来,等消息达到低水位线的时候,解除busy状态,同时让发送进程继续执行。

证明上面的说法,参考下port_command 文档

port_command(Port, Data, OptionList) -> boolean()

Types:

Port = port() | atom()
Data = iodata()
Option = force | nosuspend
OptionList = [Option]
Sends data to a port. port_command(Port, Data, []) equals port_command(Port, Data).

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of

the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.
nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.

那如何知道一个port进入busy状态,因为这个状态通常很严重,发送进程被挂起,会引起很大的latency.

幸亏erlang考虑周到,参看这里

erlang:system_monitor(MonitorPid, Options) -> MonSettings

busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that

got suspended when sending to Port.

系统会很友好的把发生busy_port的进程发出来,我们就可以知道那个进程进程碰到高水位线被挂起了,方面我们后面调整水位线避免这种情况发生。

当用户调用gen_tcp:send要发送数据的时候最终都会调用port_command来具体执行, 那么我们来看下它是如何运作的:

/* Command should be of the form                                                                                          
**   {PID, close}                                                                                                         
**   {PID, {command, io-list}}                                                                                            
**   {PID, {connect, New_PID}}                                                                                            
*/
ErtsPortOpResult
erts_port_command(Process *c_p,
                  int flags,
                  Port *port,
                  Eterm command,
                  Eterm *refp)
{
...
   if (is_tuple_arity(command, 2)) {
        Eterm cntd;
        tp = tuple_val(command);
        cntd = tp[1];
        if (is_internal_pid(cntd)) {
            if (tp[2] == am_close) {
                if (!erts_port_synchronous_ops)
                    refp = NULL;
                flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
                return erts_port_exit(c_p, flags, port, cntd, am_normal, refp);
            } else if (is_tuple_arity(tp[2], 2)) {
                tp = tuple_val(tp[2]);
                if (tp[1] == am_command) {
                    if (!(flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
                        && !erts_port_synchronous_ops)
     	            refp = NULL;
                    return erts_port_output(c_p, flags, port, cntd, tp[2], refp);
                }
                else if (tp[1] == am_connect) {
                    if (!erts_port_synchronous_ops)
                        refp = NULL;
                    flags &= ~ERTS_PORT_SIG_FLG_NOSUSPEND;
                    return erts_port_connect(c_p, flags, port, cntd, tp[2], refp);
                }
            }
        }
    }
}
...
}

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
...
    try_call = (force_immediate_call /* crash dumping */
                || !(sched_flags & (invalid_flags
                                    | ERTS_PTS_FLGS_FORCE_SCHEDULE_OP)));

    if (drv->outputv) {
          try_call_state.pre_chk_sched_flags = 0; /* already checked */
            if (force_immediate_call)
                try_call_res = force_imm_drv_call(&try_call_state);
            else
                try_call_res = try_imm_drv_call(&try_call_state);
            switch (try_call_res) {
            case ERTS_TRY_IMM_DRV_CALL_OK:
                call_driver_outputv(flags & ERTS_PORT_SIG_FLG_BANG_OP,
                                    c_p ? c_p->common.id : ERTS_INVALID_PID,
                                    from,
                                    prt,
                                    drv,
                                    evp);
                if (force_immediate_call)
                    finalize_force_imm_drv_call(&try_call_state);
                else
                    finalize_imm_drv_call(&try_call_state);
                /* Fall through... */
            ...
            case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
                sched_flags = try_call_state.sched_flags;
            case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
                /* Schedule outputv() call instead... */
                break;
...
}

static ERTS_INLINE void
call_driver_outputv(int bang_op,
                    Eterm caller,
                    Eterm from,
                    Port *prt,
                    erts_driver_t *drv,
                    ErlIOVec *evp)
{
    /*                                                                                                                    
     * if (bang_op)                                                                                                       
     *   we are part of a "Prt ! {From, {command, Data}}" operation                                                       
     * else                                                                                                               
     *   we are part of a call to port_command/[2,3]                                                                      
     * behave accordingly...                                                                                              
     */
    if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt))
        send_badsig(prt);
    else {
...
        prt->caller = caller;
        (*drv->outputv)((ErlDrvData) prt->drv_data, evp);
        prt->caller = NIL;

        prt->bytes_out += size;
        erts_smp_atomic_add_nob(&erts_bytes_out, size);
    }
...
}

从源码分析来看,我们看到port_command如果看到port要执行command命令就会调用erts_port_output, 而后者会做复杂的判断,来决定如何调用call_driver_outputv。
这个复杂的流程就是msgq_watermark水位线发挥作用地方,我们暂时不分析,等后面讲msgq_watermark的时候一起。

目前只需要知道最终gen_tcp:send发松数据会调用port driver的outputv回调函数输出就好了。
接着源码分析:

static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
    tcp_inet_drv_input,
    tcp_inet_drv_output,
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
...
}
static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
{
    tcp_descriptor* desc = (tcp_descriptor*)e;
    desc->inet.caller = driver_caller(desc->inet.port);

    DEBUGF(("tcp_inet_commanv(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (!IS_CONNECTED(INETP(desc))) {
        if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
            desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
            inet_reply_error_am(INETP(desc), am_closed);
        }
        else
            inet_reply_error(INETP(desc), ENOTCONN);
    }
    else if (tcp_sendv(desc, ev) == 0)
        inet_reply_ok(INETP(desc));
    DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port));
}

对于inet_drv(gen_tcp)的例子来讲就是会调用tcp_sendv来把消息转变成网络封包发送出去。

好吧,喝口水,休息下。 这里我们梳理下我们的数据路线:
gen_tcp:send->port_command->erts_port_output->call_driver_outputv->tcp_inet_commandv->tcp_sendv
大家要牢记在心。

继续接着我们参照源码来分析下水位线的实现:

/* inet_drv.c */
#define INET_LOPT_TCP_HIWTRMRK     27  /* set local high watermark */
#define INET_LOPT_TCP_LOWTRMRK     28  /* set local low watermark */


#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_WATERMARK  (1024*4) /* 4k pending => allow more */

typedef struct {
...
    int   high;                 /* high watermark */
    int   low;                  /* low watermark */
...
} tcp_descriptor;

static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
...
    desc->high = INET_HIGH_WATERMARK;
    desc->low  = INET_LOW_WATERMARK;
...
}

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_TCP_HIWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->low > ival)
                    tdesc->low = ival;
                tdesc->high = ival;
            }
            continue;

        case INET_LOPT_TCP_LOWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->high < ival)
                    tdesc->high = ival;
                tdesc->low = ival;
            }
            continue;
...
}

/* Copy a descriptor, by creating a new port with same settings                                                           
 * as the descriptor desc.                                                                                                
 * return NULL on error (SYSTEM_LIMIT no ports avail)                                                                     
 */
static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
                                     ErlDrvTermData owner, int* err)
{
...
    copy_desc->high          = desc->high;
    copy_desc->low           = desc->low;
...
}

static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
...
if ((sz = driver_sizeq(ix)) > 0) {
        driver_enqv(ix, ev, 0);
        if (sz+ev->size >= desc->high) {
            DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
                    (long)desc->inet.port, desc->inet.s));
            desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
            desc->inet.busy_caller = desc->inet.caller;
            set_busy_port(desc->inet.port, 1);
            if (desc->send_timeout != INET_INFINITY) {
                desc->busy_on_send = 1;
                driver_set_timer(desc->inet.port, desc->send_timeout);
            }
            return 1;
        }
...
}

/* socket ready for ouput:                                                                                                
** 1. INET_STATE_CONNECTING => non block connect ?                                                                        
** 2. INET_STATE_CONNECTED  => write output                                                                               
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
  	if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
...
}

从源码我们可以分析出几点:
1. 水位线设置是可以继承的。
2. 高低水位线默认是8K/4K.
3. 进入高水位后,port进入busy状态。
4. 当消息消耗到小于低水位线,busy解除。

这个水位线的说明和文档解释的一样,接下来我们稍微看看delay_send的实现原理,还是继续上源码:

/* TCP additional flags */
#define TCP_ADDF_DELAY_SEND    1

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
case INET_LOPT_TCP_DELAY_SEND:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival)
                    tdesc->tcp_add_flags |= TCP_ADDF_DELAY_SEND;
                else
                    tdesc->tcp_add_flags &= ~TCP_ADDF_DELAY_SEND;
            }
            continue;
...
}

/*                                                                                                                        
** Send non-blocking vector data                                                                                          
*/
static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
...
        if (INETP(desc)->is_ignored) {
            INETP(desc)->is_ignored |= INET_IGNORE_WRITE;
            n = 0;
        } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) {
            n = 0;
        } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov,
                                              vsize, &n, 0))) {
            if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                int err = sock_errno();
                DEBUGF(("tcp_sendv(%ld): s=%d, "
                        "sock_sendv(size=2) errno = %d\r\n",
                        (long)desc->inet.port, desc->inet.s, err));
                return tcp_send_error(desc, err);
            }
     	    n = 0;
        }
       else {
            DEBUGF(("tcp_sendv(%ld): s=%d, only sent "
                    LLU"/%d of "LLU"/%d bytes/items\r\n",
                    (long)desc->inet.port, desc->inet.s,
                    (llu_t)n, vsize, (llu_t)ev->size, ev->vsize));
        }

        DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n",
                (long)desc->inet.port, desc->inet.s));
        driver_enqv(ix, ev, n);
        if (!INETP(desc)->is_ignored)
            sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
...
}
static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
{
    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
}
/* socket ready for ouput:                                                                                                
** 1. INET_STATE_CONNECTING => non block connect ?                                                                        
** 2. INET_STATE_CONNECTED  => write output                                                                               
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
 else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            ssize_t n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, iov, vsize, &n, 0))) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret =  tcp_send_error(desc, sock_errno());
                    goto done;
                }
            goto done;
            }
            if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
             inet_reply_ok(INETP(desc));
                }
            }
...
}

从源码分析我们可以知道当tcp_sendv发送数据前看下:
1. delay_send标志是否设置,如果设置就不尝试调用sock_sendv发送。
2. 调用sock_sendv发送网络数据,剩下的部分数据保存到驱动的队列去。
3. 如果队列有数据的话,就把把epoll的写事件挂载上。
4. 后续epoll会通知socket可写的时候,会调用tcp_inet_drv_output
5. tcp_inet_drv_output->tcp_inet_output 继续把之前在队列里面的数据透过sock_sendv再次发送到网络

步骤3和4之间需要时间,依赖于epoll的写事件发生的以及port调度的时间点。

所以简单的说: delay_send就是在第一阶段不尝试发送数据,直接把数据推入port的消息队列去,等后面epoll说socket可写的时候一起发送出去。
这种做法的好处是gen_tcp:send马上就可以返回,因为sock_send通常要耗费几十us的时间,可用在对发送的latency很敏感的场合。

到这里为止,我们清楚的分析了数据是如何在port的各个链条里面流动.

再回顾下:当gen_tcp:send数据无法离开通过网络发送出去的时候,会暂时保留在port的消息队列里面,当消息队列满(到高水位线)的时候,port就会busy,抑制发送者推送更多的数据。当epoll探测到socket可写的时候,vm会调用tcp_inet_output把消息队列里面的数据,拉到网络去,这个过程中,队列里面的数据会越来越少,少到低水位线的时候,解除busy, 好让发送者发送更多的数据。

再喝口水,我们接着分析msgq_watermark. 这又是很大的一个坑,大家要坚持住,好像又是一顿分析!
首先,还是普及下知识:
从R16B的发布note里面我们摘抄和port相关的重大变化就是:

— Latency of signals sent from processes to ports — Signals
from processes to ports where previously always delivered
immediately. This kept latency for such communication to a
minimum, but it could cause lock contention which was very
expensive for the system as a whole. In order to keep this
latency low also in the future, most signals from processes
to ports are by default still delivered immediately as long
as no conflicts occur. Such conflicts include not being able
to acquire the port lock, but also include other conflicts.
When a conflict occur, the signal will be scheduled for
delivery at a later time. A scheduled signal delivery may
cause a higher latency for this specific communication, but
improves the overall performance of the system since it
reduce lock contention between schedulers. The default
behavior of only scheduling delivery of these signals on
conflict can be changed by passing the +spp command line flag
to erl(1). The behavior can also be changed on port basis
using the parallelism option of the open_port/2 BIF.

简单的说,过去进程给port发送数据的时候,都是立即锁定port, 调用call_driver_outputv来消耗数据,干完活解锁。这样对于单个请求来讲,latency最低。 但是如果系统有多个进程给同一个port发送数据,锁的碰撞率就会很高,势必影响到port的吞吐量。所以新的VM引入了port的parallelism这个概念,也就是说当锁冲突的时候,不是在那里傻傻的等,而是把要发送的数据引用计数后,通知port调度器在合适的时间,也就是说在
port不忙的时候,择机call_driver_outputv来消耗数据,这样就会大大提高吞吐量。

考证下代码:

int erts_port_parallelism = 0;

void
erl_start(int argc, char **argv)
{
...
else if (has_prefix("pp", sub_param)) {
                arg = get_arg(sub_param+2, argv[i+1], &i);
                if (sys_strcmp(arg, "true") == 0)
                    erts_port_parallelism = 1;
                else if (sys_strcmp(arg, "false") == 0)
                    erts_port_parallelism = 0;

...
}

static Port *
open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
{
... 
opts.parallelism = erts_port_parallelism;
...
  } else if (option == am_parallelism) {
                    if (*tp == am_true)
                        opts.parallelism = 1;
                    else if (*tp == am_false)
                        opts.parallelism = 0;
                    else
...
}

分析了源码可以知道:
1. port并行发送的行为为了和过去的版本兼容默认是关闭的,但是可以用+spp全局打开
2. 在open_port的时候通过参数{parallelism, true}来个别打开这个选项。

{parallelism, Boolean}
Set scheduler hint for port parallelism. If set to true, the VM will schedule port tasks when it by this can improve the parallelism in the system. If set to false, the VM will try to perform port tasks immediately and by this improving the latency at the expense of parallelism. The default can be set on system startup by passing the +spp command line argument to erl(1).

那开了这个选项后,对gen_tcp(port)有什么影响呢? 最明显的区别是过去call_driver_outputv是排队执行的,谁先拿到锁,谁先执行。那么在driver_outputv里面如果消息消耗不了,有可能会把数据加到port的消息队列去。 这个我们前面分析过,每个消息队列有高低水位线来控制,总能保证消息在一定的量。但是parallelism了后,当port在忙着做call_driver_outputv的时候,其他进程就不等了,直接把消息加引用计数保存到一个地方去,然后请求port调度器稍后调度执行这个消息,它就立即返回了。

各位看官看出来问题了吗? 如果不做控制的话,每个进程都会积累很多消息,都等着port调度器后续执行。所以port调度器就有义务来为这部分消息做水位线的控制,这就很自然的引入了msgq_watermark选项。

是不是有点复杂? 更复杂的还有呢? 引入msgq_watermark项后,那这些发送进程就可能被挂起,那如何唤醒它呢? 我们先回答这个问题, 上代码:

%%erlang.erl
port_command(Port, Data) ->
    case case erts_internal:port_command(Port, Data, []) of
             Ref when erlang:is_reference(Ref) -> receive {Ref, Res} -> Res end;
             Res -> Res
         end of
        true -> true;
        Error -> erlang:error(Error, [Port, Data])
    end.
/*                                                                                                                        
 * erts_internal:port_command/3 is used by the                                                                            
 * erlang:port_command/2 and erlang:port_command/3                                                                        
 * BIFs.                                                                                                                  
 */

BIF_RETTYPE erts_internal_port_command_3(BIF_ALIST_3)
{
...
 prt = lookup_port(BIF_P, BIF_ARG_1);
    if (!prt)
        BIF_RET(am_badarg); 
...

 switch (erts_port_output(BIF_P, flags, prt, prt->common.id, BIF_ARG_2, &ref)) {
    case ERTS_PORT_OP_CALLER_EXIT:
    case ERTS_PORT_OP_BADARG:
    case ERTS_PORT_OP_DROPPED:            
        ERTS_BIF_PREP_RET(res, am_badarg);
        break;
    case ERTS_PORT_OP_BUSY:
        ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
        if (flags & ERTS_PORT_SIG_FLG_NOSUSPEND)
            ERTS_BIF_PREP_RET(res, am_false);
        else {
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, prt);
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_erts_internal_port_command_3],
                                 BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
        break;
    case ERTS_PORT_OP_BUSY_SCHEDULED:
        ASSERT(!(flags & ERTS_PORT_SIG_FLG_FORCE));
        /* Fall through... */
    case ERTS_PORT_OP_SCHEDULED:
        ASSERT(is_internal_ref(ref));
        ERTS_BIF_PREP_RET(res, ref);
        break;
}

调用流程是:erlang:port_command->erts_internal:port_command->erts_internal_port_command_3
从erts_internal_port_command_3代码可以看出来,当我们调用发送数据的时候,遇到ERTS_PORT_OP_BUSY时候,系统会被挂起,等被唤醒的时候,发生trap,erlang vm会在合适的时间再重新调用erts_internal_port_command_3完成之前未完成的事情。

温习下:从port_command这层来讲,parallelism执行的时候,如果底层的port出现busy, 调用进程会经历挂起,唤醒,重新调用这套流程。
现在的问题是什么时候会出现ERTS_PORT_OP_BUSY?

从前面的分析,我们知道port_command->erts_port_output, 那么我们来看下什么情况下会返回ERTS_PORT_OP_BUSY:

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
...
    /*                                                                                                                    
     * Assumes caller have checked that port is valid...                                                                  
     */

    sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
    if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))
        return ((sched_flags & ERTS_PTS_FLG_EXIT)
                ? ERTS_PORT_OP_DROPPED
                : ERTS_PORT_OP_BUSY);
...
}
void
set_busy_port(ErlDrvPort dprt, int on)
{
...
    if (on) {
        flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags,
                                                ERTS_PTS_FLG_BUSY_PORT);
        if (flags & ERTS_PTS_FLG_BUSY_PORT)
            return; /* Already busy */
...
}

代码很清楚的说明了,一旦到达高水位线调用set_busy_port就会导致后续的erts_port_output调用直接返回ERTS_PORT_OP_BUSY, 简单干脆。

再喝口水,接着分析, 现在到了代码验证的阶段了。
重新温习下前面我们分析过的:
在parallelism模式下,当port在忙着做call_driver_outputv的时候,其他进程就不等了,直接把消息加引用计数保存到一个地方去,然后请求port调度器稍后调度执行这个消息,它就立即返回了。

ErtsPortOpResult
erts_port_output(Process *c_p,
                 int flags,
                 Port *prt,
                 Eterm from,
                 Eterm list,
                 Eterm *refp)
{
...
if (drv->outputv) {
...


}
...
        if (!try_call) {
            int i;
            /* Need to increase refc on all binaries */
            for (i = 1; i < evp->vsize; i++)
                if (bvp[i])
                    driver_binary_inc_refc(bvp[i]);
        }
        else {
            int i;
            ErlIOVec *new_evp;
            ErtsTryImmDrvCallResult try_call_res;
            ErtsTryImmDrvCallState try_call_state
                = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(
                    c_p,
                    prt,
                    ERTS_PORT_SFLGS_INVALID_LOOKUP,
                    invalid_flags,
                    !refp,
                    am_command);


            try_call_state.pre_chk_sched_flags = 0; /* already checked */
            if (force_immediate_call)
                try_call_res = force_imm_drv_call(&try_call_state);
            else
                try_call_res = try_imm_drv_call(&try_call_state);
            switch (try_call_res) {
            case ERTS_TRY_IMM_DRV_CALL_OK:
                call_driver_outputv(flags & ERTS_PORT_SIG_FLG_BANG_OP,
                                    c_p ? c_p->common.id : ERTS_INVALID_PID,
                                    from,
                                    prt,
                                    drv,
                                    evp);
                if (force_immediate_call)
                    finalize_force_imm_drv_call(&try_call_state);
                else
                    finalize_imm_drv_call(&try_call_state);
     /* Fall through... */
            case ERTS_TRY_IMM_DRV_CALL_INVALID_PORT:
.....
            case ERTS_TRY_IMM_DRV_CALL_INVALID_SCHED_FLAGS:
                sched_flags = try_call_state.sched_flags;
            case ERTS_TRY_IMM_DRV_CALL_BUSY_LOCK:
                /* Schedule outputv() call instead... */
                break;
            }

        sigdp = erts_port_task_alloc_p2p_sig_data();
        sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
        sigdp->u.outputv.from = from;
        sigdp->u.outputv.evp = evp;
        sigdp->u.outputv.cbinp = cbin;
        port_sig_callback = port_sig_outputv;
    }
    res = erts_schedule_proc2port_signal(c_p,
                                         prt,
                                         c_p ? c_p->common.id : ERTS_INVALID_PID,
                                         refp,
                                         sigdp,
                                         task_flags,
                                         port_sig_callback);

    if (res != ERTS_PORT_OP_SCHEDULED) {
        if (drv->outputv)
            cleanup_scheduled_outputv(evp, cbin);
        else
            cleanup_scheduled_output(buf);
        return res;
    }
}
static int
port_sig_outputv(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
{
    Eterm reply;

    switch (op) {
       case ERTS_PROC2PORT_SIG_EXEC:
        /* Execution of a scheduled outputv() call */

        ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

        if (state & ERTS_PORT_SFLGS_INVALID_LOOKUP)
            reply = am_badarg;
        else {
            call_driver_outputv(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP,
                                sigdp->caller,
                                sigdp->u.outputv.from,
                                prt,
                                prt->drv_ptr,
                                sigdp->u.outputv.evp);
            reply = am_true;
        }
        break;
...
}

ErtsPortOpResult
erts_schedule_proc2port_signal(Process *c_p,
                               Port *prt,
                               Eterm caller,
                               Eterm *refp,
                               ErtsProc2PortSigData *sigdp,
                               int task_flags,
                               ErtsProc2PortSigCallback callback)
{
...
 /* Schedule port close call for later execution... */
    sched_res = erts_port_task_schedule(prt->common.id,
                                        NULL,
                                        ERTS_PORT_TASK_PROC_SIG,
                                        sigdp,
                                        callback,
                                        task_flags);

...
}
/*                                                                                                                        
 * Schedule a task.                                                                                                       
 */
int
erts_port_task_schedule(Eterm id,
                        ErtsPortTaskHandle *pthp,
                        ErtsPortTaskType type,
                        ...)
{
...
    if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
        reset_handle(ptp);
        if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
            goto abort_nosuspend;
        else
            goto fail;
    }
...
}

static ERTS_INLINE int
enqueue_task(Port *pp,
             ErtsPortTask *ptp,
             ErtsProc2PortSigData *sigdp,
             ErtsPortTaskHandleList *ns_pthlp,
             erts_aint32_t *flagsp)

{
...
        pp->sched.taskq.in.last = ptp;
        flags = enqueue_proc2port_data(pp, sigdp, flags);
        res = 1;
...
}

从上面的代码我们知道,当port不忙的时候,就会直接调用call_driver_outputv干活,否则就会把消息延迟放到port调度去。这个环节的调用流程是
erts_port_output->erts_schedule_proc2port_signal->erts_port_task_schedule->enqueue_task->enqueue_proc2port_data.

enqueue_proc2port_data这个函数顾名思义就是把进程发送到port的数据保存到port_task的某个队列去。那么在这个地方做水位线检查是最合适的。
好吧,我们的msg_watermark相关的东西要出场了。

/* erl_driver.h */
#define ERL_DRV_BUSY_MSGQ_LIM_MAX       (ERL_DRV_BUSY_MSGQ_DISABLED - 1)
#define ERL_DRV_BUSY_MSGQ_LIM_MIN       ((ErlDrvSizeT) 1)

/* inet_drv.c */
#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_MSGQ_WATERMARK  (1024*4) /* 4k pending => allow more */

static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
...
    q_high = INET_HIGH_MSGQ_WATERMARK;
    q_low = INET_LOW_MSGQ_WATERMARK;
...
    if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
        q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN;
    else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
        q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX;
    erl_drv_busy_msgq_limits(port, &q_low, &q_high);
...
}

/* erl_port_task.c */
/*                                                                                                                        
 * erl_drv_busy_msgq_limits() is called by drivers either reading or                                                      
 * writing the limits.                                                                                                    
 *                                                                                                                        
 * A limit of zero is interpreted as a read only request (using a                                                         
 * limit of zero would not be useful). Other values are interpreted                                                       
 * as a write-read request.                                                                                               
 */
void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
...
    Port *pp = erts_drvport2port(dport, NULL);
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
...
 if (!low)
            low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
        else {
            if (bpq->high < low)
                bpq->high = low;
            erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
            written = 1;
        }

        if (!high)
            high = bpq->high;
        else {
            if (low > high) {
                low = high;
                erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
            }
            bpq->high = high;
            written = 1;
        }

...
}

从源码分析来看:
1. MSGQ高低水位线也是8/4K,最小值是1, 高不封顶。
2. 它影响的是每个port调度器认为队列的bpq->low和bpq->high

有了这个知识,我们就很容易分析enqueue_proc2port_data是如何在数据入队列时限定高低水位线的, 同时一定会有个配套的dequeued_proc2port_data在出数据队列的时候解除busy状态的。

继续代码之旅:

static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
                       ErtsProc2PortSigData *sigdp,
                       erts_aint32_t flags)
{
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
...
            if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
                flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
                                                        ERTS_PTS_FLG_BUSY_PORT_Q);
                flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
                qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
                if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
                    flags = (erts_smp_atomic32_read_bor_relb(
                                 &pp->sched.flags,
                                 ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
                    flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
                }
            }
...
}

static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
        return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
                                                      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz);
    flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
        return;
    if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
        check_unset_busy_port_q(pp, flags, bpq);
}

/*                                                                                                                        
 * Busy port queue management                                                                                             
 */
static erts_aint32_t
check_unset_busy_port_q(Port *pp,
                        erts_aint32_t flags,
                        ErtsPortTaskBusyPortQ *bpq)
{
    ErlDrvSizeT qsize, low;
    int resume_procs = 0;

    ASSERT(bpq);
    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_sched_lock(&pp->sched);
    qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
    low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
    if (qsize < low) {
        erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
                               | ERTS_PTS_FLG_BUSY_PORT_Q);
        flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
        if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
            resume_procs = 1;
    }
    else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
        flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
                                                 ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
        flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
    }
    erts_port_task_sched_unlock(&pp->sched);
    if (resume_procs)
        erts_port_resume_procs(pp);
    return flags;
}

代码很明白的说:
任务入队列的时候超过水位线会设置ERTS_PTS_FLG_BUSY_PORT_Q状态,出队列的时候发现低于水位线的时候,会调用check_unset_busy_port_q来唤醒被挂起的发送进程。

到现在任务入队列的流程我们基本明白了,但是何时会调用dequeued_proc2port_data出队列呢?
答案是erts_port_task_execute。还记得前面分析的时候说:这些任务会入队列,port调度器会在适当的时候执行这些任务。
继续看代码:

int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
...

    pp = pop_port(runq);
    if (!pp) {
        res = 0;
        goto done;
    }
...
while (1) {
        erts_aint32_t task_state;
        ErtsPortTask *ptp;

        ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
        if (!ptp)
            break;
      ...
       switch (ptp->type) {
        case ERTS_PORT_TASK_PROC_SIG: {
            ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
            ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            if (!pp->sched.taskq.bpq)
                reds += ptp->u.alive.td.psig.callback(pp,
                                                      state,
                                                      ERTS_PROC2PORT_SIG_EXEC,
                                                      sigdp);
            else {
                ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
                reds += ptp->u.alive.td.psig.callback(pp,
                                                      state,
                                                      ERTS_PROC2PORT_SIG_EXEC,
                                                      sigdp);
                dequeued_proc2port_data(pp, size);
            }
            break;
        }
...
}

任务调度器发现发现是ERTS_PORT_TASK_PROC_SIG类型的任务,就会调用该任务的callback, 然后把该任务出队列。 还记得那个任务callback叫什么吗? 好记性,就是port_sig_outputv。 他会真正执行把数据拖到网络的具体事情。

山路十八弯,好不容易才把整个流程给串起来,是不是很复杂!确实很复杂, 前段时间感冒的时候,躺在床上想了好几天,才把事情的来龙去脉差不多搞清楚,做高性能服务器的不容易呀,来点掌声,为我们自己鼓励下!

总结:这个水位线官方文档写的不清不楚,还是源码靠谱!watermark 和 msgq_watermark的本质差别就是一个是控制port消息队列的水位线,一个控制进程在给并行port发送数据时,条件暂时无法满足时候,数据暂缓到port调度器时候,port调度器队列的水位线,也可以理解为inflight的数据的水位线。

祝玩的开心!

Post Footer automatically generated by wp-posturl plugin for wordpress.

  1. May 17th, 2013 at 14:26 | #1

    最近在写一个替换现有Erlang分布式通信carrier的port driver,看到这种分析感到非常爽

    Yu Feng Reply:

    多谢支持!自己做carrier, 安全方面考虑吗?

  2. piboyeliu
    June 22nd, 2013 at 11:19 | #2

    我以前做的一个长连接comet系统, 发现个奇怪的问题。 就是如果第一次就回, Http 200 ok 的话, 系统的内存就会多个4k. 如果等到有消息在发头, 就不会。
    一直不明白是怎么回事, 一开始以为是系统发送缓存区导致的, 后来发现发送缓存区是会被回收的。

    Yu Feng Reply:

    erlang有发送缓冲区吗?

    piboyeliu Reply:

    我不是说的erlang. 我表述有问题了。
    我遇到的问题, 是linux 系统 为每个socket 的发送缓冲区设置了4K, 而且如果发送过一次数据后
    就不能被回收。

    我在8G的机器上测试。
    75W的连接, erlang 程序只用了2.3G内存, 但 free -m 看到 整个系统用了7G内存。
    剩余800多M.
    如果压倒100W, 整个系统就崩溃了。 erlang 本来要3.5G的, 系统只给了1.5G,
    导致 disck swap. free -m 发现系统只有几M 内存可用了。

    但如果我server 在接到连接的时候, 不回包的话, 100W连接, free -m 看只用了 5G多的内存
    还有2G剩余。

    这两个案例, 唯一的差别, 就是server 有没有调用 send 回包。
    因为我的server 是消息推送, 所以回包的斌率是很低的, 低于1W/s .

    我可以肯定的是, 发送缓存区是被操作系统循环利用的。 但发送缓冲区似乎就是没有。

    piboyeliu Reply:

    写错了, 是内核的tcp 接收缓存区是会被循环利用的, 但发送缓存区好像不会。

  3. piboyeliu
    July 13th, 2013 at 13:52 | #3

    找到原因来, 内核代码中, net/ipv4/tcp.c 的 tcp_send_msg 函数可以找原因,TCP_PAGE 宏,就用的 sk->sk_sndmsg_page 成员, sk_sndmsg_page 就是发送 skbuf 需要发送数据的cache 页, sk_sndmsg_page 为了引用这个页, 调用了get_page, 导致后面 skbuf回收的时候不会释放这个页。 可以考虑修改这个代码来节约内存。

Comments are closed.