
An In-Depth Look at gen_tcp:send and a Usage Guide (Draft)

April 5th, 2010

Original article. If you repost it, please credit: 系统技术非业余研究

Permalink: An In-Depth Look at gen_tcp:send and a Usage Guide (Draft)

Most people think of gen_tcp:send as a plain, simple function: you call it and the data just zips over to the peer. That is a big misconception, and the Erlang/OTP documentation is not very clear about it. Yet this function is critical to most network programs, and whether it is used correctly has a huge impact on application performance. I have heard many people complain about Erlang being slow or behaving strangely, and in many cases it comes down to not understanding the system and misusing it. Below I dissect gen_tcp:send step by step. The article is long and assumes the reader is familiar with Erlang and the underlying layers, so bear with me.

This article is based on Erlang R13B04.

The following is excerpted from the gen_tcp documentation:

gen_tcp:send(Socket, Packet) -> ok | {error, Reason}
* Socket = socket()
* Packet = [char()] | binary()
* Reason = posix()
* Sends a packet on a socket.

There is no send call with timeout option, you use the send_timeout socket option if timeouts are desired. See the examples section.

Typical usage looks like this:

client(PortNo,Message) ->
{ok,Sock} = gen_tcp:connect("localhost",PortNo,[{active,false},
{packet,2}]),
gen_tcp:send(Sock,Message),
A = gen_tcp:recv(Sock,0),
gen_tcp:close(Sock),
A.
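As the documentation above notes, there is no send call with a timeout argument; the send_timeout socket option is the way to bound a send. A minimal sketch of the same client with a send timeout (the function name and the 5-second value are mine):

client_with_timeout(PortNo, Message) ->
    %% send_timeout makes gen_tcp:send return {error,timeout} instead of
    %% blocking indefinitely when the driver queue stays busy;
    %% send_timeout_close then closes the socket automatically on timeout.
    {ok, Sock} = gen_tcp:connect("localhost", PortNo,
                                 [{active, false}, {packet, 2},
                                  {send_timeout, 5000},
                                  {send_timeout_close, true}]),
    Res = case gen_tcp:send(Sock, Message) of
              ok    -> gen_tcp:recv(Sock, 0);
              Error -> Error
          end,
    gen_tcp:close(Sock),
    Res.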

Simple, right? At first glance it certainly looks simple, but this is where the deception begins.

Let's look at the source:

lib/kernel/src/gen_tcp.erl

124send(S, Packet) when is_port(S) ->    % here we can see that S is a port
125    case inet_db:lookup_socket(S) of
126        {ok, Mod} ->                  % Mod is inet_tcp.erl or inet6_tcp.erl
127            Mod:send(S, Packet);
128        Error ->
129            Error
130    end.

lib/kernel/src/inet_tcp.erl

 49send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts). % delegate to the prim_inet module
 50send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

erts/preloaded/src/prim_inet.erl

 360send(S, Data, OptList) when is_port(S), is_list(OptList) ->
 361    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
 362    try erlang:port_command(S, Data, OptList) of     % push to the underlying port driver
 363        false -> % Port busy and nosuspend option passed
 364            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
 365            {error,busy};
 366        true -> % the port accepted the data
 367            receive
 368                {inet_reply,S,Status} ->  % block here, waiting for the reply
 369                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
 370                    Status
 371            end
 372    catch
 373        error:_Error ->
 374            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
 375             {error,einval}
 376    end.
 377
 378send(S, Data) ->
 379    send(S, Data, []).

From the code above we can see that when we call gen_tcp:send, the kernel application picks the module that matches the socket's type, either inet_tcp or inet6_tcp, and that module delegates the send request to the prim_inet module. prim_inet first checks that the socket is valid and, if so, calls erlang:port_command to push the data into the ERTS runtime.
This push has two possible outcomes: 1. success, and the process blocks waiting for the runtime's reply; 2. failure, and the call returns immediately.
When does it fail?
1. The driver does not support soft_busy, but we passed the force flag.
2. The driver is already busy, but we do not allow the process to be suspended (nosuspend).
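To make the busy case concrete, here is a hedged sketch (the wrapper name is mine) that pushes data with nosuspend, mirroring prim_inet:send above but refusing to be suspended on a busy port:

try_send_nosuspend(Sock, Data) ->
    try erlang:port_command(Sock, Data, [nosuspend]) of
        true ->                      %% the driver accepted the data
            receive
                {inet_reply, Sock, Status} -> Status
            end;
        false ->                     %% port is busy, command was aborted
            {error, busy}
    catch
        error:_ ->                   %% e.g. closed port or a bad iolist
            {error, einval}
    end.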

Let's first look at the relevant documentation and code:

erlang:port_command(Port, Data, OptionList) -> true|false

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.

nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.

For busy_port, see the documentation:

erlang:system_monitor(MonitorPid, [Option]) -> MonSettings
busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.
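If you want to see these suspensions happening in a live system, a small monitor process can subscribe to busy_port events. A sketch (the function names are mine):

start_busy_port_monitor() ->
    Pid = spawn(fun busy_port_loop/0),
    erlang:system_monitor(Pid, [busy_port]),
    Pid.

busy_port_loop() ->
    receive
        {monitor, SusPid, busy_port, Port} ->
            %% SusPid got suspended while sending to Port
            error_logger:warning_msg("~p suspended on busy port ~p~n",
                                     [SusPid, Port]),
            busy_port_loop()
    end.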

erts/emulator/beam/erl_bif_port.c

 215BIF_RETTYPE port_command_2(BIF_ALIST_2)
 216{
 217    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
 218}
 219
 220BIF_RETTYPE port_command_3(BIF_ALIST_3)
 221{
 222    Eterm l = BIF_ARG_3;
 223    Uint32 flags = 0;
 224    while (is_list(l)) {
 225        Eterm* cons = list_val(l);
 226        Eterm car = CAR(cons);
            /* handle the force and nosuspend options */
 227        if (car == am_force) {
 228            flags |= ERTS_PORT_COMMAND_FLAG_FORCE;
 229        } else if (car == am_nosuspend) {
 230            flags |= ERTS_PORT_COMMAND_FLAG_NOSUSPEND;
 231        } else {
 232            BIF_ERROR(BIF_P, BADARG);
 233        }
 234        l = CDR(cons);
 235    }
 236    if(!is_nil(l)) {
 237        BIF_ERROR(BIF_P, BADARG);
 238    }
 239    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, flags);
 240}

 121#define ERTS_PORT_COMMAND_FLAG_FORCE            (((Uint32) 1) << 0)
 122#define ERTS_PORT_COMMAND_FLAG_NOSUSPEND        (((Uint32) 1) << 1)
 123
 124static BIF_RETTYPE do_port_command(Process *BIF_P,
 125                                   Eterm BIF_ARG_1,
 126                                   Eterm BIF_ARG_2,
 127                                   Eterm BIF_ARG_3,
 128                                   Uint32 flags)
 129{
 130    BIF_RETTYPE res;
 131    Port *p;
 132
 133    /* Trace sched out before lock check wait */
 134    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
 135        trace_virtual_sched(BIF_P, am_out);
 136    }
 137
 138    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
 139        profile_runnable_proc(BIF_P, am_inactive);
 140    }
 141
 142    p = id_or_name2port(BIF_P, BIF_ARG_1);
 143    if (!p) {
 144        if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
 145            trace_virtual_sched(BIF_P, am_in);
 146        }
 147        if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
 148            profile_runnable_proc(BIF_P, am_active);
 149        }
 150        BIF_ERROR(BIF_P, BADARG);
 151    }
 152
 153    /* Trace port in, id_or_name2port causes wait */
 154
 155    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
 156        trace_sched_ports_where(p, am_in, am_command);
 157    }
 158    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
 159        profile_runnable_port(p, am_active);
 160    }
 161
 162    ERTS_BIF_PREP_RET(res, am_true);
 163
 164    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
 165        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
 166        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
 167    }
 168    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
 169             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
 170        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
 171            ERTS_BIF_PREP_RET(res, am_false);
 172        }
 173        else { /* suspend the caller and send a busy_port monitor message */
 174            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
 175            if (erts_system_monitor_flags.busy_port) {
 176                monitor_generic(BIF_P, am_busy_port, p->id);
 177            }
 178            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
 179                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
 180        }
 181    } else {
 182        int wres;
 183        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
 184        ERTS_SMP_CHK_NO_PROC_LOCKS;
 185        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
 186        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
 187        if (wres != 0) {
 188            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
 189        }
 190    }
 191
 192    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
 193        trace_sched_ports_where(p, am_out, am_command);
 194    }
 195    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
 196        profile_runnable_port(p, am_inactive);
 197    }
 198
 199    erts_port_release(p);
 200    /* Trace sched in after port release */
 201    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
 202        trace_virtual_sched(BIF_P, am_in);
 203    }
 204    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
 205        profile_runnable_proc(BIF_P, am_active);
 206    }
 207
 208    if (ERTS_PROC_IS_EXITING(BIF_P)) {
 209        KILL_CATCHES(BIF_P);    /* Must exit */
 210        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_ERROR);
 211    }
 212    return res;
 213}
 214

erts/emulator/drivers/common/inet_drv.c

865
866static struct erl_drv_entry tcp_inet_driver_entry =
867{
868    tcp_inet_init,  /* inet_init will add this driver !! */
869    tcp_inet_start,
870    tcp_inet_stop,
871    tcp_inet_command,
872#ifdef __WIN32__
873    tcp_inet_event,
874    NULL,
875#else
876    tcp_inet_drv_input,
877    tcp_inet_drv_output,
878#endif
879    "tcp_inet",
880    NULL,
881    NULL,
882    tcp_inet_ctl,
883    tcp_inet_timeout,
884    tcp_inet_commandv,
885    NULL,
886    tcp_inet_flush,
887    NULL,
888    NULL,
889    ERL_DRV_EXTENDED_MARKER,
890    ERL_DRV_EXTENDED_MAJOR_VERSION,
891    ERL_DRV_EXTENDED_MINOR_VERSION,
892    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,  /* our tcp driver supports soft_busy */
893    NULL,
894    tcp_inet_process_exit,
895    inet_stop_select
896};

897

At the point where the VM executes gen_tcp:send, the calling process can be suspended in the following cases:

1. The data was pushed into ERTS successfully and the process is waiting for ERTS to report the result of the send. This is the common case.

2. The socket is busy and we did not pass the force flag to port_command.

3. The calling process is sending a large amount of data, used up its time slice, and was suspended by the runtime.

The call can fail when: we passed the nosuspend flag but the socket is busy.

At this point the runtime proceeds to call erts_write_to_port to hand the data down to the next layer.

The question now is how the data is organized: what does the runtime do with it? Let's keep reading the code.

erts/emulator/beam/io.c

1054#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT) /* #define ERL_ONHEAP_BIN_LIMIT 64 */
1055#define SMALL_WRITE_VEC  16
1056
1057
1058/* write data to a port */
1059int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
1060{
1061    char *buf;
1062    erts_driver_t *drv = p->drv_ptr;
1063    int size;
1064    int fpe_was_unmasked;
1065
1066    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
1067    ERTS_SMP_CHK_NO_PROC_LOCKS;
1068
1069    p->caller = caller_id;
1070    if (drv->outputv != NULL) {
1071        int vsize;
1072        int csize;
1073        int pvsize;
1074        int pcsize;
1075        int blimit;
1076        SysIOVec iv[SMALL_WRITE_VEC];   /* at most 16 segments */
1077        ErlDrvBinary* bv[SMALL_WRITE_VEC];
1078        SysIOVec* ivp;
1079        ErlDrvBinary**  bvp;
1080        ErlDrvBinary* cbin;
1081        ErlIOVec ev;
1082
1083        if ((size = io_list_vec_len(list, &vsize, &csize,
1084                                    ERL_SMALL_IO_BIN_LIMIT,
1085                                    &pvsize, &pcsize)) < 0) {
1086            goto bad_value;
1087        }
1088        /* To pack or not to pack (small binaries) ...? */
1089        vsize++;
1090        if (vsize <= SMALL_WRITE_VEC) {
1091            /* Do NOT pack */
1092            blimit = 0;
1093        } else {
1094            /* Do pack */
1095            vsize = pvsize + 1;
1096            csize = pcsize;
1097            blimit = ERL_SMALL_IO_BIN_LIMIT;
1098        }
1099        /* Use vsize and csize from now on */
1100        if (vsize <= SMALL_WRITE_VEC) {
1101            ivp = iv;
1102            bvp = bv;
1103        } else {
1104            ivp = (SysIOVec *) erts_alloc(ERTS_ALC_T_TMP,
1105                                          vsize * sizeof(SysIOVec));
1106            bvp = (ErlDrvBinary**) erts_alloc(ERTS_ALC_T_TMP,
1107                                              vsize * sizeof(ErlDrvBinary*));
1108        }
1109        cbin = driver_alloc_binary(csize);
1110        if (!cbin)
1111            erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));
1112
1113        /* Element 0 is for driver usage to add header block */
1114        ivp[0].iov_base = NULL;
1115        ivp[0].iov_len = 0;
1116        bvp[0] = NULL;
1117        ev.vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
1118        ev.vsize++;
1119#if 0
1120        /* This assertion may say something useful, but it can
1121           be falsified during the emulator test suites. */
1122        ASSERT((ev.vsize >= 0) && (ev.vsize == vsize));
1123#endif
1124        ev.size = size;  /* total size */
1125        ev.iov = ivp;
1126        ev.binv = bvp;
1127        fpe_was_unmasked = erts_block_fpe();
1128        (*drv->outputv)((ErlDrvData)p->drv_data, &ev);
1129        erts_unblock_fpe(fpe_was_unmasked);
1130        if (ivp != iv) {
1131            erts_free(ERTS_ALC_T_TMP, (void *) ivp);
1132        }
1133        if (bvp != bv) {
1134            erts_free(ERTS_ALC_T_TMP, (void *) bvp);
1135        }
1136        driver_free_binary(cbin);
1137    } else {
1138        int r;
1139
1140        /* Try with an 8KB buffer first (will often be enough I guess). */
1141        size = 8*1024;
1142        /* See below why the extra byte is added. */
1143        buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
1144        r = io_list_to_buf(list, buf, size);
1145
1146        if (r >= 0) {
1147            size -= r;
1148            fpe_was_unmasked = erts_block_fpe();
1149            (*drv->output)((ErlDrvData)p->drv_data, buf, size);  /* calls the tcp output entry in inet_drv */
1150            erts_unblock_fpe(fpe_was_unmasked);
1151            erts_free(ERTS_ALC_T_TMP, buf);
1152        }
1153        else if (r == -2) {
1154            erts_free(ERTS_ALC_T_TMP, buf);
1155            goto bad_value;
1156        }
1157        else {
1158            ASSERT(r == -1); /* Overflow */
1159            erts_free(ERTS_ALC_T_TMP, buf);
1160            if ((size = io_list_len(list)) < 0) {
1161                goto bad_value;
1162            }
1163
1164            /*
1165             * I know drivers that pad space with '\0' this is clearly
1166             * incorrect but I don't feel like fixing them now, insted
1167             * add ONE extra byte.
1168             */
1169            buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
1170            r = io_list_to_buf(list, buf, size);
1171            fpe_was_unmasked = erts_block_fpe();
1172            (*drv->output)((ErlDrvData)p->drv_data, buf, size);
1173            erts_unblock_fpe(fpe_was_unmasked);
1174            erts_free(ERTS_ALC_T_TMP, buf);
1175        }
1176    }
1177    p->bytes_out += size;
1178    erts_smp_atomic_add(&erts_bytes_out, size);
1179
1180#ifdef ERTS_SMP
1181    if (p->xports)
1182        erts_smp_xports_unlock(p);
1183    ASSERT(!p->xports);
1184#endif
1185    p->caller = NIL;
1186    return 0;
1187
1188 bad_value:
1189    p->caller = NIL;
1190    {
1191        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
1192        erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", p->name);
1193        erts_send_error_to_logger_nogl(dsbufp);
1194        return 1;
1195    }
1196}

When calling gen_tcp:send, the data is an iolist. Many people misunderstand this and go out of their way to turn the iolist into a flat list or binary first. The newly built binary or list has to be garbage-collected after the send, and if this happens frequently the performance cost is significant.
The tcp driver supports scatter writes and ultimately issues a writev system call, so we should take full advantage of that.
From the code above we can see how the iolist is turned into a writev vector; an element of the iolist is handled as follows:
1. an integer (a byte in a list): copied into the accumulation binary;
2. a heap binary: copied;
3. a refc (off-heap) binary below the small-binary limit (ERL_SMALL_IO_BIN_LIMIT, i.e. 4 × 64 = 256 bytes) when the vector needs packing: copied; larger refc binaries go into the vector by reference, without copying.
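In other words, hand the iolist to the driver as-is and let it build the writev vector. A small sketch (the function name is mine) of framing a reply without flattening anything:

send_reply(Sock, Header, Body) when is_binary(Body) ->
    %% The nested iolist goes straight into the driver; large refc binaries
    %% are referenced in the writev vector rather than copied, and no extra
    %% flattened binary is left behind for the GC.
    gen_tcp:send(Sock, [Header, <<(byte_size(Body)):32>>, Body]).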

TCP is a stream protocol, so when sending messages we usually need to prepend a header, for example a 4-byte length. Doing this by hand is quite inefficient; the tcp driver can add the message length automatically. From the documentation:

{packet, PacketType}(TCP/IP sockets)
Defines the type of packets to use for a socket. The following values are valid:

raw | 0
No packaging is done.

1 | 2 | 4
Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.

In current implementation the 4-byte header is limited to 2Gb.
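A quick sketch of what this buys you (Host, PortNo and the payload are placeholders): with {packet, 4} the driver prepends the 4-byte big-endian length on every send and strips it on every receive, so user code never touches the framing.

framed_ping(Host, PortNo) ->
    {ok, Sock} = gen_tcp:connect(Host, PortNo,
                                 [binary, {packet, 4}, {active, false}]),
    ok = gen_tcp:send(Sock, <<"hello">>),   %% on the wire: <<0,0,0,5,"hello">>
    {ok, Reply} = gen_tcp:recv(Sock, 0),    %% 4-byte header already stripped
    gen_tcp:close(Sock),
    Reply.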

By now the data has been packaged and is ready; from here on it is handed to the inet_drv driver.
Inside inet_drv, every socket has a message queue holding the data pushed down from above. The queue has a high and a low watermark: when the number of queued bytes exceeds the high watermark, inet_drv marks the socket as busy, and the busy flag is only cleared once the queue drops below the low watermark.

These options are undocumented; usage is as follows:
inet:setopts(Socket, [{high_watermark, 131072}]).
inet:setopts(Socket, [{low_watermark, 65536}]).

erts/emulator/drivers/common/inet_drv.c

8641static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
8642{
8643    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
8644}

8651/* socket ready for ouput:
8652** 1. TCP_STATE_CONNECTING => non block connect ?
8653** 2. TCP_STATE_CONNECTED  => write output
8654*/
8655static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
8656{
8657    int ret = 0;
8658    ErlDrvPort ix = desc->inet.port;
8659
8660    DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n",
8661            (long)desc->inet.port, desc->inet.s));
8662    if (desc->inet.state == TCP_STATE_CONNECTING) {
8663        sock_select(INETP(desc),FD_CONNECT,0);
8664
8665        driver_cancel_timer(ix);  /* posssibly cancel a timer */
8666#ifndef __WIN32__
8667        /*
8668         * XXX This is strange.  This *should* work on Windows NT too,
8669         * but doesn't.  An bug in Winsock 2.0 for Windows NT?
8670         *
8671         * See "Unix Netwok Programming", W.R.Stevens, p 412 for a
8672         * discussion about Unix portability and non blocking connect.
8673         */
8674
8675#ifndef SO_ERROR
8676        {
8677            int sz = sizeof(desc->inet.remote);
8678            int code = sock_peer(desc->inet.s,
8679                                 (struct sockaddr*) &desc->inet.remote, &sz);
8680
8681            if (code == SOCKET_ERROR) {
8682                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
8683                ret =  async_error(INETP(desc), sock_errno());
8684                goto done;
8685            }
8686        }
8687#else
8688        {
8689            int error = 0;      /* Has to be initiated, we check it */
8690            unsigned int sz = sizeof(error); /* even if we get -1 */
8691            int code = sock_getopt(desc->inet.s, SOL_SOCKET, SO_ERROR,
8692                                   (void *)&error, &sz);
8693
8694            if ((code < 0) || error) {
8695                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
8696                ret = async_error(INETP(desc), error);
8697                goto done;
8698            }
8699        }
8700#endif /* SOCKOPT_CONNECT_STAT */
8701#endif /* !__WIN32__ */
8702
8703        desc->inet.state = TCP_STATE_CONNECTED;
8704        if (desc->inet.active)
8705            sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
8706        async_ok(INETP(desc));
8707    }
8708    else if (IS_CONNECTED(INETP(desc))) {
8709        for (;;) {
8710            int vsize;
8711            int n;
8712            SysIOVec* iov;
8713
8714            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
8715                sock_select(INETP(desc), FD_WRITE, 0);
8716                send_empty_out_q_msgs(INETP(desc));
8717                goto done;
8718            }
8719            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
8720            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
8721                    (long)desc->inet.port, desc->inet.s, vsize));
8722            if (sock_sendv(desc->inet.s, iov, vsize, &n, 0)==SOCKET_ERROR) {
8723                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
8724                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
8725                            (long)desc->inet.port, vsize, sock_errno()));
8726                    ret =  tcp_send_error(desc, sock_errno());
8727                    goto done;
8728                }
8729#ifdef __WIN32__
8730                desc->inet.send_would_block = 1;
8731#endif
8732                goto done;
8733            }
8734            if (driver_deq(ix, n) <= desc->low) {
8735                if (IS_BUSY(INETP(desc))) {
8736                    desc->inet.caller = desc->inet.busy_caller;
8737                    desc->inet.state &= ~INET_F_BUSY;
8738                    set_busy_port(desc->inet.port, 0);
8739                    /* if we have a timer then cancel and send ok to client */
8740                    if (desc->busy_on_send) {
8741                        driver_cancel_timer(desc->inet.port);
8742                        desc->busy_on_send = 0;
8743                    }
8744                    inet_reply_ok(INETP(desc));
8745                }
8746            }
8747        }
8748    }
8749    else {
8750        sock_select(INETP(desc),FD_CONNECT,0);
8751        DEBUGF(("tcp_inet_output(%ld): bad state: %04x\r\n",
8752                (long)desc->inet.port, desc->inet.state));
8753    }
8754 done:
8755    DEBUGF(("tcp_inet_output(%ld) }\r\n", (long)desc->inet.port));
8756    return ret;
8757}
8758

First, let's look at the documentation to see which options affect how this data is sent.

inet:setopts(Socket, Options) -> ok | {error, posix()}

{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.

Normally, when tcp output runs, the driver first looks in the queue for data from previous sends that was not fully written and tries to send it; whatever cannot be written out, together with the current message, goes back into the queue.

If that succeeds, the driver checks whether the delay_send flag is set; if it is, the new message is simply enqueued so it can be coalesced into a larger block and written out on the next round.

Whenever the queue is non-empty, the tcp driver registers the socket for writable events; when such an event fires, the port is scheduled again at a suitable time to continue writing.
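Turning this on is just another socket option that can be set at runtime; for example, assuming Sock is a connected socket you hold:

inet:setopts(Sock, [{delay_send, true}]).   % queue writes, flush when the socket becomes writable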

{sndbuf, Integer}
Gives the size of the send buffer to use for the socket.

This option controls the socket's send buffer in the kernel protocol stack: the larger it is, the more easily the send/writev system call can push data into the stack.
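For example (the value is only illustrative; tune it to your traffic):

inet:setopts(Sock, [{sndbuf, 256 * 1024}]).   % bigger kernel send buffer, fewer short writes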

{send_timeout, Integer}

Only allowed for connection oriented sockets.

Specifies a longest time to wait for a send operation to be accepted by the underlying TCP stack. When the limit is exceeded, the send operation will return {error,timeout}. How much of a packet that actually got sent is unknown, why the socket should be closed whenever a timeout has occurred (see send_timeout_close). Default is infinity.

{send_timeout_close, Boolean}
Only allowed for connection oriented sockets.

Used together with send_timeout to specify whether the socket will be automatically closed when the send operation returns {error,timeout}. The recommended setting is true which will automatically close the socket. Default is false due to backward compatibility.
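Following the documentation's recommendation, a reasonable setting (the 5-second value is mine) would be:

inet:setopts(Sock, [{send_timeout, 5000}, {send_timeout_close, true}]).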

At this point the data may sit partly in the driver's message queue and partly in the TCP stack's buffer waiting for the NIC to put it on the wire, and the socket may also be registered for writable events.

Once a writable event fires, the runtime schedules the port that owns the socket to continue writing.

When a message has been pushed into the protocol stack in full, the tcp driver sends an {inet_reply,S,Status} message back to the calling process with the result. At that point the caller, i.e. gen_tcp:send, returns, and the whole flow is complete.

A few key points:

1. Ports are scheduled just as fairly as processes. Processes are scheduled in units of reductions, and ports convert the number of bytes sent into reductions. So a process sending large amounts of TCP data will not keep running indefinitely; the runtime forces it to pause so that other ports get a chance to run.

2. gen_tcp's send is synchronous: it blocks in the receive {inet_reply,S,Status} -> ... Status end shown in prim_inet:send above. This is a real handicap when sending large numbers of messages.

A better approach is to perform gen_tcp:send's two steps separately by hand:
1. keep calling erlang:port_command(S, Data, OptList), preferably with the force flag;
2. passively wait for the {inet_reply,S,Status} messages.
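A minimal sketch of this two-step approach (the function names are mine): push the data without waiting, and collect the acknowledgements separately, for example in a gen_server's handle_info/2.

async_send(Sock, Data) ->
    %% push the data into the driver without waiting for the result;
    %% the {inet_reply, Sock, Status} message arrives later.
    true = erlang:port_command(Sock, Data),
    ok.

collect_reply(Sock, Timeout) ->
    receive
        {inet_reply, Sock, ok}              -> ok;
        {inet_reply, Sock, {error, Reason}} -> {error, Reason}
    after Timeout ->
        {error, timeout}
    end.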

For concrete examples, see the code of the hotwheels or rabbitmq projects.
hotwheels/src/pubsun.erl

 69handle_cast({publish, Msg}, State) ->
 70    io:format("publish,info: ~p~n", [ets:info(State#state.subs)]),
 71    {A, B, C} = Start = now(),
 72    Msg1 = <<A:32, B:32, C:32, ?MESSAGE, Msg/binary>>,
 73    F = fun({_, _, Sock}, _) -> erlang:port_command(Sock, Msg1) end,
 74   erlang:process_flag(priority, high),
 75    ets:foldl(F, ok, State#state.subs),
 76    End = now(),
 77    erlang:process_flag(priority, normal),
 78    io:format("cost time: ~p~n", [timer:now_diff(End, Start) / 1000]),
 79    {noreply, State};

 95
 96handle_info({inet_reply, _Sock, _Error}, State) ->
 97    io:format("inet reply error: ~p~n", [_Error]),
 98    %% there needs to be a reverse lookup from sock to pid
 99    {noreply, State};
100


Corollary:
In theory, gen_tcp:send should reach about 80% of the efficiency of what a top C programmer would write by hand. If you are seeing less than that, work through the steps above to track down the problem.

References:
http://mryufeng.javaeye.com/blog/475003
http://mryufeng.javaeye.com/blog/289058
http://mryufeng.javaeye.com/blog/288384
http://mryufeng.javaeye.com/blog/366761
http://avindev.javaeye.com/blog/76373


  1. flier
    April 6th, 2010 at 00:36 | #1

    How does {delay_send, Boolean} relate to TCP_NODELAY? It looks like ERTS implements its own packet-coalescing mechanism; doesn't it have to be used in combination with the Nagle algorithm?

    The principle is the same: accumulate data to reduce the number of sends. One just does it in ERTS, the other in the kernel protocol stack; they do not need to be combined.

  2. flier
    April 6th, 2010 at 01:08 | #2

    But conversely, both would have to be turned off to get small packets sent immediately, as in a protocol like telnet, right? And if you are going for throughput you have to actively increase the packet size; is there a setting for that?

  3. April 6th, 2010 at 01:42 | #3

    delay_send does not force an immediate send; it writes as soon as the socket becomes writable. So if the link is clear the delay is a few hundred microseconds; it is not the same thing as the 200 ms associated with TCP_NODELAY/Nagle.

  4. pi1ot
    April 6th, 2010 at 10:20 | #4

    RPC is never an easy job.

  5. litaocheng
    April 6th, 2010 at 10:31 | #5

    Whoa, so much material here. Reading it carefully…

  6. whhbest
    August 16th, 2010 at 08:32 | #6

    The boss is the boss; the analysis is really deep. Much appreciated.

  7. piboyeliu
    July 8th, 2013 at 15:11 | #7

    I ran into a big gen_tcp pitfall. I built a persistent-connection service and was writing a load test for it. Many posts online say that a single IP can only open about 50k connections because of the port number limit, so I figured that if the server listened on a few more ports, the client could open more connections.

    In actual testing that did not work: strace showed bind failing with "address in use".
    Tracing the code, gen_tcp.erl connect -> inet_tcp.erl connect -> inet.erl open, I found that open still calls prim_inet:bind internally.

    A normal client initiating a connection should not need to call bind; once it does, it is subject to the port number limit no matter how many servers you connect to, because at that point it is the bind that fails.

    I switched to prim_inet to implement it and found it just would not work without calling bind: prim_inet:connect kept reporting a badseq error.

    Yu Feng Reply:

    Honestly, this comes down to not understanding networking well enough...

    piboyeliu Reply:

    Why?
    I also suspect I got something wrong somewhere, but I just could not get it to work.
    What strace shows is that a bind happens, and a client should not need to bind a port.

    Later I switched to nodejs and it was fine: a single machine opened 500k connections.

    guoming Reply:

    I think the client does need a bind too; it's just that you don't have to do it manually.
