Home > Erlang探索 > 未公开的gen_tcp:unrecv以及接收缓冲区行为分析


原创文章,转载请注明: 转载自系统技术非业余研究

本文链接地址: 未公开的gen_tcp:unrecv以及接收缓冲区行为分析


unrecv(S, Data) when is_port(S) ->
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->
            Mod:unrecv(S, Data);
        Error ->
unrecv(Socket, Data) -> prim_inet:unrecv(Socket, Data).

%% UNRECV(insock(), data) -> ok | {error, Reason}                                                                           
unrecv(S, Data) ->
    case ctl_cmd(S, ?TCP_REQ_UNRECV, Data) of
        {ok, _} -> ok;
        Error  -> Error


/* TCP requests from Erlang */
static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len,
                        char** rbuf, int rsize)
    case TCP_REQ_UNRECV: {
        DEBUGF(("tcp_inet_ctl(%ld): UNRECV\r\n", (long)desc->inet.port));
        if (!IS_CONNECTED(INETP(desc)))
            return ctl_error(ENOTCONN, rbuf, rsize);
        tcp_push_buffer(desc, buf, len);
        if (desc->inet.active)
            tcp_deliver(desc, 0);
        return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);

static int tcp_push_buffer(tcp_descriptor* desc, char* buf, int len)
    ErlDrvBinary* bin;

    if (desc->i_buf == NULL) {
        bin = alloc_buffer(len);
        sys_memcpy(bin->orig_bytes, buf, len);
        desc->i_buf = bin;
        desc->i_bufsz = len;
        desc->i_ptr_start = desc->i_buf->orig_bytes;
        desc->i_ptr = desc->i_ptr_start + len;
    else {
        char* start =  desc->i_buf->orig_bytes;
        int sz_before = desc->i_ptr_start - start;
        int sz_filled = desc->i_ptr - desc->i_ptr_start;

        if (len <= sz_before) {
            sys_memcpy(desc->i_ptr_start - len, buf, len);
            desc->i_ptr_start -= len;
        else {
            bin = alloc_buffer(desc->i_bufsz+len);
            sys_memcpy(bin->orig_bytes, buf, len);
            sys_memcpy(bin->orig_bytes+len, desc->i_ptr_start, sz_filled);
            desc->i_bufsz += len;
            desc->i_buf = bin;
            desc->i_ptr_start = bin->orig_bytes;
            desc->i_ptr = desc->i_ptr_start + sz_filled + len;
    desc->i_remain = 0;
    return 0;

实现上很简单,就是透过tcp ctl命令往驱动接收缓冲区里面填数据。
但是什么是gen_tcp接收缓冲区, 它的大小是多大呢?

setopts(Socket, Options) -> ok | {error, posix()}


Socket = term()
Options = [{Opt, Val} | {raw, Protocol, Option, ValueBin}]
Protocol = integer()
OptionNum = integer()
ValueBin = binary()
Opt, Val — see below
Sets one or more options for a socket. The following options are available:

{active, true | false | once}
If the value is true, which is the default, everything received from the socket will be sent as messages to the receiving process. If the value is false (passive mode), the process must explicitly receive incoming data by calling gen_tcp:recv/2,3 or gen_udp:recv/2,3 (depending on the type of socket).

If the value is once ({active, once}), one data message from the socket will be sent to the process. To receive one more message, setopts/2 must be called again with the {active, once} option.

When using {active, once}, the socket changes behaviour automatically when data is received. This can sometimes be confusing in combination with connection oriented sockets (i.e. gen_tcp) as a socket with {active, false} behaviour reports closing differently than a socket with {active, true} behaviour. To make programming easier, a socket where the peer closed and this was detected while in {active, false} mode, will still generate the message {tcp_closed,Socket} when set to {active, once} or {active, true} mode. It is therefore safe to assume that the message {tcp_closed,Socket}, possibly followed by socket port termination (depending on the exit_on_close option) will eventually appear when a socket changes back and forth between {active, true} and {active, false} mode. However, when peer closing is detected is all up to the underlying TCP/IP stack and protocol.

Note that {active,true} mode provides no flow control; a fast sender could easily overflow the receiver with incoming messages. Use active mode only if your high-level protocol provides its own flow control (for instance, acknowledging received messages) or the amount of data exchanged is small. {active,false} mode or use of the {active, once} mode provides flow control; the other side will not be able to send faster than the receiver can read.
{packet, PacketType}(TCP/IP sockets)
Defines the type of packets to use for a socket. The following values are valid:

raw | 0
No packaging is done.

1 | 2 | 4
Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.

In current implementation the 4-byte header is limited to 2Gb.

asn1 | cdr | sunrm | fcgi | tpkt | line
These packet types only have effect on receiving. When sending a packet, it is the responsibility of the application to supply a correct header. On receiving, however, there will be one message sent to the controlling process for each complete packet received, and, similarly, each call to gen_tcp:recv/2,3 returns one complete packet. The header is not stripped off.

The meanings of the packet types are as follows:
asn1 – ASN.1 BER,
sunrm – Sun’s RPC encoding,
cdr – CORBA (GIOP 1.1),
fcgi – Fast CGI,
tpkt – TPKT format [RFC1006],
line – Line mode, a packet is a line terminated with newline, lines longer than the receive buffer are truncated.

http | http_bin
The Hypertext Transfer Protocol. The packets are returned with the format according to HttpPacket described in erlang:decode_packet/3. A socket in passive mode will return {ok, HttpPacket} from gen_tcp:recv while an active socket will send messages like {http, Socket, HttpPacket}.

Note that the packet type httph is not needed when reading from a socket.

{packet_size, Integer}(TCP/IP sockets)
Sets the max allowed length of the packet body. If the packet header indicates that the length of the packet is longer than the max allowed length, the packet is considered invalid. The same happens if the packet header is too big for the socket receive buffer.

{recbuf, Integer}
Gives the size of the receive buffer to use for the socket.


#define INET_DEF_BUFFER     1460        /* default buffer size */
#define INET_MIN_BUFFER     1           /* internal min buffer */

#define TCP_MAX_PACKET_SIZE 0x4000000  /* 64 M */

/* LOPT is local options */
#define INET_LOPT_BUFFER      20  /* min buffer size hint */

typedef struct {
       unsigned int psize;         /* max packet size (TCP only?) */
       int   bufsz;                /* minimum buffer constraint */
} inet_descriptor;

typedef struct {
    inet_descriptor inet;       /* common data structure (DON'T MOVE) */
    int   i_bufsz;              /* current input buffer size (<= bufsz) */
    ErlDrvBinary* i_buf;        /* current binary buffer */
    char*         i_ptr;        /* current pos in buf */
    char*         i_ptr_start;  /* packet start pos in buf */
    int           i_remain;     /* remaining chars to read */
} tcp_descriptor;

static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
        case INET_LOPT_BUFFER:
            DEBUGF(("inet_set_opts(%ld): s=%d, BUFFER=%d\r\n",
                    (long)desc->port, desc->s, ival));
            if (ival < INET_MIN_BUFFER) ival = INET_MIN_BUFFER;
            desc->bufsz = ival;
	if (type == SO_RCVBUF) {
            /* make sure we have desc->bufsz >= SO_RCVBUF */
            if (ival > desc->bufsz)
                desc->bufsz = ival;
            DEBUGF(("inet_set_opts(%ld): s=%d, PACKET_SIZE=%d\r\n",
                    (long)desc->port, desc->s, ival));
            desc->psize = (unsigned int)ival;

static int inet_fill_opts(inet_descriptor* desc,
                          char* buf, int len, char** dest, int destlen)
            type = SO_RCVBUF;

从文档和代码结合中可以看出,一个TCP报文的最大大小由{packet_size, Integer}决定,最大不超过64M。每个TCP报文有一定的头,比如(1 | 2 | 4)字节的报文长度,头的类型由{packet, PacketType}决定。


enc_opt(buffer)          -> ?INET_LOPT_BUFFER;

接收缓冲区的大小由buffer选项以及{recbuf, Integer}共同决定,默认大小取它们中间最大的一个,在默认情况下是1460,即一个TCP段的大小。



** Deliver all packets ready                                                                                                
** if len == 0 then check start with a check for ready packet                                                               
static int tcp_deliver(tcp_descriptor* desc, int len)
 while (len > 0) {
        int code = 0;

        inet_input_count(INETP(desc), len);

        /* deliver binary? */
        if (len*4 >= desc->i_buf->orig_size*3) { /* >=75% */
            /* something after? */
            if (desc->i_ptr_start + len == desc->i_ptr) { /* no */
                code = tcp_reply_binary_data(desc, desc->i_buf,
                                             (desc->i_ptr_start -
     else { /* move trail to beginning of a new buffer */
                ErlDrvBinary* bin;
                char* ptr_end = desc->i_ptr_start + len;
                int sz = desc->i_ptr - ptr_end;

                bin = alloc_buffer(desc->i_bufsz);
                memcpy(bin->orig_bytes, ptr_end, sz);

                code = tcp_reply_binary_data(desc, desc->i_buf,
                desc->i_buf = bin;
                desc->i_ptr_start = desc->i_buf->orig_bytes;
                desc->i_ptr = desc->i_ptr_start + sz;
                desc->i_remain = 0;
   else {
            code = tcp_reply_data(desc, desc->i_ptr_start, len);
            /* XXX The buffer gets thrown away on error  (code < 0)    */
            /* Windows needs workaround for this in tcp_inet_event...  */
            desc->i_ptr_start += len;
            if (desc->i_ptr_start == desc->i_ptr)
                desc->i_remain = 0;





接下来我们分析下unrecv的用途。首先我们参考下misultin这个小型的Erlang web服务器,项目在 这里

$ grep -rin setopts .
./misultin_socket.erl:106:	inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:130:	inet:setopts(Sock, [{active, once}]),
./misultin_socket.erl:194:      inet:setopts(Sock, [{packet, raw}, {active, false}]),
./misultin_socket.erl:202:	inet:setopts(Sock, [{packet, http}]),
$ grep -rin gen_tcp:recv  .
./misultin_socket.erl:195:      case gen_tcp:recv(Sock, Len, RecvTimeout) of


比如某个报文是用 [报文体]+ 回车行+,类似底下这样的报文


由于事先不知道报文的准确长度,我们就设成{packet,raw}, {active, false},
但是我们预读取的数据可能会超出报文的大小,部分以回车行分隔的数据已经被读取出来了,这时再利用{packet, line}来分析就不正确了。所以我们用unrecv把这段多读的数据还回去就可以了,方便之门就打开了。




Post Footer automatically generated by wp-posturl plugin for wordpress.

Categories: Erlang探索 Tags: , ,
  1. pengyafu
    August 15th, 2014 at 13:59 | #1



    Yu Feng Reply:



  1. No trackbacks yet.