Home > Erlang探索 > gen_tcp发送进程被挂起起因分析及对策

gen_tcp发送进程被挂起起因分析及对策

原创文章,转载请注明: 转载自系统技术非业余研究

本文链接地址: gen_tcp发送进程被挂起起因分析及对策

最近有同学在gmail上问关于gen_tcp发送进程被挂起的问题,问题描述的非常好,见底下:

第一个问题是关于port_command和gen_tcp:send的。从项目上线至今,我在tcp发送的地方遇到过两次问题,都跟port_command有关系。

起初程序的性能不好,我从各方面尝试分析和优化,还有部分是靠猜测,当初把全服广播消息的地方,换成了port_command,当时参考了hotwheels的代码和您的一遍相关博文。

根据您的分析,port_command应该比直接用gen_tcp:send高效的,并且没有阻塞。但是我却在这个地方遇到了阻塞,具体表现如下(两次,分别出现在项目不同阶段,下面分别描述)

项目上线初期:

当时玩家进程给玩家发消息用的是gen_tcp:send,广播进程为了高效率用了port_command。当活跃玩家到了一定数量以后,玩家无法进入游戏,分析原因,是全局发送广播消息的进程堵住了,从message_queue_len可以看出来,改为广播进程给玩家进程发消息再让玩家进程给玩家自己发消息后,状况排除。

最近一段时间:

这时候玩家进程的tcp发送数据,已经被我替换成了port_command并运行了一段时间都没问题。但是一些流量比较大的游戏服,活跃玩家到了一定数量以后,消息延迟很大(5-6秒),做任何操作都卡,在出现状况期间,服务器CPU、内存、负载各项指标并未异常,ssh连到服务器操作也很正常,没有任何卡顿现象。同服务器的其它游戏服也都正常,但是出问题的游戏服的整个erlang节点都进入一个“很卡”的状态,体现在我进入erlang shell中进行操作时,输入文字延迟很大。

起初我没怀疑过port_command有问题,所以我到处找原因和“优化”代码,这个优化是加了引号的。

但是最后,在一次服务器同样出现状况很卡的时候,我把tcp发送数据的代码改回了gen_tcp:send,并热更新了相关模块,服务器立即恢复正常。

我一直对上面的情况百思不得其解,我之前写的代码如下:

tcp_send (Socket, Bin) ->
try erlang:port_command(Socket, Bin, [force, nosuspend]) of
false ->
exit({game_tcp_send_error, busy});
true ->
true
catch
error : Error ->
exit({game_tcp_send_error, {error, einval, Error}})
end.

希望您能帮忙分析下是什么原因导致整个erlang节点都卡的,我想这对其他的erlang程序员也会有帮助!

关于这个问题我之前写了篇文章,系统的介绍了gen_tcp的行为,gen_tcp:send的深度解刨和使用指南(初稿)见 这里

gen_tcp.erl:L235
send(S, Packet) when is_port(S) ->
case inet_db:lookup_socket(S) of
{ok, Mod} ->
Mod:send(S, Packet);
Error ->
Error
end.

我们就这个问题再深入的分析下,首先看gen_tcp:send的代码:

%% inet_tcp.erl:L50
%%                                                                                                                          
%% Send data on a socket                                                                                                    
%%                                                                                                                          
send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts).
send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

%%prim_inet.erl:L349
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
%%                                                                                                                          
%% SEND(insock(), Data) -> ok | {error, Reason}                                                                              
%%                                                                                                                          
%% send Data on the socket (io-list)                                                                                        
%%                                                                                                                          
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
%% This is a generic "port_command" interface used by TCP, UDP, SCTP, depending                                             
%% on the driver it is mapped to, and the "Data". It actually sends out data,--                                             
%% NOT delegating this task to any back-end.  For SCTP, this function MUST NOT                                              
%% be called directly -- use "sendmsg" instead:                                                                             
%%                                                                                                                          
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of
        false -> % Port busy and nosuspend option passed                                                                    
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true ->
            receive
                {inet_reply,S,Status} ->
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
             {error,einval}
    end.

