gen_tcp:send Dissected in Depth, with a Usage Guide (Draft)
Original article. Please credit 系统技术非业余研究 (Yu Feng's blog) when reposting.
Permalink: gen_tcp:send Dissected in Depth, with a Usage Guide (Draft)
Most people think of gen_tcp:send as a humble little function: you call it and the data chugs its way to the other end. That is a big misconception, and the Erlang/OTP documentation explains it poorly. Yet this function is critical to most network programs, and whether you use it correctly has a large impact on application performance. I have heard many people complain that Erlang is slow or behaves strangely; much of that comes from not understanding the system and misusing it. Let me dissect it below. The article is long and assumes familiarity with Erlang and with low-level details, so follow along.
This article is based on Erlang R13B04.
The following is excerpted from the gen_tcp documentation:
gen_tcp:send(Socket, Packet) -> ok | {error, Reason}
* Socket = socket()
* Packet =[char()] | binary()
* Reason = posix()
* Sends a packet on a socket. There is no send call with a timeout option; you use the send_timeout socket option if timeouts are desired. See the examples section.
Typical usage looks like this:

client(PortNo, Message) ->
    {ok, Sock} = gen_tcp:connect("localhost", PortNo, [{active, false}, {packet, 2}]),
    gen_tcp:send(Sock, Message),
    A = gen_tcp:recv(Sock, 0),
    gen_tcp:close(Sock),
    A.

Simple, right? At first glance it certainly is, but this is where the deception begins.
Let's look at the source code:
lib/kernel/src/gen_tcp.erl
send(S, Packet) when is_port(S) ->        % S is a port
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->                      % Mod is inet_tcp or inet6_tcp
            Mod:send(S, Packet);
        Error ->
            Error
    end.
lib/kernel/src/inet_tcp.erl
send(Socket, Packet, Opts) ->
    prim_inet:send(Socket, Packet, Opts). % hand over to the prim_inet module
send(Socket, Packet) ->
    prim_inet:send(Socket, Packet, []).
erts/preloaded/src/prim_inet.erl
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of   % push the data down to the port
        false -> % Port busy and nosuspend option passed
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true ->                                    % the port accepted the data
            receive
                {inet_reply,S,Status} ->           % block, waiting for the reply
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
            {error,einval}
    end.

send(S, Data) ->
    send(S, Data, []).
From the code above we can see that when we call gen_tcp:send, the kernel application picks a module based on the type of the gen_tcp socket: either inet_tcp or inet6_tcp. That module delegates the send request to the prim_inet module. prim_inet first checks that the socket is valid and then calls erlang:port_command to push the data down into the ERTS runtime.
This push has two possible outcomes: 1. success, and the process then waits for the runtime's reply; 2. failure, returning immediately.
Under what circumstances does it fail?
1. The driver does not support soft_busy, but we passed the force flag.
2. The driver is already busy, but we do not allow the process to be suspended (nosuspend).
Let's first look at the relevant documentation and code:
erlang:port_command(Port, Data, OptionList) -> true|false
If the port command is aborted false is returned; otherwise, true is returned.
If the port is busy, the calling process will be suspended until the port is not busy anymore.
Currently the following Options are valid:
force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.
nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.
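To make the two failure modes above concrete, here is a minimal sketch (my own illustration, not code from OTP) of driving erlang:port_command/3 directly on a gen_tcp socket; try_send is a made-up name, and Opts is assumed to be a list that may contain force and/or nosuspend:

try_send(Sock, Data, Opts) ->
    try erlang:port_command(Sock, Data, Opts) of
        true ->
            %% the driver accepted the data; gen_tcp:send would now block
            %% waiting for the {inet_reply, Sock, Status} message
            receive
                {inet_reply, Sock, Status} -> Status
            end;
        false ->
            %% only possible when nosuspend was passed and the port is busy
            {error, busy}
    catch
        error:notsup ->
            %% force was passed but the driver lacks ERL_DRV_FLAG_SOFT_BUSY
            {error, notsup};
        error:_ ->
            {error, einval}
    end.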
For busy_port, see the documentation:
erlang:system_monitor(MonitorPid, [Option]) -> MonSettings
busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.
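A small sketch of watching for such suspensions from a separate monitoring process (the function names here are illustrative, not from the post):

start_busy_port_monitor() ->
    spawn(fun() ->
                  %% ask the runtime to send us {monitor, SusPid, busy_port, Port}
                  %% whenever a process is suspended sending to a busy port
                  erlang:system_monitor(self(), [busy_port]),
                  busy_port_loop()
          end).

busy_port_loop() ->
    receive
        {monitor, SusPid, busy_port, Port} ->
            error_logger:warning_msg("~p suspended on busy port ~p~n",
                                     [SusPid, Port]),
            busy_port_loop()
    end.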
erts/emulator/beam/erl_bif_port.c
BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}

BIF_RETTYPE port_command_3(BIF_ALIST_3)
{
    Eterm l = BIF_ARG_3;
    Uint32 flags = 0;
    while (is_list(l)) {
        Eterm* cons = list_val(l);
        Eterm car = CAR(cons);      /* handle the force and nosuspend options */
        if (car == am_force) {
            flags |= ERTS_PORT_COMMAND_FLAG_FORCE;
        } else if (car == am_nosuspend) {
            flags |= ERTS_PORT_COMMAND_FLAG_NOSUSPEND;
        } else {
            BIF_ERROR(BIF_P, BADARG);
        }
        l = CDR(cons);
    }
    if(!is_nil(l)) {
        BIF_ERROR(BIF_P, BADARG);
    }
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, flags);
}

#define ERTS_PORT_COMMAND_FLAG_FORCE (((Uint32) 1) << 0)
#define ERTS_PORT_COMMAND_FLAG_NOSUSPEND (((Uint32) 1) << 1)

static BIF_RETTYPE do_port_command(Process *BIF_P,
                                   Eterm BIF_ARG_1,
                                   Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3,
                                   Uint32 flags)
{
    BIF_RETTYPE res;
    Port *p;

    /* Trace sched out before lock check wait */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_out);
    }

    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_inactive);
    }

    p = id_or_name2port(BIF_P, BIF_ARG_1);
    if (!p) {
        if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
            trace_virtual_sched(BIF_P, am_in);
        }
        if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
            profile_runnable_proc(BIF_P, am_active);
        }
        BIF_ERROR(BIF_P, BADARG);
    }

    /* Trace port in, id_or_name2port causes wait */

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_in, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_active);
    }

    ERTS_BIF_PREP_RET(res, am_true);

    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else {  /* suspend the calling process and emit the busy_port monitor message */
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    } else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_out, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_inactive);
    }

    erts_port_release(p);
    /* Trace sched in after port release */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_in);
    }
    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_active);
    }

    if (ERTS_PROC_IS_EXITING(BIF_P)) {
        KILL_CATCHES(BIF_P);    /* Must exit */
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_ERROR);
    }
    return res;
}
erts/emulator/drivers/common/inet_drv.c
static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,  /* our TCP driver supports soft_busy */
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};
At the level of the VM executing gen_tcp:send, the calling process can be suspended for several reasons:
1. The data was successfully pushed into ERTS and the process is waiting for ERTS to report the result of the send. This is the common case.
2. The socket is busy and we did not set port_command's force flag.
3. The calling process has been sending a large amount of data, used up its time slice, and was suspended by the runtime.
The failure case: we set the nosuspend flag, but the socket is busy.
At this point the runtime has happily started calling erts_write_to_port to hand the data down to the next layer.
Our next question is how the data is organized: what does the runtime do with it? Let's keep reading the code.
erts/emulator/beam/io.c
#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT)  /* ERL_ONHEAP_BIN_LIMIT is 64 */
#define SMALL_WRITE_VEC  16

/* write data to a port */
int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
{
    char *buf;
    erts_driver_t *drv = p->drv_ptr;
    int size;
    int fpe_was_unmasked;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
    ERTS_SMP_CHK_NO_PROC_LOCKS;

    p->caller = caller_id;
    if (drv->outputv != NULL) {
        int vsize;
        int csize;
        int pvsize;
        int pcsize;
        int blimit;
        SysIOVec iv[SMALL_WRITE_VEC];        /* at most 16 segments on the stack */
        ErlDrvBinary* bv[SMALL_WRITE_VEC];
        SysIOVec* ivp;
        ErlDrvBinary** bvp;
        ErlDrvBinary* cbin;
        ErlIOVec ev;

        if ((size = io_list_vec_len(list, &vsize, &csize,
                                    ERL_SMALL_IO_BIN_LIMIT,
                                    &pvsize, &pcsize)) < 0) {
            goto bad_value;
        }
        /* To pack or not to pack (small binaries) ...? */
        vsize++;
        if (vsize <= SMALL_WRITE_VEC) {
            /* Do NOT pack */
            blimit = 0;
        } else {
            /* Do pack */
            vsize = pvsize + 1;
            csize = pcsize;
            blimit = ERL_SMALL_IO_BIN_LIMIT;
        }
        /* Use vsize and csize from now on */
        if (vsize <= SMALL_WRITE_VEC) {
            ivp = iv;
            bvp = bv;
        } else {
            ivp = (SysIOVec *) erts_alloc(ERTS_ALC_T_TMP,
                                          vsize * sizeof(SysIOVec));
            bvp = (ErlDrvBinary**) erts_alloc(ERTS_ALC_T_TMP,
                                              vsize * sizeof(ErlDrvBinary*));
        }
        cbin = driver_alloc_binary(csize);
        if (!cbin)
            erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));

        /* Element 0 is for driver usage to add header block */
        ivp[0].iov_base = NULL;
        ivp[0].iov_len = 0;
        bvp[0] = NULL;
        ev.vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
        ev.vsize++;
#if 0
        /* This assertion may say something useful, but it can
           be falsified during the emulator test suites. */
        ASSERT((ev.vsize >= 0) && (ev.vsize == vsize));
#endif
        ev.size = size;  /* total size */
        ev.iov = ivp;
        ev.binv = bvp;
        fpe_was_unmasked = erts_block_fpe();
        (*drv->outputv)((ErlDrvData)p->drv_data, &ev);
        erts_unblock_fpe(fpe_was_unmasked);
        if (ivp != iv) {
            erts_free(ERTS_ALC_T_TMP, (void *) ivp);
        }
        if (bvp != bv) {
            erts_free(ERTS_ALC_T_TMP, (void *) bvp);
        }
        driver_free_binary(cbin);
    } else {
        int r;

        /* Try with an 8KB buffer first (will often be enough I guess). */
        size = 8*1024;
        /* See below why the extra byte is added. */
        buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
        r = io_list_to_buf(list, buf, size);

        if (r >= 0) {
            size -= r;
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);  /* call the tcp output in inet_drv */
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
        else if (r == -2) {
            erts_free(ERTS_ALC_T_TMP, buf);
            goto bad_value;
        }
        else {
            ASSERT(r == -1); /* Overflow */
            erts_free(ERTS_ALC_T_TMP, buf);
            if ((size = io_list_len(list)) < 0) {
                goto bad_value;
            }

            /*
             * I know drivers that pad space with '\0' this is clearly
             * incorrect but I don't feel like fixing them now, insted
             * add ONE extra byte.
             */
            buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
            r = io_list_to_buf(list, buf, size);
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
    }
    p->bytes_out += size;
    erts_smp_atomic_add(&erts_bytes_out, size);

#ifdef ERTS_SMP
    if (p->xports)
        erts_smp_xports_unlock(p);
    ASSERT(!p->xports);
#endif
    p->caller = NIL;
    return 0;

 bad_value:
    p->caller = NIL;
    {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", p->name);
        erts_send_error_to_logger_nogl(dsbufp);
        return 1;
    }
}
The data passed to gen_tcp:send is an iolist. Many people misunderstand this and go out of their way to flatten the iolist into a list or a binary first. The newly created binary or list has to be garbage collected after the send; done frequently, this costs the system a lot of performance.
The TCP driver supports scatter writes — it ultimately issues the writev system call — so we should take full advantage of that.
From the code above we can see the rules by which the I/O layer fills the writev vector. If an element of the iolist is:
1. an integer (a byte): it is copied;
2. a heap binary: it is copied;
3. a refc (proc) binary smaller than 64 bytes: it is copied as well. Larger refc binaries are placed into the vector by reference, without copying — see the sketch below.
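The practical consequence, as a small sketch (send_frame is a made-up name): hand the iolist straight to gen_tcp:send and let writev gather the segments, instead of flattening it first.

%% Header and a large refc Payload stay separate segments of the writev
%% vector, so the payload is not copied and no temporary binary is created.
send_frame(Sock, Header, Payload) when is_binary(Payload) ->
    gen_tcp:send(Sock, [Header, Payload]).

%% The wasteful variant: builds a new large binary that must be
%% garbage collected after the send.
send_frame_flat(Sock, Header, Payload) ->
    gen_tcp:send(Sock, list_to_binary([Header, Payload])).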
At the same time, TCP is a stream protocol, so when we send a message we usually need to prepend a header, for example a 4-byte length. Doing this by hand is quite inefficient.
The TCP driver can prepend the message length automatically; see the documentation:
{packet, PacketType} (TCP/IP sockets)
    Defines the type of packets to use for a socket. The following values are valid:
    raw | 0
        No packaging is done.
    1 | 2 | 4
        Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of the header can be one, two, or four bytes, containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation. In the current implementation the 4-byte header is limited to 2Gb.
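A minimal sketch, assuming the peer at Host:Port also opens its socket with {packet, 4}; the driver then generates the 4-byte big-endian header on send and strips it on receive for us:

send_with_length_header(Host, Port, Msg) when is_binary(Msg) ->
    {ok, Sock} = gen_tcp:connect(Host, Port,
                                 [binary, {packet, 4}, {active, false}]),
    %% on the wire this goes out as <<(byte_size(Msg)):32/big, Msg/binary>>
    ok = gen_tcp:send(Sock, Msg),
    Reply = gen_tcp:recv(Sock, 0),   %% returns {ok, WholeMessage}, header already stripped
    gen_tcp:close(Sock),
    Reply.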
At this point the data has been packed and is ready; from here on, responsibility passes to the inet_drv driver.
Inside inet_drv every socket has a message queue holding the data pushed down from above. This queue has a high and a low watermark. When the number of queued bytes exceeds the high watermark, inet_drv marks the socket as busy, and the busy state is not cleared until the queue drops below the low watermark.
These options are not in the official documentation; usage is as follows:
inet:setopts(Socket, [{high_watermark, 131072}]).
inet:setopts(Socket, [{low_watermark, 65536}]).
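An illustrative helper (my own, not from the post) that applies the watermarks and then uses inet:getstat/2 to see how many bytes are still pending output on the socket:

tune_and_inspect(Sock) ->
    ok = inet:setopts(Sock, [{high_watermark, 131072},
                             {low_watermark,  65536}]),
    %% send_pend is the number of bytes waiting to be sent on this socket;
    %% watching it against the watermarks shows how close we are to busy
    {ok, [{send_pend, Pending}]} = inet:getstat(Sock, [send_pend]),
    Pending.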
erts/emulator/drivers/common/inet_drv.c
static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
{
    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
}

/* socket ready for ouput:
** 1. TCP_STATE_CONNECTING => non block connect ?
** 2. TCP_STATE_CONNECTED => write output
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
    int ret = 0;
    ErlDrvPort ix = desc->inet.port;

    DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (desc->inet.state == TCP_STATE_CONNECTING) {
        sock_select(INETP(desc),FD_CONNECT,0);

        driver_cancel_timer(ix);  /* posssibly cancel a timer */
#ifndef __WIN32__
        /*
         * XXX This is strange. This *should* work on Windows NT too,
         * but doesn't. An bug in Winsock 2.0 for Windows NT?
         *
         * See "Unix Netwok Programming", W.R.Stevens, p 412 for a
         * discussion about Unix portability and non blocking connect.
         */

#ifndef SO_ERROR
        {
            int sz = sizeof(desc->inet.remote);
            int code = sock_peer(desc->inet.s,
                                 (struct sockaddr*) &desc->inet.remote, &sz);

            if (code == SOCKET_ERROR) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret = async_error(INETP(desc), sock_errno());
                goto done;
            }
        }
#else
        {
            int error = 0;      /* Has to be initiated, we check it */
            unsigned int sz = sizeof(error); /* even if we get -1 */
            int code = sock_getopt(desc->inet.s, SOL_SOCKET, SO_ERROR,
                                   (void *)&error, &sz);

            if ((code < 0) || error) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret = async_error(INETP(desc), error);
                goto done;
            }
        }
#endif /* SOCKOPT_CONNECT_STAT */
#endif /* !__WIN32__ */

        desc->inet.state = TCP_STATE_CONNECTED;
        if (desc->inet.active)
            sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
        async_ok(INETP(desc));
    }
    else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            int n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (sock_sendv(desc->inet.s, iov, vsize, &n, 0)==SOCKET_ERROR) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret = tcp_send_error(desc, sock_errno());
                    goto done;
                }
#ifdef __WIN32__
                desc->inet.send_would_block = 1;
#endif
                goto done;
            }
            if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
        }
    }
    else {
        sock_select(INETP(desc),FD_CONNECT,0);
        DEBUGF(("tcp_inet_output(%ld): bad state: %04x\r\n",
                (long)desc->inet.port, desc->inet.state));
    }
 done:
    DEBUGF(("tcp_inet_output(%ld) }\r\n", (long)desc->inet.port));
    return ret;
}
First, let's check the documentation for which options affect how this data gets sent.
inet:setopts(Socket, Options) -> ok | {error, posix()}
{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.
Normally, when output is scheduled, the driver first takes whatever was left unsent in the queue from last time and tries to send it; whatever cannot be written out stays in the queue, together with the current message.
If the queue is empty, the driver checks the delay_send flag; if it is set, the message is queued anyway, so that several messages coalesce into a larger block to be sent together next time.
Whenever the queue holds data, the TCP driver registers the socket for writable events and waits for the notification; at a suitable time the port is scheduled again to continue writing.
{sndbuf, Integer}
Gives the size of the send buffer to use for the socket.
This option controls the size of the socket's send buffer in the kernel protocol stack; the larger it is, the more readily the send/writev system calls can push data into the stack.
{send_timeout, Integer}
Only allowed for connection oriented sockets.
Specifies a longest time to wait for a send operation to be accepted by the underlying TCP stack. When the limit is exceeded, the send operation will return {error,timeout}. How much of a packet that actually got sent is unknown, why the socket should be closed whenever a timeout has occurred (see send_timeout_close). Default is infinity.
{send_timeout_close, Boolean}
Only allowed for connection oriented sockets. Used together with send_timeout to specify whether the socket will be automatically closed when the send operation returns {error,timeout}. The recommended setting is true which will automatically close the socket. Default is false due to backward compatibility.
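A sketch wiring these options together; the buffer size and timeout are arbitrary example values, and setup_sender_socket/safe_send are made-up names:

setup_sender_socket(Sock) ->
    inet:setopts(Sock, [{sndbuf, 262144},             %% kernel send buffer
                        {delay_send, true},           %% let the driver batch writes
                        {send_timeout, 30000},        %% don't wait forever on a stuck peer
                        {send_timeout_close, true}]). %% close the socket on timeout

safe_send(Sock, Data) ->
    case gen_tcp:send(Sock, Data) of
        ok ->
            ok;
        {error, timeout} ->
            %% how much was actually sent is unknown; with send_timeout_close
            %% the socket is closed, so drop any per-connection state here
            {error, timeout};
        {error, Reason} ->
            {error, Reason}
    end.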
By now, part of the data may be sitting in the driver's message queue while part has been pushed into the kernel TCP buffer waiting for the NIC to put it on the wire, and the socket may also be registered for writable events.
As soon as a writable event fires, the runtime schedules the port owning that socket to continue writing.
Once a message has been fully pushed into the protocol stack, the TCP driver sends an {inet_reply, S, Status} message back to the calling process to report the result. At that point the caller, and thus gen_tcp:send, returns, completing the whole flow.
A few key points:
1. Ports are scheduled just as fairly as processes. Processes are scheduled in units of reductions; for ports, the number of bytes sent is converted into reductions. So a process sending large volumes of TCP data will not run uninterrupted: the runtime forces it to pause for a while so that other ports get a chance to run.
2. gen_tcp sending is synchronous; the caller blocks in receive {inet_reply, S, Status} -> Status end. This is a real handicap when a large number of messages must be sent.
A better approach is to do gen_tcp:send's two steps separately by hand, as sketched after this list:
1. Keep calling erlang:port_command(S, Data, OptList), preferably with the force flag.
2. Passively collect the {inet_reply, S, Status} messages.
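A minimal sketch of this asynchronous pattern, assuming the socket owner is a gen_server-style process; this is my illustration, not the exact hotwheels/rabbitmq code referenced below:

%% fire-and-forget push into the driver
async_send(Sock, Data) ->
    try erlang:port_command(Sock, Data, [force]) of
        true  -> ok;
        false -> {error, busy}   %% cannot happen without nosuspend, kept for completeness
    catch
        error:_ -> {error, einval}
    end.

%% the replies arrive later as ordinary messages to the socket owner
handle_info({inet_reply, _Sock, ok}, State) ->
    {noreply, State};
handle_info({inet_reply, Sock, Status}, State) ->
    error_logger:error_msg("send on ~p failed: ~p~n", [Sock, Status]),
    {noreply, State}.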
For concrete production code, see the hotwheels or rabbitmq projects.
hotwheels/src/pubsun.erl
handle_cast({publish, Msg}, State) ->
    io:format("publish,info: ~p~n", [ets:info(State#state.subs)]),
    {A, B, C} = Start = now(),
    Msg1 = <<A:32, B:32, C:32, ?MESSAGE, Msg/binary>>,
    F = fun({_, _, Sock}, _) -> erlang:port_command(Sock, Msg1) end,
    erlang:process_flag(priority, high),
    ets:foldl(F, ok, State#state.subs),
    End = now(),
    erlang:process_flag(priority, normal),
    io:format("cost time: ~p~n", [timer:now_diff(End, Start) / 1000]),
    {noreply, State};

handle_info({inet_reply, _Sock, _Error}, State) ->
    io:format("inet reply error: ~p~n", [_Error]),
    %% there needs to be a reverse lookup from sock to pid
    {noreply, State};
Corollary:
In theory, gen_tcp:send should reach roughly 80% of what a top C programmer could achieve. If you are getting less than that, work through the steps above to track down the problem.
References:
http://mryufeng.javaeye.com/blog/475003
http://mryufeng.javaeye.com/blog/289058
http://mryufeng.javaeye.com/blog/288384
http://mryufeng.javaeye.com/blog/366761
http://avindev.javaeye.com/blog/76373
What is the relationship between {delay_send, Boolean} and TCP_NODELAY? It looks like ERTS implements its own batching mechanism — doesn't it have to be coordinated with Nagle's algorithm?
The principle is the same: both accumulate data to reduce the number of sends, only one does it in ERTS and the other in the kernel protocol stack. They don't need to be combined.
But conversely, don't both have to be switched off to get small packets out immediately, for telnet-like protocols? And if you are after throughput, you have to actively enlarge the packets — is there a setting for that?
delay_send does not force an immediate send; it simply writes as soon as the socket becomes writable. So on a clear link the extra delay is a few hundred microseconds — not the same thing as TCP_NODELAY's 200 ms.
RPC is never an easy job.
Wow, what rich content. Reading it carefully…
The boss is the boss — what a deep analysis. Much obliged.
I ran into a big gen_tcp pitfall. I built a long-lived connection service and was writing a stress test. Many posts online say one IP can only initiate about 50K connections because of the local port limit. My idea was that if the server listened on a few more ports, the client could open more connections.
In practice it did not work; strace showed bind failing with "address already in use".
Tracing the code, gen_tcp.erl connect -> inet_tcp.erl connect -> inet.erl open, I found that open still calls prim_inet:bind internally.
A normal client initiating a connection shouldn't have to call bind; once it does, it is subject to the local port limit no matter how many different servers you connect to, because at that point it is the bind that fails.
I switched to using prim_inet directly and found that without calling bind it simply doesn't work; prim_inet:connect keeps failing with a badseq error.
Yu Feng Reply:
July 8th, 2013 at 9:01 pm
Honestly, your understanding of networking just isn't deep enough…
piboyeliu Reply:
July 10th, 2013 at 8:51 pm
Why?
I also suspect I've got something wrong somewhere, but I just can't work it out.
What strace shows is that a bind does happen, and a client shouldn't need to bind a port.
Later I switched to node.js and it was fine: one machine initiated 500K connections.
guoming Reply:
November 1st, 2016 at 4:38 pm
It seems the client does need a bind too; it's just that you don't have to do it manually.