Archive for April, 2010

Viewing port information with 'o' in shell BREAK mode

April 7th, 2010 2 comments

Original article; when reposting, please credit: reposted from Erlang非业余研究

This is an undocumented feature that makes it easy to inspect how ports are being used inside the Erlang VM.

A demonstration:

root@ubuntu:~# erl
Erlang R13B04 (erts-5.7.5) 1 [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.5  (abort with ^G)
1>
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution
o
=port:#Port<0.1>
Slot: 1
Connected: <0.3.0>
Links: <0.3.0>
Port controls linked-in driver: efile
=port:#Port<0.49>
Slot: 49
Connected: <0.18.0>
Links: <0.18.0>
Port controls linked-in driver: efile
=port:#Port<0.306>
Slot: 306
Connected: <0.21.0>
Links: <0.21.0>
Port is UNIX fd not opened by emulator: 2/2
=port:#Port<0.315>
Slot: 315
Connected: <0.23.0>
Links: <0.23.0>
Port controls linked-in driver: tty_sl -c -e
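
The same kind of information can also be gathered from running code. The following is a minimal sketch that is not part of the original post; it assumes only the documented BIFs erlang:ports/0 and erlang:port_info/1, and the module and function names (port_dump, list/0, print/0) are made up for illustration.

```erlang
-module(port_dump).
-export([list/0, print/0]).

%% Return [{Port, Info}] for every port that is currently open.
%% erlang:port_info/1 returns undefined if the port closed in the
%% meantime, so such ports are filtered out.
list() ->
    [{P, Info} || P <- erlang:ports(),
                  Info <- [erlang:port_info(P)],
                  Info =/= undefined].

%% Print one line per port: the port, its driver or command name,
%% the connected (owning) process and the linked processes.
print() ->
    lists:foreach(
      fun({P, Info}) ->
              io:format("~p name=~p connected=~p links=~p~n",
                        [P,
                         proplists:get_value(name, Info),
                         proplists:get_value(connected, Info),
                         proplists:get_value(links, Info)])
      end, list()).
```

Calling port_dump:print() in the shell should list roughly the same ports that 'o' shows, without having to interrupt the node.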

Categories: Erlang Exploration

An experiment with the multi-process network model in Erlang

April 7th, 2010 No comments

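When writing network programs we often use the multi-process model: the master process calls bind to obtain a socket handle, forks N child processes, passes the handle to them, and all of them call accept on it concurrently.
This model is quite practical in Erlang. In earlier tests we demonstrated the power of Erlang's single-processor mode: at peak, a single CPU could issue 40,000 TCP broadcast packets.
But how can Erlang take advantage of this? Erlang ports are in fact implemented with fork, so the capability is there; the official release, however, closes all inherited handles when it forks.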
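
For comparison, the "many acceptors on one listening socket" pattern can be sketched inside a single Erlang VM, with Erlang processes standing in for forked OS processes. This is only an illustration of the model, not the fork-based experiment that follows; the module name acceptor_pool and its functions are hypothetical.

```erlang
-module(acceptor_pool).
-export([start/1, demo/0]).

%% Open a listening socket on an ephemeral port and start N acceptor
%% processes that all block in gen_tcp:accept/1 on the same socket.
start(N) ->
    {ok, LSock} = gen_tcp:listen(0, [binary, {active, false}]),
    {ok, Port} = inet:port(LSock),
    [spawn_link(fun() -> acceptor(LSock, I) end) || I <- lists:seq(1, N)],
    {ok, LSock, Port}.

%% Each acceptor greets the client, closes the connection and goes
%% back to accepting. It stops once the listening socket is closed.
acceptor(LSock, Id) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            gen_tcp:send(Sock, [<<"hello from acceptor ">>, integer_to_list(Id)]),
            gen_tcp:close(Sock),
            acceptor(LSock, Id);
        {error, _Reason} ->
            ok
    end.

%% Connect once over loopback and return whatever the acceptor sent.
demo() ->
    {ok, LSock, Port} = start(4),
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, [binary, {active, false}]),
    {ok, Reply} = gen_tcp:recv(C, 0, 5000),
    gen_tcp:close(C),
    gen_tcp:close(LSock),
    Reply.
```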

Let's hack the code to get around this.
erts/emulator/sys/unix/sys.c

            if (0) /* fork path */
              for (i = opts->use_stdio ? 3 : 5; i < max_files; i++)
                (void) close(i);
...
        fprintf(stderr, "cracked\n");    /* vfork path */
        sprintf(fd_close_range, "%d:%d", opts->use_stdio ? 3 : 5, opts->use_stdio ? 3 : 5);

Remember to rebuild and reinstall with make && make install.

The code below shows the parent process passing a TCP handle to a child process, which then turns it back into a gen_tcp socket.

root@nd-desktop:~/otp/test# ls *.erl
child.erl  test.erl
root@ubuntu:~/otp/test# erlc *.erl

root@nd-desktop:~/otp/test# cat test.erl

-module(test).
-export([start/0]).
start()->
    process_flag(trap_exit, true),
    {ok, Sock} = gen_tcp:listen(0, []),
    {ok, Handle} = inet:getfd(Sock),
    Command ="erl -noshell -s child -handle " ++ integer_to_list(Handle),
    io:format("child command line: ~p~n", [Command]),
    Child = case (catch open_port({spawn, Command}, [in, {line, 256}])) of
                {'EXIT', Reason}->
                    io:format("open child error, reason: ~p~n", [Reason]),
                    halt(1);
                Port-> Port
            end,
    register(?MODULE, self()),
    io:format("STOP ME: test!stop. ~n",[]),
    loop(Child),
    io:format("bye~n",[]).

loop(Child)->
    receive
        {Child, {data, Result}} ->
            io:format("child say: ~p~n", [Result]),
            loop(Child);
        stop->
            halt(0);
        Other->
            io:format("got msg: ~p~n", [Other]),
            loop(Child)
    end.

root@nd-desktop:~/otp/test# cat child.erl

-module(child).
-export([start/0]).

start()->
    {ok, [[HandleArg|_]|_]} = init:get_argument(handle),
    Handle = list_to_integer(HandleArg),
    io:format("handle: ~w~n", [Handle]),
    case gen_tcp:fdopen(Handle, []) of
        {ok, Socket} ->
            io:format("got socket ok: ~p~n", [Socket]);
        _ ->
            io:format("got socket fail~n", [])
    end,

    halt(0).
root@ubuntu:~/otp/test# erl -noshell -s test
child command line: "erl -noshell -s child -handle 8"
cracked
STOP ME: test!stop.
child say: {eol,"handle: 8"}
child say: {eol,"got socket ok: #Port<0.354>"}
got msg: {'EXIT',#Port<0.360>,normal}
^C
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution

Bingo! It works.

The point here is simply to show that the fork approach is feasible. If you need it, turn the fork patch above into something more robust.

Categories: Erlang Exploration

A code snippet for getting Erlang system information

April 7th, 2010 1 comment

This code is excerpted from lib/megaco/src/tcp/megaco_tcp_connection.erl; it reports fairly detailed information about the system:

    SchedId      = erlang:system_info(scheduler_id),
    SchedNum     = erlang:system_info(schedulers),
    ProcCount    = erlang:system_info(process_count),
    ProcLimit    = erlang:system_info(process_limit),
    ProcMemUsed  = erlang:memory(processes_used),
    ProcMemAlloc = erlang:memory(processes),
    MemTot       = erlang:memory(total),
    io:format("abormal termination: "
              "~n   Scheduler id:                         ~p"
              "~n   Num scheduler:                        ~p"
              "~n   Process count:                        ~p"
              "~n   Process limit:                        ~p"
              "~n   Memory used by erlang processes:      ~p"
              "~n   Memory allocated by erlang processes: ~p"
              "~n   The total amount of memory allocated: ~p"
              "~n~p",
              [SchedId, SchedNum, ProcCount, ProcLimit,
               ProcMemUsed, ProcMemAlloc, MemTot, Reason]),
    ok.
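
The excerpt above is a fragment of a larger function. As a rough, self-contained sketch (the module sys_snapshot and its functions are hypothetical names, not part of megaco), the same calls can be wrapped so that they can be tried directly in the shell:

```erlang
-module(sys_snapshot).
-export([collect/0, print/0]).

%% Gather the same figures as the megaco excerpt into a property list.
collect() ->
    [{scheduler_id,   erlang:system_info(scheduler_id)},
     {schedulers,     erlang:system_info(schedulers)},
     {process_count,  erlang:system_info(process_count)},
     {process_limit,  erlang:system_info(process_limit)},
     {processes_used, erlang:memory(processes_used)},
     {processes,      erlang:memory(processes)},
     {total,          erlang:memory(total)}].

%% Print one "key value" line per figure.
print() ->
    lists:foreach(fun({K, V}) -> io:format("~p: ~p~n", [K, V]) end, collect()).
```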

Writing C programs with emacs msf-abbrev (fast as a rocket)

April 6th, 2010 No comments

The picture speaks for itself:

If you are interested, google it!

Categories: Miscellaneous

What is the difference between an iolist and a list?

April 6th, 2010 4 comments

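I saw a post on erlang-china.org asking this question:

I have never quite understood iolist. What exactly is the difference between it and a list? Could someone please explain?

I spent a good while in the Erlang documentation and it does not explain this clearly, so let's look at the source: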
erts/emulator/beam/utils.c

int io_list_len(Eterm obj)
{
    Eterm* objp;
    Sint len = 0;
    DECLARE_ESTACK(s);
    goto L_again;

    while (!ESTACK_ISEMPTY(s)) {
        obj = ESTACK_POP(s);
    L_again:
        if (is_list(obj)) {
        L_iter_list:
            objp = list_val(obj);
            /* Head */
            obj = CAR(objp);
            if (is_byte(obj)) {
                len++;
            } else if (is_binary(obj) && binary_bitsize(obj) == 0) {
                len += binary_size(obj);
            } else if (is_list(obj)) {
                ESTACK_PUSH(s, CDR(objp));
                goto L_iter_list; /* on head */
            } else if (is_not_nil(obj)) {
                goto L_type_error;
            }
            /* Tail */
            obj = CDR(objp);
            if (is_list(obj))
                goto L_iter_list; /* on tail */
            else if (is_binary(obj) && binary_bitsize(obj) == 0) {
                len += binary_size(obj);
            } else if (is_not_nil(obj)) {
                goto L_type_error;
            }
        } else if (is_binary(obj) && binary_bitsize(obj) == 0) { /* Tail was binary */
            len += binary_size(obj);
        } else if (is_not_nil(obj)) {
            goto L_type_error;
        }
    }

    DESTROY_ESTACK(s);
    return len;

 L_type_error:
    DESTROY_ESTACK(s);
    return -1;
}

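From the source we can see that an iolist is defined as one of:
1. []
2. a binary
3. a list in which every element is an int (0-255), a binary, or an iolist. As the "Tail" branch shows, the tail of such a list may also be a binary.
Here a binary means bitsize % 8 == 0, that is, a whole number of bytes,
and an int means 0-255.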
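
The definition can be written down as a small Erlang predicate. This is a sketch that mirrors the rules read from io_list_len above; the module name iolist_check is made up, and it is meant as an illustration rather than a replacement for the BIFs.

```erlang
-module(iolist_check).
-export([is_iolist/1]).

%% An iolist is [], a binary, or a list cell whose head and tail are
%% both acceptable. is_binary/1 is false for bitstrings such as
%% <<1:1>>, which matches the "bitsize % 8 == 0" rule.
is_iolist([]) -> true;
is_iolist(B) when is_binary(B) -> true;
is_iolist([H | T]) -> valid_head(H) andalso valid_tail(T);
is_iolist(_) -> false.

%% A head may be a byte, a binary, or a nested iolist.
valid_head(I) when is_integer(I), I >= 0, I =< 255 -> true;
valid_head(B) when is_binary(B) -> true;
valid_head(L) when is_list(L) -> is_iolist(L);
valid_head(_) -> false.

%% A tail may be the rest of the list (including []) or a binary.
valid_tail(B) when is_binary(B) -> true;
valid_tail(L) when is_list(L) -> is_iolist(L);
valid_tail(_) -> false.
```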

root@ubuntu:/usr/src/otp# erl
Erlang R13B04 (erts-5.7.5) 1 [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.5  (abort with ^G)
2> iolist_size(<<>>).
0
3> iolist_size(<<1:1>>).
** exception error: bad argument
 in function  iolist_size/1
 called as iolist_size(<<1:1>>)
4> iolist_size(<<1:8>>).
1
5> iolist_size([]).
0
6> iolist_size(<<1,2>>).
2
7> iolist_size([1,2]).
2
8> iolist_size([1,2, <<1,2>>]).
4
9> iolist_size([1,2, <<1,2>>, [2]]).
5
10> iolist_size([1,2, <<1,2>>, [2]]).
5
11> iolist_size([<<1:1>>]).
** exception error: bad argument
 in function  iolist_size/1
 called as iolist_size([<<1:1>>])
12> iolist_size([257]).
** exception error: bad argument
     in function  iolist_size/1
        called as iolist_size([257])

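iolists exist for sending data to ports. Because underlying system calls such as writev support vectored writes, there is no need for pointless flattening operations such as iolist_to_binary; this avoids memory copies and greatly improves efficiency.
Use them liberally.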
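
As a small illustration of the point, the sketch below prepends a length header to a payload by building a new iolist around it instead of concatenating binaries. The module name iolist_frame and the 4-byte header layout are assumptions chosen for the example.

```erlang
-module(iolist_frame).
-export([frame/1]).

%% Prepend a 4-byte big-endian length header. Payload is not copied
%% or flattened here: the result is just a two-element list that
%% refers to it.
frame(Payload) ->
    Len = iolist_size(Payload),
    [<<Len:32>>, Payload].
```

The result can be handed directly to a port or socket, for example gen_tcp:send(Sock, iolist_frame:frame([Header, Body])), where Sock, Header and Body are placeholders.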

Categories: Erlang Exploration

erlang:send_after and erlang:start_timer explained

April 6th, 2010 1 comment

A while ago arksea raised this question, because the documentation is quite unclear about it.

erlang:send_after(Time, Dest, Msg) -> TimerRef
Time = int()
0 <= Time <= 4294967295
Dest = pid() | RegName
LocalPid = pid() (of a process, alive or dead, on the local node)
Msg = term()
TimerRef = ref()
Starts a timer which will send the message Msg to Dest after Time milliseconds.

If Dest is an atom, it is supposed to be the name of a registered process. The process referred to by the name is looked up at the time of delivery. No error is given if the name does not refer to a process.

If Dest is a pid, the timer will be automatically canceled if the process referred to by the pid is not alive, or when the process exits. This feature was introduced in erts version 5.4.11. Note that timers will not be automatically canceled when Dest is an atom.

See also erlang:start_timer/3, erlang:cancel_timer/1, and erlang:read_timer/1.

Failure: badarg if the arguments does not satisfy the requirements specified above.

erlang:start_timer(Time, Dest, Msg) -> TimerRef
Time = int()
0 <= Time <= 4294967295
Dest = LocalPid | RegName
LocalPid = pid() (of a process, alive or dead, on the local node)
RegName = atom()
Msg = term()
TimerRef = ref()
Starts a timer which will send the message {timeout, TimerRef, Msg} to Dest after Time milliseconds.

If Dest is an atom, it is supposed to be the name of a registered process. The process referred to by the name is looked up at the time of delivery. No error is given if the name does not refer to a process.

If Dest is a pid, the timer will be automatically canceled if the process referred to by the pid is not alive, or when the process exits. This feature was introduced in erts version 5.4.11. Note that timers will not be automatically canceled when Dest is an atom.

See also erlang:send_after/3, erlang:cancel_timer/1, and erlang:read_timer/1.

Failure: badarg if the arguments does not satisfy the requirements specified above.

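On the surface these two APIs hardly differ and are used the same way, so why have two? Odd!

Let's look closely at a typical use.

Both APIs return a TimerRef, which the user can use to cancel the timer. The only difference is the message delivered on timeout: send_after delivers Msg, while start_timer delivers {timeout, TimerRef, Msg}.
The problem arises when cancelling the timer. If the timer has not fired yet, cancelling is fine. If it has already fired, trouble begins: the message may already be sitting in the target process's message queue, waiting to be dispatched.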

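In that case, with send_after the queue holds just Msg, so how can the user tell which TimerRef that Msg belongs to? You might say: put the TimerRef into the message. A nice idea, but the TimerRef is not available until send_after returns, so you cannot build such a message, cannot recognize a possibly stale timeout message, and the program logic breaks.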
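start_timer exists to solve exactly this problem: when the timer fires, before the message is sent, it automatically wraps the message as {timeout, TimerRef, Msg} so that it can be matched to its timer. (The erts 5.4.11 note in the documentation quoted above refers to the automatic cancellation when Dest is a pid.)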
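
A minimal sketch of how the TimerRef inside the message can be used, assuming the timer was started for the calling process itself. The module timer_demo and the function cancel_safely/1 are hypothetical names; only erlang:start_timer/3 and erlang:cancel_timer/1 come from the documentation above.

```erlang
-module(timer_demo).
-export([cancel_safely/1, demo/0]).

%% Cancel a timer created with erlang:start_timer/3 whose destination
%% is the calling process. If the timer has already fired, remove the
%% stale {timeout, Ref, _} message from the mailbox; matching on Ref
%% is what send_after cannot offer.
cancel_safely(Ref) ->
    case erlang:cancel_timer(Ref) of
        false ->
            receive
                {timeout, Ref, _Msg} -> flushed
            after 0 ->
                not_found
            end;
        RemainingMs when is_integer(RemainingMs) ->
            cancelled
    end.

%% One timer that has already fired and one that is still pending.
demo() ->
    Ref1 = erlang:start_timer(0, self(), ping),
    timer:sleep(50),
    R1 = cancel_safely(Ref1),
    Ref2 = erlang:start_timer(60000, self(), ping),
    R2 = cancel_safely(Ref2),
    {R1, R2}.
```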

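Conclusion: things that the documentation mentions only in passing often embody a lot of design thinking; it pays to consider carefully why they exist.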

Categories: Erlang Exploration

gen_tcp:send: an in-depth dissection and usage guide (first draft)

April 5th, 2010 6 comments

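Most people think of gen_tcp:send as a very plain function: call it and the data promptly arrives at the peer. This is a big misconception, and the Erlang OTP documentation is not clear on the matter. Yet this function is crucial to most network programs, and whether it is used properly has a huge effect on application performance. I hear many people complain that Erlang is slow or that strange problems occur; much of this comes from misuse rooted in not understanding the system. Below I dissect it. The article is long and assumes familiarity with Erlang and with low-level internals. Follow me.

This article is based on Erlang R13B04.

The following is excerpted from the gen_tcp documentation: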

gen_tcp:send(Socket, Packet) -> ok | {error, Reason}
* Socket = socket()
* Packet = [char()] | binary()
* Reason = posix()
* Sends a packet on a socket.

There is no send call with timeout option, you use the send_timeout socket option if timeouts are desired. See the examples section.

Typical usage looks like this:

client(PortNo,Message) ->
    {ok,Sock} = gen_tcp:connect("localhost",PortNo,[{active,false},
                                                    {packet,2}]),
    gen_tcp:send(Sock,Message),
    A = gen_tcp:recv(Sock,0),
    gen_tcp:close(Sock),
    A.

Simple, right? At first glance it is, but this is where the confusion begins.

Let's go to the source code:

lib/kernel/src/gen_tcp.erl

send(S, Packet) when is_port(S) ->    % this shows that S is a port
    case inet_db:lookup_socket(S) of
        {ok, Mod} ->                  % Mod may be inet_tcp or inet6_tcp
            Mod:send(S, Packet);
        Error ->
            Error
    end.

lib/kernel/src/inet_tcp.erl

send(Socket, Packet, Opts) -> prim_inet:send(Socket, Packet, Opts). % handed on to the prim_inet module
send(Socket, Packet) -> prim_inet:send(Socket, Packet, []).

erts/preloaded/src/prim_inet.erl

send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
    try erlang:port_command(S, Data, OptList) of     % pushed down to the port layer
        false -> % Port busy and nosuspend option passed
            ?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
            {error,busy};
        true -> % the port accepted the data
            receive
                {inet_reply,S,Status} ->  % block, waiting for the reply
                    ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
                    Status
            end
    catch
        error:_Error ->
            ?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
             {error,einval}
    end.

send(S, Data) ->
    send(S, Data, []).

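From these fragments we can see that when we call gen_tcp:send, the kernel application picks a module according to the type of the gen_tcp socket: either inet_tcp or inet6_tcp. That module delegates the send request to prim_inet. prim_inet first checks that the socket is valid and, if it is, calls erlang:port_command to hand the work over to the ERTS runtime.
This hand-off has two possible outcomes: 1. Success: the process suspends and waits for feedback from the runtime. 2. Failure: the call returns immediately.
When does it fail?
1. The driver does not support soft_busy, but we used the force flag.
2. The driver is already busy, but we do not allow the process to be suspended.

First, the relevant documentation and code: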

erlang:port_command(Port, Data, OptionList) -> true|false

If the port command is aborted false is returned; otherwise, true is returned.

If the port is busy, the calling process will be suspended until the port is not busy anymore.

Currently the following Options are valid:

force
The calling process will not be suspended if the port is busy; instead, the port command is forced through. The call will fail with a notsup exception if the driver of the port does not support this. For more information see the ERL_DRV_FLAG_SOFT_BUSY driver flag.

nosuspend
The calling process will not be suspended if the port is busy; instead, the port command is aborted and false is returned.

For busy_port, see the documentation:

erlang:system_monitor(MonitorPid, [Option]) -> MonSettings
busy_port
If a process in the system gets suspended because it sends to a busy port, a message {monitor, SusPid, busy_port, Port} is sent to MonitorPid. SusPid is the pid that got suspended when sending to Port.
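
To observe these suspensions on a live node, a process can register itself as the system monitor for busy_port events. The sketch below is an assumption-laden illustration (the module busy_port_watch is a made-up name, and only one system monitor process can be active per node), using only erlang:system_monitor/2 as documented above.

```erlang
-module(busy_port_watch).
-export([start/0]).

%% Spawn a process that registers itself as the system monitor for
%% busy_port events and logs each one. Returns the watcher's pid once
%% the monitor has been installed.
start() ->
    Parent = self(),
    Pid = spawn(fun() ->
                        erlang:system_monitor(self(), [busy_port]),
                        Parent ! {self(), ready},
                        loop()
                end),
    receive {Pid, ready} -> Pid end.

%% Log every busy_port notification until told to stop.
loop() ->
    receive
        {monitor, SusPid, busy_port, Port} ->
            io:format("~p was suspended sending to busy port ~p~n",
                      [SusPid, Port]),
            loop();
        stop ->
            ok
    end.
```

Sending stop to the returned pid ends the watcher.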

erts/emulator/beam/erl_bif_port.c

BIF_RETTYPE port_command_2(BIF_ALIST_2)
{
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, NIL, 0);
}

BIF_RETTYPE port_command_3(BIF_ALIST_3)
{
    Eterm l = BIF_ARG_3;
    Uint32 flags = 0;
    while (is_list(l)) {
        Eterm* cons = list_val(l);
        Eterm car = CAR(cons);
        /* handle the force and nosuspend options */
        if (car == am_force) {
            flags |= ERTS_PORT_COMMAND_FLAG_FORCE;
        } else if (car == am_nosuspend) {
            flags |= ERTS_PORT_COMMAND_FLAG_NOSUSPEND;
        } else {
            BIF_ERROR(BIF_P, BADARG);
        }
        l = CDR(cons);
    }
    if(!is_nil(l)) {
        BIF_ERROR(BIF_P, BADARG);
    }
    return do_port_command(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, flags);
}

#define ERTS_PORT_COMMAND_FLAG_FORCE            (((Uint32) 1) << 0)
#define ERTS_PORT_COMMAND_FLAG_NOSUSPEND        (((Uint32) 1) << 1)

static BIF_RETTYPE do_port_command(Process *BIF_P,
                                   Eterm BIF_ARG_1,
                                   Eterm BIF_ARG_2,
                                   Eterm BIF_ARG_3,
                                   Uint32 flags)
{
    BIF_RETTYPE res;
    Port *p;

    /* Trace sched out before lock check wait */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_out);
    }

    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_inactive);
    }

    p = id_or_name2port(BIF_P, BIF_ARG_1);
    if (!p) {
        if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
            trace_virtual_sched(BIF_P, am_in);
        }
        if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
            profile_runnable_proc(BIF_P, am_active);
        }
        BIF_ERROR(BIF_P, BADARG);
    }

    /* Trace port in, id_or_name2port causes wait */

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_in, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_active);
    }

    ERTS_BIF_PREP_RET(res, am_true);

    if ((flags & ERTS_PORT_COMMAND_FLAG_FORCE)
        && !(p->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY)) {
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_NOTSUP);
    }
    else if (!(flags & ERTS_PORT_COMMAND_FLAG_FORCE)
             && p->status & ERTS_PORT_SFLG_PORT_BUSY) {
        if (flags & ERTS_PORT_COMMAND_FLAG_NOSUSPEND) {
            ERTS_BIF_PREP_RET(res, am_false);
        }
        else { /* suspend the calling process and send busy_port */
            erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, p);
            if (erts_system_monitor_flags.busy_port) {
                monitor_generic(BIF_P, am_busy_port, p->id);
            }
            ERTS_BIF_PREP_YIELD3(res, bif_export[BIF_port_command_3], BIF_P,
                                 BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
        }
    } else {
        int wres;
        erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        wres = erts_write_to_port(BIF_P->id, p, BIF_ARG_2);
        erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
        if (wres != 0) {
            ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
        }
    }

    if (IS_TRACED_FL(p, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports_where(p, am_out, am_command);
    }
    if (erts_system_profile_flags.runnable_ports && !erts_port_is_scheduled(p)) {
        profile_runnable_port(p, am_inactive);
    }

    erts_port_release(p);
    /* Trace sched in after port release */
    if (IS_TRACED_FL(BIF_P, F_TRACE_SCHED_PROCS)) {
        trace_virtual_sched(BIF_P, am_in);
    }
    if (erts_system_profile_flags.runnable_procs && erts_system_profile_flags.exclusive) {
        profile_runnable_proc(BIF_P, am_active);
    }

    if (ERTS_PROC_IS_EXITING(BIF_P)) {
        KILL_CATCHES(BIF_P);    /* Must exit */
        ERTS_BIF_PREP_ERROR(res, BIF_P, EXC_ERROR);
    }
    return res;
}

erts/emulator/drivers/common/inet_drv.c

static struct erl_drv_entry tcp_inet_driver_entry =
{
    tcp_inet_init,  /* inet_init will add this driver !! */
    tcp_inet_start,
    tcp_inet_stop,
    tcp_inet_command,
#ifdef __WIN32__
    tcp_inet_event,
    NULL,
#else
    tcp_inet_drv_input,
    tcp_inet_drv_output,
#endif
    "tcp_inet",
    NULL,
    NULL,
    tcp_inet_ctl,
    tcp_inet_timeout,
    tcp_inet_commandv,
    NULL,
    tcp_inet_flush,
    NULL,
    NULL,
    ERL_DRV_EXTENDED_MARKER,
    ERL_DRV_EXTENDED_MAJOR_VERSION,
    ERL_DRV_EXTENDED_MINOR_VERSION,
    ERL_DRV_FLAG_USE_PORT_LOCKING|ERL_DRV_FLAG_SOFT_BUSY,  /* the tcp driver supports soft_busy */
    NULL,
    tcp_inet_process_exit,
    inet_stop_select
};

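At the level of how the virtual machine executes gen_tcp:send, the calling process can be suspended in the following cases:

1. The data has been handed to ERTS successfully and the process is waiting for ERTS to report the result of the send. This is the common case.

2. The socket is busy and we did not set the force flag on port_command.

3. The calling process sent a large amount of data, used up its time slice, and was descheduled by the runtime.

Possible failure: we set the nosuspend flag, but the socket is busy.

At this point the runtime goes ahead and calls erts_write_to_port to pass the data down to the next layer.

Our question now is how the data is organized: what does the runtime do with it? Let's keep reading the code.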

erts/emulator/beam/io.c

#define ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT) /* #define ERL_ONHEAP_BIN_LIMIT 64*/
#define SMALL_WRITE_VEC  16


/* write data to a port */
int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
{
    char *buf;
    erts_driver_t *drv = p->drv_ptr;
    int size;
    int fpe_was_unmasked;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
    ERTS_SMP_CHK_NO_PROC_LOCKS;

    p->caller = caller_id;
    if (drv->outputv != NULL) {
        int vsize;
        int csize;
        int pvsize;
        int pcsize;
        int blimit;
        SysIOVec iv[SMALL_WRITE_VEC];   /* at most 16 segments */
        ErlDrvBinary* bv[SMALL_WRITE_VEC];
        SysIOVec* ivp;
        ErlDrvBinary**  bvp;
        ErlDrvBinary* cbin;
        ErlIOVec ev;

        if ((size = io_list_vec_len(list, &vsize, &csize,
                                    ERL_SMALL_IO_BIN_LIMIT,
                                    &pvsize, &pcsize)) < 0) {
            goto bad_value;
        }
        /* To pack or not to pack (small binaries) ...? */
        vsize++;
        if (vsize <= SMALL_WRITE_VEC) {
            /* Do NOT pack */
            blimit = 0;
        } else {
            /* Do pack */
            vsize = pvsize + 1;
            csize = pcsize;
            blimit = ERL_SMALL_IO_BIN_LIMIT;
        }
        /* Use vsize and csize from now on */
        if (vsize <= SMALL_WRITE_VEC) {
            ivp = iv;
            bvp = bv;
        } else {
            ivp = (SysIOVec *) erts_alloc(ERTS_ALC_T_TMP,
                                          vsize * sizeof(SysIOVec));
            bvp = (ErlDrvBinary**) erts_alloc(ERTS_ALC_T_TMP,
                                              vsize * sizeof(ErlDrvBinary*));
        }
        cbin = driver_alloc_binary(csize);
        if (!cbin)
            erts_alloc_enomem(ERTS_ALC_T_DRV_BINARY, ERTS_SIZEOF_Binary(csize));

        /* Element 0 is for driver usage to add header block */
        ivp[0].iov_base = NULL;
        ivp[0].iov_len = 0;
        bvp[0] = NULL;
        ev.vsize = io_list_to_vec(list, ivp+1, bvp+1, cbin, blimit);
        ev.vsize++;
#if 0
        /* This assertion may say something useful, but it can
           be falsified during the emulator test suites. */
        ASSERT((ev.vsize >= 0) && (ev.vsize == vsize));
#endif
        ev.size = size;  /* total size */
        ev.iov = ivp;
        ev.binv = bvp;
        fpe_was_unmasked = erts_block_fpe();
        (*drv->outputv)((ErlDrvData)p->drv_data, &ev);
        erts_unblock_fpe(fpe_was_unmasked);
        if (ivp != iv) {
            erts_free(ERTS_ALC_T_TMP, (void *) ivp);
        }
        if (bvp != bv) {
            erts_free(ERTS_ALC_T_TMP, (void *) bvp);
        }
        driver_free_binary(cbin);
    } else {
        int r;

        /* Try with an 8KB buffer first (will often be enough I guess). */
        size = 8*1024;
        /* See below why the extra byte is added. */
        buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
        r = io_list_to_buf(list, buf, size);

        if (r >= 0) {
            size -= r;
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);  /* call the driver's output callback (used by drivers without outputv) */
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
        else if (r == -2) {
            erts_free(ERTS_ALC_T_TMP, buf);
            goto bad_value;
        }
        else {
            ASSERT(r == -1); /* Overflow */
            erts_free(ERTS_ALC_T_TMP, buf);
            if ((size = io_list_len(list)) < 0) {
                goto bad_value;
            }

            /*
             * I know drivers that pad space with '\0' this is clearly
             * incorrect but I don't feel like fixing them now, insted
             * add ONE extra byte.
             */
            buf = erts_alloc(ERTS_ALC_T_TMP, size+1);
            r = io_list_to_buf(list, buf, size);
            fpe_was_unmasked = erts_block_fpe();
            (*drv->output)((ErlDrvData)p->drv_data, buf, size);
            erts_unblock_fpe(fpe_was_unmasked);
            erts_free(ERTS_ALC_T_TMP, buf);
        }
    }
    p->bytes_out += size;
    erts_smp_atomic_add(&erts_bytes_out, size);

#ifdef ERTS_SMP
    if (p->xports)
        erts_smp_xports_unlock(p);
    ASSERT(!p->xports);
#endif
    p->caller = NIL;
    return 0;

 bad_value:
    p->caller = NIL;
    {
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp, "Bad value on output port '%s'\n", p->name);
        erts_send_error_to_logger_nogl(dsbufp);
        return 1;
    }
}

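The data passed to gen_tcp:send is an iolist. Many people misunderstand this and deliberately convert the iolist into a list or a binary first. The newly created binary or list has to be garbage collected after the send, and if this happens frequently
the performance loss is substantial.
The tcp driver supports scatter write and ultimately calls the writev system call, so we should take full advantage of this.
From the code above we can see that io fills the writev vector according to the following rules. An element of the iolist is copied if it is:
1. an int;
2. a heap binary;
3. a small proc binary. The original note gives the threshold as 64 bytes; in the code above the limit is ERL_SMALL_IO_BIN_LIMIT (4*ERL_ONHEAP_BIN_LIMIT), and it is applied only when the vector would exceed SMALL_WRITE_VEC (16) entries.

TCP is also a stream protocol, so when sending a message we usually need to prepend a header, for example a 4-byte length. Doing this by hand is very inefficient.
The tcp driver can add the message length automatically; see the documentation: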

{packet, PacketType}(TCP/IP sockets)
Defines the type of packets to use for a socket. The following values are valid:

raw | 0
No packaging is done.

1 | 2 | 4
Packets consist of a header specifying the number of bytes in the packet, followed by that number of bytes. The length of header can be one, two, or four bytes; containing an unsigned integer in big-endian byte order. Each send operation will generate the header, and the header will be stripped off on each receive operation.

In current implementation the 4-byte header is limited to 2Gb.
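
A loopback sketch of the {packet, 4} option in action. The module name packet_demo is hypothetical; the sender passes an iolist and never builds the length header itself, and the receiver gets exactly one message per recv with the header already stripped.

```erlang
-module(packet_demo).
-export([roundtrip/1]).

%% Send IoList over a loopback connection with {packet, 4} set on both
%% ends and return what the receiving side gets from a single recv.
roundtrip(IoList) ->
    Opts = [binary, {packet, 4}, {active, false}],
    {ok, L} = gen_tcp:listen(0, Opts),
    {ok, Port} = inet:port(L),
    %% The connection completes in the kernel's listen backlog, so the
    %% same process can connect first and accept afterwards.
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, Opts),
    {ok, S} = gen_tcp:accept(L),
    ok = gen_tcp:send(C, IoList),
    {ok, Data} = gen_tcp:recv(S, 0, 5000),
    [gen_tcp:close(X) || X <- [C, S, L]],
    Data.
```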

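At this point the data is packed and ready, and responsibility moves to the inet_drv driver.
Inside inet_drv every socket has a message queue that holds the messages pushed down from the upper layers. This queue has a high and a low watermark. When the number of queued bytes exceeds the high watermark, inet_drv marks the socket as busy. The busy state is cleared only when the number of queued bytes drops below
the low watermark.

These options are undocumented; they are used as follows: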
inet:setopts(Socket, [{high_watermark, 131072}]).
inet:setopts(Socket, [{low_watermark, 65536}]).
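
A small sketch for setting both watermarks and reading them back. The module name watermark_demo is made up, and it is an assumption here that inet:getopts/2 accepts the same two option names on the release in use.

```erlang
-module(watermark_demo).
-export([set/3, demo/0]).

%% Set the high watermark first, then the low one, and return what
%% the driver reports afterwards.
set(Sock, High, Low) when High >= Low ->
    ok = inet:setopts(Sock, [{high_watermark, High}]),
    ok = inet:setopts(Sock, [{low_watermark, Low}]),
    inet:getopts(Sock, [high_watermark, low_watermark]).

%% Try the values from the text on a throwaway listening socket.
demo() ->
    {ok, L} = gen_tcp:listen(0, []),
    Result = set(L, 131072, 65536),
    gen_tcp:close(L),
    Result.
```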

erts/emulator/drivers/common/inet_drv.c

static void tcp_inet_drv_output(ErlDrvData data, ErlDrvEvent event)
{
    (void)tcp_inet_output((tcp_descriptor*)data, (HANDLE)event);
}

/* socket ready for ouput:
** 1. TCP_STATE_CONNECTING => non block connect ?
** 2. TCP_STATE_CONNECTED  => write output
*/
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
{
    int ret = 0;
    ErlDrvPort ix = desc->inet.port;

    DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n",
            (long)desc->inet.port, desc->inet.s));
    if (desc->inet.state == TCP_STATE_CONNECTING) {
        sock_select(INETP(desc),FD_CONNECT,0);

        driver_cancel_timer(ix);  /* posssibly cancel a timer */
#ifndef __WIN32__
        /*
         * XXX This is strange.  This *should* work on Windows NT too,
         * but doesn't.  An bug in Winsock 2.0 for Windows NT?
         *
         * See "Unix Netwok Programming", W.R.Stevens, p 412 for a
         * discussion about Unix portability and non blocking connect.
         */

#ifndef SO_ERROR
        {
            int sz = sizeof(desc->inet.remote);
            int code = sock_peer(desc->inet.s,
                                 (struct sockaddr*) &desc->inet.remote, &sz);

            if (code == SOCKET_ERROR) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret =  async_error(INETP(desc), sock_errno());
                goto done;
            }
        }
#else
        {
            int error = 0;      /* Has to be initiated, we check it */
            unsigned int sz = sizeof(error); /* even if we get -1 */
            int code = sock_getopt(desc->inet.s, SOL_SOCKET, SO_ERROR,
                                   (void *)&error, &sz);

            if ((code < 0) || error) {
                desc->inet.state = TCP_STATE_BOUND;  /* restore state */
                ret = async_error(INETP(desc), error);
                goto done;
            }
        }
#endif /* SOCKOPT_CONNECT_STAT */
#endif /* !__WIN32__ */

        desc->inet.state = TCP_STATE_CONNECTED;
        if (desc->inet.active)
            sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
        async_ok(INETP(desc));
    }
    else if (IS_CONNECTED(INETP(desc))) {
        for (;;) {
            int vsize;
            int n;
            SysIOVec* iov;

            if ((iov = driver_peekq(ix, &vsize)) == NULL) {
                sock_select(INETP(desc), FD_WRITE, 0);
                send_empty_out_q_msgs(INETP(desc));
                goto done;
            }
            vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
            DEBUGF(("tcp_inet_output(%ld): s=%d, About to send %d items\r\n",
                    (long)desc->inet.port, desc->inet.s, vsize));
            if (sock_sendv(desc->inet.s, iov, vsize, &n, 0)==SOCKET_ERROR) {
                if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) {
                    DEBUGF(("tcp_inet_output(%ld): sock_sendv(%d) errno = %d\r\n",
                            (long)desc->inet.port, vsize, sock_errno()));
                    ret =  tcp_send_error(desc, sock_errno());
                    goto done;
                }
#ifdef __WIN32__
                desc->inet.send_would_block = 1;
#endif
                goto done;
            }
            if (driver_deq(ix, n) <= desc->low) {
                if (IS_BUSY(INETP(desc))) {
                    desc->inet.caller = desc->inet.busy_caller;
                    desc->inet.state &= ~INET_F_BUSY;
                    set_busy_port(desc->inet.port, 0);
                    /* if we have a timer then cancel and send ok to client */
                    if (desc->busy_on_send) {
                        driver_cancel_timer(desc->inet.port);
                        desc->busy_on_send = 0;
                    }
                    inet_reply_ok(INETP(desc));
                }
            }
        }
    }
    else {
        sock_select(INETP(desc),FD_CONNECT,0);
        DEBUGF(("tcp_inet_output(%ld): bad state: %04x\r\n",
                (long)desc->inet.port, desc->inet.state));
    }
 done:
    DEBUGF(("tcp_inet_output(%ld) }\r\n", (long)desc->inet.port));
    return ret;
}

First, let's see from the documentation which options affect how this data is sent.

inet:setopts(Socket, Options) -> ok | {error, posix()}

{delay_send, Boolean}
Normally, when an Erlang process sends to a socket, the driver will try to immediately send the data. If that fails, the driver will use any means available to queue up the message to be sent whenever the operating system says it can handle it. Setting {delay_send, true} will make all messages queue up. This makes the messages actually sent onto the network be larger but fewer. The option actually affects the scheduling of send requests versus Erlang processes instead of changing any real property of the socket. Needless to say it is an implementation specific option. Default is false.

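Normally, when sending, the driver first takes any messages left unsent from the previous round out of the queue and tries to send them. If not everything can be sent, the remainder is queued together with the current message.

If the send succeeds, the driver checks whether the delay_send flag is set. If it is, the message is put straight into the queue so that messages accumulate into larger blocks to be sent together next time.

If the queue holds data, the tcp driver registers the socket for the writable event and waits for the notification; at a suitable time the port is scheduled again to write.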

{sndbuf, Integer}
Gives the size of the send buffer to use for the socket.

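This option affects the socket's write buffer in the kernel protocol stack: the larger it is, the more easily the send system call can push data into the stack.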

{send_timeout, Integer}

Only allowed for connection oriented sockets.

Specifies a longest time to wait for a send operation to be accepted by the underlying TCP stack. When the limit is exceeded, the send operation will return {error,timeout}. How much of a packet that actually got sent is unknown, why the socket should be closed whenever a timeout has occurred (see send_timeout_close). Default is infinity.

{send_timeout_close, Boolean}
Only allowed for connection oriented sockets.

Used together with send_timeout to specify whether the socket will be automatically closed when the send operation returns {error,timeout}. The recommended setting is true which will automatically close the socket. Default is false due to backward compatibility.
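
The options quoted above can be applied together on a connected socket. The following is a minimal sketch with arbitrary example values; the module name send_opts_demo and the chosen numbers are assumptions, not recommendations from the original post.

```erlang
-module(send_opts_demo).
-export([configure/1, demo/0]).

%% Apply the send-related options discussed above and read some of
%% them back. sndbuf is left out of the read-back because the
%% operating system may adjust the requested size.
configure(Sock) ->
    ok = inet:setopts(Sock, [{delay_send, true},
                             {sndbuf, 65536},
                             {send_timeout, 5000},
                             {send_timeout_close, true}]),
    inet:getopts(Sock, [delay_send, send_timeout, send_timeout_close]).

%% Configure the client end of a loopback connection.
demo() ->
    {ok, L} = gen_tcp:listen(0, [{active, false}]),
    {ok, Port} = inet:port(L),
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, [{active, false}]),
    Result = configure(C),
    gen_tcp:close(C),
    gen_tcp:close(L),
    Result.
```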

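By now part of the data may be in the message queue, part of it may have been pushed into the TCP stack's buffer waiting for the network card to send it, and the socket may also be registered for the writable event.

As soon as a writable event occurs, the runtime schedules the port that belongs to the socket so that it can continue writing.

Once a message has been pushed into the protocol stack successfully, the tcp driver sends the calling process an {inet_reply,S,Status} message to report the result. At that point gen_tcp:send returns in the calling process and the whole flow is complete.

A few key points:

1. Ports are scheduled fairly, just like processes. Processes are scheduled in units of reductions; for ports, the number of bytes sent is converted into reductions. So a process that sends a large amount of TCP data will not run continuously: the runtime forcibly pauses it for a while so that other ports get a chance to run.

2. Sending with gen_tcp is synchronous; that is, it blocks in receive {inet_reply,S,Status} -> ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),Status end, which is very unfavorable when sending large numbers of messages.

A better approach is to perform gen_tcp's two steps separately by hand:
1. Keep calling erlang:port_command(S, Data, OptList), preferably with the force flag.
2. Passively wait for the {inet_reply,S,Status} messages.

For details, see the code of the hotwheels or rabbitmq projects.
hotwheels/src/pubsun.erl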

handle_cast({publish, Msg}, State) ->
    io:format("publish,info: ~p~n", [ets:info(State#state.subs)]),
    {A, B, C} = Start = now(),
    Msg1 = <<A:32, B:32, C:32, ?MESSAGE, Msg/binary>>,
    F = fun({_, _, Sock}, _) -> erlang:port_command(Sock, Msg1) end,
    erlang:process_flag(priority, high),
    ets:foldl(F, ok, State#state.subs),
    End = now(),
    erlang:process_flag(priority, normal),
    io:format("cost time: ~p~n", [timer:now_diff(End, Start) / 1000]),
    {noreply, State};

handle_info({inet_reply, _Sock, _Error}, State) ->
    io:format("inet reply error: ~p~n", [_Error]),
    %% there needs to be a reverse lookup from sock to pid
    {noreply, State};
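
The two-step pattern can be tried in isolation with a loopback connection. This sketch is not taken from hotwheels or rabbitmq; the module async_send is a made-up name, and it relies on the same undocumented inet_reply message that prim_inet:send waits for above, so it should be treated as tied to the driver's internals.

```erlang
-module(async_send).
-export([send_all/2, demo/0]).

%% Step 1: push every packet to the port without waiting in between.
%% Step 2: collect one {inet_reply, Sock, Status} per accepted packet.
send_all(Sock, Packets) ->
    Sent = [erlang:port_command(Sock, P, [force]) || P <- Packets],
    [collect(Sock) || true <- Sent].

%% Wait for a single reply from the driver.
collect(Sock) ->
    receive
        {inet_reply, Sock, Status} -> Status
    after 5000 ->
        {error, timeout}
    end.

%% Send three small packets over loopback and read them back.
demo() ->
    Opts = [binary, {packet, 4}, {active, false}],
    {ok, L} = gen_tcp:listen(0, Opts),
    {ok, Port} = inet:port(L),
    {ok, C} = gen_tcp:connect({127,0,0,1}, Port, Opts),
    {ok, S} = gen_tcp:accept(L),
    Statuses = send_all(C, [<<"a">>, <<"b">>, <<"c">>]),
    Received = [begin {ok, D} = gen_tcp:recv(S, 0, 5000), D end
                || _ <- [1, 2, 3]],
    [gen_tcp:close(X) || X <- [C, S, L]],
    {Statuses, Received}.
```

In a real server the replies would normally be handled in the process's main receive loop, as the handle_info clause above does, rather than collected right after sending.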


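Corollary:
In theory gen_tcp:send should reach about 80% of the efficiency of code written by a top C programmer. If you are below that figure, work through the steps above to troubleshoot.

Reference articles: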

http://mryufeng.javaeye.com/blog/475003

http://mryufeng.javaeye.com/blog/289058

http://mryufeng.javaeye.com/blog/288384

http://mryufeng.javaeye.com/blog/366761

http://avindev.javaeye.com/blog/76373