我们可以看到gen_tcp:send分为二个步骤 1. port_command提交数据 2. 等待{inet_reply,S,Status}回应。这是一个典型的阻塞操作,在等待的时候,进程被调出。
所以如果系统中有大量的tcp链接要发送数据,这种方式有点低效。 所以很多系统把这个动作改成集中提交数据,集中等待回应。

典型的例子见rabbitmq:

%%rabbit_writer.erl
...
handle_message({inet_reply, _, ok}, State) ->
    State;
handle_message({inet_reply, _, Status}, _State) ->
    exit({writer, send_failed, Status});
handle_message(shutdown, _State) ->
    exit(normal);
...
internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
    true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
                                              Content, FrameMax)),
    ok.

port_cmd(Sock, Data) ->
    try rabbit_net:port_command(Sock, Data)
    catch error:Error -> exit({writer, send_failed, Error})
    end.

它的做法是用一个进程集中来发送数据,集中接收回应。在正常情况下,这种处理会大大提高进程切换的开销,减少等待时间。但是也会带来问题,我们看到port_command这个操作如果出现意外,被阻塞了,那么这个系统的消息发送会被卡死。而之前由每个处理进程去gen_tcp:send只会阻塞个别进程。

我们仔细看下port_command的文档

port_command(Port, Data, OptionList) -> true|false

Types:

Port = port() | atom()
Data = iodata()
OptionList = [Option]
Option = force
Option = nosuspend
Sends data to a port. port_command(Port, Data, []) equals port_command(Port, Data).

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.
nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.
Note
More options may be added in the future.

Failures:

badarg
If Port is not an open port or the registered name of an open port.
badarg
If Data is not a valid io list.
badarg
If OptionList is not a valid option list.
notsup
If the force option has been passed, but the driver of the port does not allow forcing through a busy port.

调用port_command是可能引起经常被suspend的,什么条件呢? 出于性能的考虑, inet会在gen_tcp驱动port中起用一个发送缓存区,当我们的数据超过了缓冲区的高水位线默认情况就会被挂起。

那什么是发送缓冲区高低水位线呢?我们看代码:

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              
%%                                                                                                                          
%% SETOPT(insock(), Opt, Value) -> ok | {error, Reason}                                                                     
%% SETOPTS(insock(), [{Opt,Value}]) -> ok | {error, Reason}                                                                 
%%                                                                                                                          
%% set socket, ip and driver option                                                                                         
%%                                                                                                                          
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%                                              

setopt(S, Opt, Value) when is_port(S) -> 
    setopts(S, [{Opt,Value}]).

setopts(S, Opts) when is_port(S) ->
    case encode_opt_val(Opts) of
        {ok, Buf} ->
            case ctl_cmd(S, ?INET_REQ_SETOPTS, Buf) of
                {ok, _} -> ok;
                Error -> Error
            end;
        Error  -> Error
    end.

%% Encoding for setopts                                                                                                     
%%                                                                                                                          
%% encode opt/val REVERSED since options are stored in reverse order                                                        
%% i.e. the recent options first (we must process old -> new)                                                               
encode_opt_val(Opts) -> 
    try
        enc_opt_val(Opts, [])
    catch
        Reason -> {error,Reason}
    end.
...
enc_opt_val(Opts, Acc, Opt, Val) when is_atom(Opt) ->
    Type = type_opt(set, Opt),
    case type_value(set, Type, Val) of
        true -> 
            enc_opt_val(Opts, [enc_opt(Opt),enc_value(set, Type, Val)|Acc]);
        false -> {error,einval}
    end;
...
enc_opt(high_watermark)  -> ?INET_LOPT_TCP_HIWTRMRK;
enc_opt(low_watermark)   -> ?INET_LOPT_TCP_LOWTRMRK;
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy  */
#define INET_LOW_WATERMARK  (1024*4) /* 4k pending => allow more */

typedef struct {
    inet_descriptor inet;       /* common data structure (DON'T MOVE) */
    int   high;                 /* high watermark */
    int   low;                  /* low watermark */
    int   send_timeout;         /* timeout to use in send */
    int   send_timeout_close;   /* auto-close socket on send_timeout */
    int   busy_on_send;         /* busy on send with timeout! */
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
    int           tcp_add_flags;/* Additional TCP descriptor flags */
    int           http_state;   /* 0 = response|request  1=headers fields */
    inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */
    inet_async_multi_op *multi_last;
    MultiTimerData *mtd;        /* Timer structures for multiple accept */
} tcp_descriptor;

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
{
...
        case INET_LOPT_TCP_HIWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
		if (ival < 0) ival = 0;
                if (tdesc->low > ival)
                    tdesc->low = ival;
                tdesc->high = ival;
            }
            continue;

        case INET_LOPT_TCP_LOWTRMRK:
            if (desc->stype == SOCK_STREAM) {
                tcp_descriptor* tdesc = (tcp_descriptor*) desc;
                if (ival < 0) ival = 0;
                if (tdesc->high < ival)
                    tdesc->high = ival;
                tdesc->low = ival;
            }
            continue;
...
}

gen_tcp的默认高低水位线分别为8K/4K, 如何微调参见 节点间通讯的通道微调
我们来验证下水位线的存在:

$ erl
Erlang R14B04 (erts-5.8.5)  [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.8.5  (abort with ^G)
1> {ok,Sock} = gen_tcp:connect("baidu.com", 80, [{active,false}]).
{ok,#Port<0.595>}
2> inet:getopts(Sock,[high_watermark, low_watermark]).
{ok,[{high_watermark,8192},{low_watermark,4096}]}
3> inet:setopts(Sock,[{high_watermark,131072},{low_watermark, 65536}]).
ok
4> inet:getopts(Sock,[high_watermark, low_watermark]).                 
{ok,[{high_watermark,131072},{low_watermark,65536}]}

我们成功的把水位先提高到了128K/64K,同时也验证了它的存在。那么如果数据超出水位线会发生什么事情呢?
我们继续看文档和代码:

//inet_drv.c:L845
static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};

我们的tcp驱动是支持ERL_DRV_FLAG_SOFT_BUSY的,那么什么是ERL_DRV_FLAG_SOFT_BUSY呢?

文档在这里

int driver_flags
This field is used to pass driver capability information to the runtime system. If the extended_marker field equals ERL_DRV_EXTENDED_MARKER, it should contain 0 or driver flags (ERL_DRV_FLAG_*) ored bitwise. Currently the following driver flags exist:

ERL_DRV_FLAG_USE_PORT_LOCKING
The runtime system will use port level locking on all ports executing this driver instead of driver level locking when the driver is run in a runtime system with SMP support. For more information see the erl_driver documentation.
ERL_DRV_FLAG_SOFT_BUSY
Marks that driver instances can handle being called in the output and/or outputv callbacks even though a driver instance has marked itself as busy (see set_busy_port()). Since erts version 5.7.4 this flag is required for drivers used by the Erlang distribution (the behaviour has always been required by drivers used by the distribution).

那么是port_command如何运作的呢?

BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}
//erl_bif_port.c:L120:
static BIF_RETTYPE do_port_command(Process *BIF_P,
                                   Eterm BIF_ARG_1,
                                   Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3,
                                   Uint32 flags)
{
...
    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else {
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    } else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }

...
}

从代码我们可以看出:
1. 如果port_command设置了force标志,但是驱动不支持ERL_DRV_FLAG_SOFT_BUSY, 要返回EXC_NOTSUP错误。
我们的驱动支持ERL_DRV_FLAG_SOFT_BUSY的,所以如果force的话,数据入缓冲区;

2. 如果设置了NOSUSPEND,但是port已经busy了,返回false,表明发送失败。否则的话就把发送进程suspend,同时告诉system_monitor系统现在有port进入busy_port了。

透过system_monitor我们可以监控port的busy,参看:这里

erlang:system_monitor(MonitorPid, [Option]) -> MonSettings

Types:

MonitorPid = pid()
Option = {long_gc, Time} | {large_heap, Size} | busy_port | busy_dist_port
Time = Size = int()
MonSettings = {OldMonitorPid, [Option]}
OldMonitorPid = pid()
Sets system performance monitoring options. MonitorPid is a local pid that will receive system monitor messages, and the second argument is a list of monitoring options:

{long_gc, Time}
If a garbage collection in the system takes at least Time wallclock milliseconds, a message {monitor, GcPid, long_gc, Info} is sent to MonitorPid. GcPid is the pid that was garbage collected and Info is a list of two-element tuples describing the result of the garbage collection. One of the tuples is {timeout, GcTime} where GcTime is the actual time for the garbage collection in milliseconds. The other tuples are tagged with heap_size, heap_block_size, stack_size, mbuf_size, old_heap_size, and old_heap_block_size. These tuples are explained in the documentation of the gc_start trace message (see erlang:trace/3). New tuples may be added, and the order of the tuples in the Info list may be changed at any time without prior notice.

{large_heap, Size}
If a garbage collection in the system results in the allocated size of a heap being at least Size words, a message {monitor, GcPid, large_heap, Info} is sent to MonitorPid. GcPid and Info are the same as for long_gc above, except that the tuple tagged with timeout is not present. Note: As of erts version 5.6 the monitor message is sent if the sum of the sizes of all memory blocks allocated for all heap generations is equal to or larger than Size. Previously the monitor message was sent if the memory block allocated for the youngest generation was equal to or larger than Size.

busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.

busy_dist_port
If a process in the system gets suspended because it sends to a process on a remote node whose inter-node communication was handled by a busy port, a message {monitor, SusPid, busy_dist_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending through the inter-node communication port Port.

Returns the previous system monitor settings just like erlang:system_monitor/0.

Note
If a monitoring process gets so large that it itself starts to cause system monitor messages when garbage collecting, the messages will enlarge the process’s message queue and probably make the problem worse.

Keep the monitoring process neat and do not set the system monitor limits too tight.

Failure: badarg if MonitorPid does not exist.

那么发生的数据多于高水位线的时候要设置busy_port如何实现的呢?
看代码:

static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
{
    tcp_descriptor* desc = (tcp_descriptor*)e;
    desc->inet.caller = driver_caller(desc->inet.port);

    DEBUGF(("tcp_inet_commanv(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (!IS_CONNECTED(INETP(desc))) {
        if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
            desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
            inet_reply_error_am(INETP(desc), am_closed);
	}
        else
            inet_reply_error(INETP(desc), ENOTCONN);
    }
    else if (tcp_sendv(desc, ev) == 0)
        inet_reply_ok(INETP(desc));
    DEBUGF(("tcp_inet_commandv(%ld) }\r\n", (long)desc->inet.port));
}

/*                                                                                                                          
** Send non-blocking vector data                                                                                            
*/
static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
{
...
if ((sz = driver_sizeq(ix)) > 0) {
        driver_enqv(ix, ev, 0);
        if (sz+ev->size >= desc->high) {
            DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
                    (long)desc->inet.port, desc->inet.s));
            desc->inet.state |= INET_F_BUSY;  /* mark for low-watermark */
            desc->inet.busy_caller = desc->inet.caller;
            set_busy_port(desc->inet.port, 1);
            if (desc->send_timeout != INET_INFINITY) {
                desc->busy_on_send = 1;
                driver_set_timer(desc->inet.port, desc->send_timeout);
            }
            return 1;
        }
    }

...
}

// beam/io.c:L2352
void
set_busy_port(ErlDrvPort port_num, int on)
{
    ERTS_SMP_CHK_NO_PROC_LOCKS;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(&erts_port[port_num]));

    if (on) {
        erts_port_status_bor_set(&erts_port[port_num],
                                 ERTS_PORT_SFLG_PORT_BUSY);
    } else {
        ErtsProcList* plp = erts_port[port_num].suspended;
        erts_port_status_band_set(&erts_port[port_num],
                                  ~ERTS_PORT_SFLG_PORT_BUSY);
        erts_port[port_num].suspended = NULL;

        if (erts_port[port_num].dist_entry) {
            /*                                                                                                              
             * Processes suspended on distribution ports are                                                                
             * normally queued on the dist entry.                                                                           
             */
            erts_dist_port_not_busy(&erts_port[port_num]);
        }
        /*                                                                                                                  
         * Resume, in a round-robin fashion, all processes waiting on the port.                                             
         *                                                                                                                  
         * This version submitted by Tony Rogvall. The earlier version used                                                 
         * to resume the processes in order, which caused starvation of all but                                             
         * the first process.                                                                                               
         */

        if (plp) {
            /* First proc should be resumed last */
            if (plp->next) {
                erts_resume_processes(plp->next);
                plp->next = NULL;
            }
            erts_resume_processes(plp);
        }
    }
}

也就是说一旦超过了,只是设置下busy_port标志,但是本次进程并没有被挂起,下次发送者才会被挂起。同时会开启send_timeout定时器,如果数据在send_timeout时间内未发送出去就会出现timeout错误。

那么挂起的进程如何被解除挂起,继续执行呢,看代码:

/* socket ready for ouput:                                                                                                  
** 1. TCP_STATE_CONNECTING => non block connect ?                                                                           
** 2. TCP_STATE_CONNECTED  => write output                                                                                  
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
...
 if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }

...
}

//erl_process.c:5038
/*                                                                                                                          
** Suspend a process                                                                                                        
** If we are to suspend on a port the busy_port is the thing                                                                
** otherwise busy_port is NIL                                                                                               
*/

void
erts_suspend(Process* process, ErtsProcLocks process_locks, Port *busy_port)
{
    ErtsRunQueue *rq;

    ERTS_SMP_LC_ASSERT(process_locks == erts_proc_lc_my_proc_locks(process));
    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
        erts_smp_proc_lock(process, ERTS_PROC_LOCK_STATUS);

    rq = erts_get_runq_proc(process);

    erts_smp_runq_lock(rq);

    suspend_process(rq, process);

    erts_smp_runq_unlock(rq);

    if (busy_port)
	erts_wake_process_later(busy_port, process);

    if (!(process_locks & ERTS_PROC_LOCK_STATUS))
	erts_smp_proc_unlock(process, ERTS_PROC_LOCK_STATUS);

}


// io.c:474:
void
erts_wake_process_later(Port *prt, Process *process)
{
    ErtsProcList** p;
    ErtsProcList* new_p;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));

    if (prt->status & ERTS_PORT_SFLGS_DEAD)
        return;

    for (p = &(prt->suspended); *p != NULL; p = &((*p)->next))
	/* Empty loop body */;

    new_p = erts_proclist_create(process);
    new_p->next = NULL;
    *p = new_p;
}

我们可以看到随着数据被发送出去,缓冲区里面的数据如果少于低水位线,那么就解除busy_port标志,同时唤醒所有被挂起在这个port的进程,继续执行。

还有一种情况port会被挂起,那就是port也是公平调度的,预防过快的IO把其他的port饿死了.

port是和进程一样公平调度的.  进程是按照reductions为单位调度的, port是把发送的字节数折合成reductions.  所以如果一个进程发送大量的tcp数据 那么这个进程不是一直会得到执行的. 运行期会强制停止一段时间, 让其他port有机会执行的.

我们看下代码:

//erl_port_task.c:L45
/*                                                                                                                          
 * Costs in reductions for some port operations.                                                                            
 */
#define ERTS_PORT_REDS_EXECUTE          0
#define ERTS_PORT_REDS_FREE             50
#define ERTS_PORT_REDS_TIMEOUT          200
#define ERTS_PORT_REDS_INPUT            200
#define ERTS_PORT_REDS_OUTPUT           200
#define ERTS_PORT_REDS_EVENT            200
#define ERTS_PORT_REDS_TERMINATE        100

/*                                                                                                                          
 * Run all scheduled tasks for the first port in run queue. If                                                              
 * new tasks appear while running reschedule port (free task is                                                             
 * an exception; it is always handled instantly).                                                                           
 *                                                                                                                          
 * erts_port_task_execute() is called by scheduler threads between                                                          
 * scheduleing of processes. Sched lock should be held by caller.                                                           
 */

int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
 ...
 case ERTS_PORT_TASK_TIMEOUT:
            reds += ERTS_PORT_REDS_TIMEOUT;
            if (!(pp->status & ERTS_PORT_SFLGS_DEAD))
                (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
            break;
        case ERTS_PORT_TASK_INPUT:
	    reds += ERTS_PORT_REDS_INPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            /* NOTE some windows drivers use ->ready_input for input and output */
	    (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_OUTPUT:
            reds += ERTS_PORT_REDS_OUTPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_EVENT:
            reds += ERTS_PORT_REDS_EVENT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_DIST_CMD:
            reds += erts_dist_command(pp, CONTEXT_REDS-reds);
            break;

...
  ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);

    return res;

}

#define ERTS_PORT_REDUCTIONS_EXECUTED(RQ, REDS)                 \
do {                                                            \
    (RQ)->ports.info.reds += (REDS);                            \
    (RQ)->check_balance_reds -= (REDS);                         \
    (RQ)->wakeup_other_reds += (REDS);                          \
} while (0)

从代码可以看出port的调度的时间片是从宿主的进程的时间片里面扣的,
#define ERTS_PORT_REDS_INPUT 200
#define ERTS_PORT_REDS_OUTPUT 200

每个读写占用200个时间片,而每个进程初始分配2000个时间片,也就是说做10次输出就要被调度了。

通过上面的分析我们知道被gen_tcp发送进程被挂起的原因。

对策就是如果该进程不能阻塞,那么就添加force标志,强行往缓冲区加入数据,同时设置{send_timeout, Integer}。
如果该socket在指定的时间内无法把数据发送完成,那么就直接宣告socket发送超时,避免了潜在的force加数据造成的缓冲区占用大量内存而出现问题。

上面分析过,gen_tcp数据的发送需要占用宿主进程的reds,这也可能造成宿主进程被挂起,在设计的时候尽量避免一个进程拥有太多的port.

试验过程稍后奉上!

祝大家玩得开心!

Post Footer automatically generated by wp-posturl plugin for wordpress.

Categories: Erlang探索 Tags: , , ,
  1. Xu Yifeng
    May 30th, 2013 at 12:43 | #1

    erlang tcp send的机制缺少象receive时那样的通知事件,receive时还有{active,once}可用,send就没有这样的机制,它的设计是不对称的。
    如果某进程堵塞在tcp send上,那么它不理睬发送给它的消息,这让其他进程难以与它交互。
    有可能的方法是使用专门的发送线程,而该发送线程的父线程则是处理消息循环主体,发送线程想要发送数据,可以给父线程发送一个请求,让父线程提供数据。发送线程在其他线程看来看来是不可见的,只能看到这个父线程,是和这个父线程交互的。这种做法的缺点是,在发送数据时多了上下文切换,好处是,其他线程总是可以和这个父线程交互,得到及时回应。

    [Reply]

    Yu Feng Reply:

    send也是异步操作的,只是gen_tcp:send默认是在send后会等driver给他反馈,这个等待是阻塞的。用户完全可以自己写send2,不用等待回馈,立即回去处理别的信息,等driver的反馈信息来了,再处理。目前mochiweb等都是用这种方式。 用线程专门发生不大好,上下文切换开销太大。

    [Reply]

    Xu Yifeng Reply:

    你所描述的方法不见于文档,所以如果以后它改变了实现方法,难保现有的程序还能运行。
    当然,对你们的钻研精神还是很敬佩的,但是我对使用未见于文档的方法,一向是不放心的,
    这等于是没有承诺的东西。

    [Reply]

    Yu Feng Reply:

    没有文档=没有承诺, 确实有风险,用用是可以的。

  2. redrock
    June 7th, 2016 at 17:45 | #2

    请教 有什么方法可以清理已经发送超时的消息么

    [Reply]

  1. October 6th, 2011 at 21:59 | #1