Sunday, September 28, 2008

gen_fsm vs. a simple FSM for sending emails...

I needed to send emails and check email validity.
At first I chose to use an existing gen_fsm-based client for sending email, for
example this one.

I found that all I needed was a simple fun that just sends a mail; I didn't want a full gen_fsm that has to be called for every step involved in sending a simple mail...

I needed a way to send mail efficiently, and by efficiently I mean:
- the MX server is contacted directly
- if one MX server fails, another one is tried
- greylisting is handled transparently (one greylisted message must not block the send queue)

My fun then becomes:

start_link(Servers,Port,ServerName,MailFrom,To,Data) ->
    spawn_link(?MODULE, init, [Servers,Port,ServerName,MailFrom,To,Data]).

And my module is named 'smtp_client'.

While hacking on the gen_fsm above, I found that it did not use binary strings at all:
all manipulations were done with lists... The binary type fits this task perfectly, so I used
it instead.
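To show why binaries fit so well: binary pattern matching can peel the three-digit reply code off the front of a server reply directly, with no list conversion. This is a minimal sketch, not code from smtp_proto; the module and function names (reply_demo, parse_line/1) are hypothetical, and it only handles "NNN text" and "NNN-text" lines.

```erlang
-module(reply_demo).
-export([parse_line/1]).

%% Split one SMTP reply line into its numeric code, the separator that
%% follows it ($\s on the last line of a reply, $- on a continuation
%% line) and the remaining text, without converting anything to a list.
parse_line(<<D1, D2, D3, Sep, Text/binary>>)
  when D1 >= $2, D1 =< $5,
       D2 >= $0, D2 =< $9,
       D3 >= $0, D3 =< $9,
       (Sep =:= $\s orelse Sep =:= $-) ->
    {list_to_integer([D1, D2, D3]), Sep, Text};
parse_line(Other) ->
    %% Too short, or not "NNN " / "NNN-" (a bare "250\r\n" lands here too)
    {unknown, Other}.
```

With lists, the same test would need something like lists:sublist/2 plus list_to_integer/1 on every reply; here the match does the work.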

So I rewrote everything. I built a simple "smtp_proto.erl" module that sends a binary string for each step of the SMTP protocol and checks the SMTP response from the server, and wrote a simple "smtp_client.erl" that uses one single loop and calls the needed "StateName" fun whenever required.

Here's the loop fun, where everything takes place:

loop(Phase,State) ->
    receive
        {'$c', Who, info} ->
            Who ! {ok, Phase, State#state.to},
            loop(Phase, State);

        {'$c', stop} ->
            error_logger:info_msg("Forced to quit~n"),
            [_,_, Email ] = State#state.to,
            exit({stop, Email});

        {tcp, Socket, Data} ->
            [_,_, Email ] = State#state.to,
            case smtp_proto:check(Data) of
                {more, _Rest} ->
                    %% check/1 did not recognize a reply: re-arm and wait for more
                    inet:setopts(Socket, [{active,once}]),
                    loop(Phase, State);

                {ok, _Rest} ->
                    %% Positive reply: re-arm the socket, then run the current phase
                    inet:setopts(Socket, [{active,once}]),
                    case ?MODULE:Phase({Socket, Data}, State) of

                        {ok, NewPhase, NewState} ->
                            loop(NewPhase, NewState);

                        {stop, success, _NewState} ->
                            mail_stats:add(success),
                            exit({ok, Email});

                        {stop, How, _NewState} ->
                            exit({How, Email});

                        {error, _NewPhase, _NewState} ->
                            error_logger:error_msg("Error: ~p '~p'~n", [Phase, Email]),
                            exit({error, Email});

                        Other ->
                            error_logger:error_msg("What: ~p~n", [Other]),
                            exit({error, Other})
                    end;

                {error, 421, Rest} ->
                    error_logger:error_msg("Error ~p for ~s (greylist): ~p ~p~n", [Phase, Email, 421, Rest]),
                    greylist(State),
                    exit({error, Rest});

                {error, Code, Rest} ->
                    error_logger:error_msg("Error ~p for ~s: ~p ~p~n", [Phase, Email, Code, Rest]),
                    exit({error, Rest});

                {error, Rest} ->
                    error_logger:error_msg("Error ~p for ~s: ~p~n", [Phase, Email, Rest]),
                    exit({error, Rest})
            end;

        {tcp_error, _Socket, Reason} ->
            [_,_, Email ] = State#state.to,
            error_logger:error_msg("Error: ~p '~p' for ~s~n", [Phase, Reason, Email]),
            exit({error, Reason});

        {tcp_closed, _Socket} ->
            [_,_, Email ] = State#state.to,
            error_logger:error_msg("Error: ~p 'connection closed' for ~s~n", [Phase, Email]),
            exit({error, closed});

        Other ->
            error_logger:error_msg("Unhandled message: ~p~n", [Other]),
            loop(Phase,State)

    after ?INACTIVITY ->
        exit({error, inactivity})
    end.


What's important here is that gen_tcp sends various messages to the process ({tcp, ...}, {tcp_closed, ...}, {tcp_error, ...}),
and those messages drive the state of the FSM.
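The key gen_tcp feature here is {active, once}: the socket delivers exactly one {tcp, Socket, Data} message to the owning process, then goes passive until inet:setopts(Socket, [{active, once}]) re-arms it. The following is a hypothetical, self-contained sketch (active_once_demo is not part of the post's code) that uses a throwaway loopback listener in place of an SMTP server to show that receive/re-arm cycle.

```erlang
-module(active_once_demo).
-export([run/0]).

%% Start a throwaway loopback "server" that sends one SMTP-like greeting,
%% then read that greeting on the client side in {active, once} mode.
run() ->
    {ok, LSock} = gen_tcp:listen(0, [binary, {active, false},
                                     {reuseaddr, true}, {ip, {127,0,0,1}}]),
    {ok, Port} = inet:port(LSock),
    spawn_link(fun() ->
                   {ok, S} = gen_tcp:accept(LSock),
                   ok = gen_tcp:send(S, <<"220 hello\r\n">>),
                   %% Hold the connection until the client closes it
                   _ = gen_tcp:recv(S, 0, 5000),
                   gen_tcp:close(S)
               end),
    {ok, Sock} = gen_tcp:connect({127,0,0,1}, Port,
                                 [binary, {packet, 0}, {active, once}], 5000),
    Reply = collect(Sock, <<>>),
    ok = gen_tcp:close(Sock),
    ok = gen_tcp:close(LSock),
    Reply.

%% Each {tcp, ...} message switches the socket back to passive mode, so
%% inet:setopts/2 must re-arm it before the next packet can arrive.
collect(Sock, Acc) ->
    receive
        {tcp, Sock, Data} ->
            Buf = <<Acc/binary, Data/binary>>,
            case binary:last(Buf) of
                $\n ->
                    {ok, Buf};
                _ ->
                    ok = inet:setopts(Sock, [{active, once}]),
                    collect(Sock, Buf)
            end;
        {tcp_closed, Sock} ->
            {error, closed};
        {tcp_error, Sock, Reason} ->
            {error, Reason}
    after 5000 ->
        {error, timeout}
    end.
```

Because only one message is in flight at a time, a slow or chatty server can never flood the mailbox, which is why the loop can afford to handle everything in one receive.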


    {ok, _Rest} ->
        inet:setopts(Socket, [{active,once}]),
        case ?MODULE:Phase({Socket, Data}, State) of


My "smtp_client" module calls the fun named by "Phase", which corresponds to an SMTP state:

loop(Phase,State) ->
    ...
    {ok, _Rest} ->
        inet:setopts(Socket, [{active,once}]),
        case ?MODULE:Phase({Socket, Data}, State) of
            {ok, NewPhase, NewState} ->
                loop(NewPhase, NewState);


Let's take a look at the helo fun:

helo({Socket, _Data}, State) ->
    smtp_proto:mailfrom(Socket, State#state.mailfrom),
    {ok, mailfrom, State}.

Its job is simply to call "smtp_proto:mailfrom" and return the tuple "{ok, mailfrom, State}".
Back in the main loop, that tuple matches:

case ?MODULE:Phase({Socket, Data}, State) of
    {ok, NewPhase, NewState} ->
        loop(NewPhase, NewState);

So NewPhase = mailfrom and NewState = State, and the loop waits for the server's reply to MAIL FROM.

The mailfrom fun uses exactly the same method:

mailfrom({Socket, _Data}, State) ->
    [_,_,Email] = State#state.to,
    smtp_proto:rcptto(Socket, Email),
    {ok, rcptto, State}.

The fun extracts the email, then calls "smtp_proto:rcptto" on the Socket with Email as argument.
Technically, it just writes this on the Socket:

RCPT TO:<Email>\r\n

Then it returns the tuple holding the new Phase "rcptto" and the new State (here, the unmodified State).
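The wire format can be reproduced in a couple of lines. A minimal sketch, assuming the same "command + CRLF" framing that smtp_proto:write/2 applies; cmd_demo:rcpt_to/1 is a hypothetical name that folds smtp_proto:rcptto/2 and write/2 together and returns the bytes instead of sending them:

```erlang
-module(cmd_demo).
-export([rcpt_to/1]).

%% Build the RCPT TO command exactly as it goes on the wire. Email may be
%% a string or a binary, since both are valid parts of an iolist.
rcpt_to(Email) ->
    iolist_to_binary([<<"RCPT TO:<">>, Email, <<">\r\n">>]).
```

Because the command is built as an iolist, gen_tcp:send/2 could also take the list directly; iolist_to_binary/1 is only there to make the result easy to inspect.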

What's nice with this method is that all TCP-related
actions are handled in one single loop. None of the Phase funs needs
to catch {tcp, ...} messages or handle TCP disconnections.
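The dispatch idea can be tried without any server. In this hypothetical sketch (phase_demo is not part of the post), each phase fun returns the command it would send instead of writing it to a socket, and run/1 feeds canned server replies one by one, calling ?MODULE:Phase(...) just as loop/2 does:

```erlang
-module(phase_demo).
%% Phase funs are called as ?MODULE:Phase(...), so they must be exported.
-export([run/1, connect/1, helo/1, mailfrom/1, rcptto/1, data/1, quit/1]).

%% Hypothetical phases: each returns the next phase and the command to send.
connect(_Reply)  -> {ok, helo, <<"HELO client.example">>}.
helo(_Reply)     -> {ok, mailfrom, <<"MAIL FROM:<me@example.org>">>}.
mailfrom(_Reply) -> {ok, rcptto, <<"RCPT TO:<you@example.com>">>}.
rcptto(_Reply)   -> {ok, data, <<"DATA">>}.
data(_Reply)     -> {ok, quit, <<"Subject: hi\r\n\r\nHello\r\n.">>}.
quit(_Reply)     -> {stop, success, <<"QUIT">>}.

%% Feed server replies one at a time, dispatching on the current phase.
run(Replies) ->
    run(connect, Replies, []).

run(Phase, [Reply | More], Sent) ->
    case ?MODULE:Phase(Reply) of
        {ok, NextPhase, Cmd} ->
            run(NextPhase, More, [Cmd | Sent]);
        {stop, success, Cmd} ->
            {success, lists:reverse([Cmd | Sent])}
    end;
run(Phase, [], Sent) ->
    %% Out of replies: report where we are still waiting
    {stalled, Phase, lists:reverse(Sent)}.
```

A run that stops early reports the phase it was waiting in, which mirrors how the real loop sits in receive until the next {tcp, ...} message arrives.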

One last thing before the full code: remember the MX trick that returns a list of valid servers?
That's why the start_link fun takes a list of servers as its first parameter
(if one MX is down, it connects to the next one).
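The MX helper itself (mmailer:get_mx/1, used by the greylist module below) is not shown in this post. As a stand-in, here is a sketch using OTP's own resolver, inet_res:lookup/3, which returns MX records as {Preference, Host} tuples; the mx_demo module and its functions are hypothetical. first_reachable/2 expresses the "try the next MX" rule with an injected connect fun so it can be tested offline:

```erlang
-module(mx_demo).
-export([lookup/1, by_preference/1, first_reachable/2]).

%% Resolve the MX records of Domain and return the hosts, best first.
%% inet_res:lookup/3 returns an empty list if nothing is found.
lookup(Domain) ->
    by_preference(inet_res:lookup(Domain, in, mx)).

%% Lower preference value = try first.
by_preference(Records) ->
    [Host || {_Pref, Host} <- lists:keysort(1, Records)].

%% Try each host in turn with ConnectFun(Host) -> {ok, Conn} | {error, Reason};
%% stop at the first success, otherwise return every failure in order.
first_reachable(Hosts, ConnectFun) ->
    first_reachable(Hosts, ConnectFun, []).

first_reachable([], _ConnectFun, Errors) ->
    {error, lists:reverse(Errors)};
first_reachable([Host | Rest], ConnectFun, Errors) ->
    case ConnectFun(Host) of
        {ok, Conn} ->
            {ok, Host, Conn};
        {error, Reason} ->
            first_reachable(Rest, ConnectFun, [{Host, Reason} | Errors])
    end.
```

With a real socket, the connect fun could be fun(H) -> gen_tcp:connect(H, 25, [binary, {active, once}], 10000) end, which already returns {ok, Socket} or {error, Reason}.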

Now the full source code:

-module(smtp_client).

-record(state,{
    socket,
    servername,
    mailfrom, to,
    data }).

%% Extra long timeout for strange SMTPs...
-define(TIMEOUT, 10000).
-define(INACTIVITY, 100000).
-define(GREYSLEEP, 300000). %5 minutes

% States
-export([connect/2,helo/2,mailfrom/2,rcptto/2,data/2,quit/2]).

%% External Exports
-export([start_link/1,start_link/6,stop/1]).

%% gen_server callbacks
-export([init/6]).


%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
% Start a linked process; Servers is the list of MX hosts to try in order
start_link(Servers,Port,ServerName,MailFrom,To,Data) ->
    spawn_link(?MODULE, init, [Servers,Port,ServerName,MailFrom,To,Data]).

stop(Pid) ->
    Pid ! {'$c', stop}.

%%%----------------------------------------------------------------------
%%% HELO State
%%%----------------------------------------------------------------------

helo(timeout, State) ->
    error_logger:error_msg("helo timeout (~p)~n", [State]),
    {stop, normal, State};

helo({timeout, _Ref, Reason}, State) ->
    [_,_,Email] = State#state.to,
    error_logger:error_msg("Timeout: ~p email: '~s'~n", [Reason, Email]),
    {stop, normal, State};

helo({Socket, _Data}, State) ->
    smtp_proto:mailfrom(Socket, State#state.mailfrom),
    {ok, mailfrom, State}.


%%
%% Right after the connection...
%%
connect({Socket, _Data}, State) ->
    smtp_proto:helo(Socket, State#state.servername),
    {ok, helo, State}.


%%%----------------------------------------------------------------------
%%% MAILFROM State
%%%----------------------------------------------------------------------

mailfrom(timeout, State) ->
    error_logger:error_msg("timeout (~p)~n", [State]),
    {stop, normal, State};

mailfrom({timeout, _Ref, Reason}, State) ->
    [_,_,Email] = State#state.to,
    error_logger:error_msg("Timeout: ~p email: '~s'~n", [Reason, Email]),
    {stop, normal, State};

mailfrom({Socket, _Data}, State) ->
    [_,_,Email] = State#state.to,
    smtp_proto:rcptto(Socket, Email),
    {ok, rcptto, State}.

%%%----------------------------------------------------------------------
%%% RCPT TO State
%%%----------------------------------------------------------------------

rcptto(timeout, State) ->
    error_logger:error_msg("rcptto timeout (~p)~n", [State]),
    {stop, normal, State};

rcptto({timeout, _Ref, Reason}, State) ->
    [_,_,Email] = State#state.to,
    error_logger:error_msg("Timeout: ~p email: '~s'~n", [Reason, Email]),
    {stop, normal, State};

rcptto({Socket, _Data}, State) ->
    smtp_proto:data(Socket),
    {ok, data, State}.

%%%----------------------------------------------------------------------
%%% DATA State
%%%----------------------------------------------------------------------

data(timeout, State) ->
    error_logger:error_msg("data timeout (~p)~n", [State]),
    {stop, normal, State};

data({timeout, _Ref, Reason}, State) ->
    [_,_,Email] = State#state.to,
    error_logger:error_msg("Timeout: ~p email: '~s'~n", [Reason, Email]),
    {ok, data, State};

data({Socket, _Data}, State) ->
    %% Message body, then "\r\n." (write/2 appends the final "\r\n")
    smtp_proto:write(Socket, [ State#state.data, <<"\r\n.">> ]),
    {ok, quit, State}.

%%%----------------------------------------------------------------------
%%% QUIT State
%%%----------------------------------------------------------------------

quit(timeout, State) ->
    error_logger:error_msg("quit timeout (~p)~n", [State]),
    {stop, normal, State};

quit({_Socket, _Data}, State) ->
    [_,_,Email] = State#state.to,
    smtp_proto:quit(State#state.socket),
    error_logger:info_msg("Sent to ~s : OK~n", [Email]),
    {stop, success, State}.

%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
%% Timeout from ?TIME
%% gen_fsm:start_timer(70000, slow),

init(Servers,Port,ServerName,MailFrom,To,Data) ->
process_flag(trap_exit, true),
connect(Servers,Port,ServerName,MailFrom,To,Data).

%% No MX left to try
connect([], _,_,_,To,_) ->
    [_,_,Email] = To,
    exit({error, Email});

connect([H | T],Port,ServerName,MailFrom,To,Data) ->
    case gen_tcp:connect(H,Port,[binary,{packet,0},{active,once}], ?TIMEOUT) of
        {ok,Socket} ->
            %% {active,once} is already set by the connect options
            loop(connect, #state{
                socket=Socket,
                servername=ServerName,
                to=To,
                mailfrom=MailFrom,
                data=Data});

        {error, Reason} ->
            %% Timeout, refused, unreachable...: try the next MX server
            error_logger:error_msg("Cannot connect to ~p: ~p~n", [H, Reason]),
            connect(T,Port,ServerName,MailFrom,To,Data)
    end.

loop(Phase,State) ->
    receive
        {'$c', Who, info} ->
            Who ! {ok, Phase, State#state.to},
            loop(Phase, State);

        {'$c', stop} ->
            error_logger:info_msg("Forced to quit~n"),
            [_,_, Email ] = State#state.to,
            exit({stop, Email});

        {tcp, Socket, Data} ->
            [_,_, Email ] = State#state.to,
            case smtp_proto:check(Data) of
                {more, _Rest} ->
                    %% check/1 did not recognize a reply: re-arm and wait for more
                    inet:setopts(Socket, [{active,once}]),
                    loop(Phase, State);

                {ok, _Rest} ->
                    %% Positive reply: re-arm the socket, then run the current phase
                    inet:setopts(Socket, [{active,once}]),
                    case ?MODULE:Phase({Socket, Data}, State) of

                        {ok, NewPhase, NewState} ->
                            loop(NewPhase, NewState);

                        {stop, success, _NewState} ->
                            mail_stats:add(success),
                            exit({ok, Email});

                        {stop, How, _NewState} ->
                            exit({How, Email});

                        {error, _NewPhase, _NewState} ->
                            error_logger:error_msg("Error: ~p '~p'~n", [Phase, Email]),
                            exit({error, Email});

                        Other ->
                            error_logger:error_msg("What: ~p~n", [Other]),
                            exit({error, Other})
                    end;

                {error, 421, Rest} ->
                    error_logger:error_msg("Error ~p for ~s (greylist): ~p ~p~n", [Phase, Email, 421, Rest]),
                    greylist(State),
                    exit({error, Rest});

                {error, Code, Rest} ->
                    error_logger:error_msg("Error ~p for ~s: ~p ~p~n", [Phase, Email, Code, Rest]),
                    exit({error, Rest});

                {error, Rest} ->
                    error_logger:error_msg("Error ~p for ~s: ~p~n", [Phase, Email, Rest]),
                    exit({error, Rest})
            end;

        {tcp_error, _Socket, Reason} ->
            [_,_, Email ] = State#state.to,
            error_logger:error_msg("Error: ~p '~p' for ~s~n", [Phase, Reason, Email]),
            exit({error, Reason});

        {tcp_closed, _Socket} ->
            [_,_, Email ] = State#state.to,
            error_logger:error_msg("Error: ~p 'connection closed' for ~s~n", [Phase, Email]),
            exit({error, closed});

        Other ->
            error_logger:error_msg("Unhandled message: ~p~n", [Other]),
            loop(Phase,State)

    after ?INACTIVITY ->
        exit({error, inactivity})
    end.

greylist(State) ->
    spawn(greylist, start, [ ?GREYSLEEP, State#state.servername,
                             State#state.mailfrom, State#state.to, State#state.data]).
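One thing data/2 above does not do is dot-stuffing (RFC 5321, section 4.5.2): it sends State#state.data followed by "\r\n." as-is, so a body line that starts with "." could be misread by the server, and a line containing just "." would end the DATA phase early. A minimal sketch, assuming the body is a binary with CRLF line endings; the dot_stuff module is hypothetical:

```erlang
-module(dot_stuff).
-export([stuff/1]).

%% Double every "." that starts a line. A CRLF is prepended first so the
%% very first line is treated like the others, then stripped again.
stuff(Body) when is_binary(Body) ->
    <<"\r\n", Stuffed/binary>> =
        binary:replace(<<"\r\n", Body/binary>>, <<"\r\n.">>, <<"\r\n..">>, [global]),
    Stuffed.
```

In data/2 this would mean writing [dot_stuff:stuff(Body), <<"\r\n.">>] instead of the raw body; the receiving server removes the extra dots again.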


Here's the smtp_proto module:

-module(smtp_proto).
-export([read/1,
         read/2,
         write/2,
         helo/2,
         ehlo/2,
         mailfrom/2,
         rcptto/2,
         data/1,
         noop/1,
         rset/1,
         help/1,
         check/1,
         quit/1]).

%% Passive-mode read (gen_tcp:recv/3); smtp_client uses {active,once} instead
read(Socket) ->
    read(Socket, 5000).

read(Socket, Timeout) ->
    case gen_tcp:recv(Socket, 0, Timeout) of
        {ok, Packet} ->
            check(Packet);

        {error, Why} ->
            {error, Why}
    end.

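%% 2xx replies (and 354, "start mail input") are OK
check(<<"250 ", Rest/binary>>) ->
    {ok, Rest};
check(<<"214", Rest/binary>>) ->
    {ok, Rest};
check(<<"220", Rest/binary>>) ->
    {ok, Rest};
check(<<"221", Rest/binary>>) ->
    {ok, Rest};
check(<<"354", _Rest/binary>>) ->
    {ok, data};

%% Errors
check(<<"421", Rest/binary>>) ->
    {error, 421, Rest};
check(<<"503", Rest/binary>>) ->
    {error, 503, Rest};
check(<<"511", Rest/binary>>) ->
    {error, 511, Rest};
check(<<"540", Rest/binary>>) ->
    {error, 540, Rest};
check(<<"550", Rest/binary>>) ->
    {error, 550, Rest};
check(<<"554", Rest/binary>>) ->
    {error, 554, Rest};
%% Any other 4xx/5xx reply is an error too; without this clause it would
%% be treated as "more" and the client would keep waiting for data
check(<<D1, D2, D3, Rest/binary>>)
  when D1 >= $4, D1 =< $5, D2 >= $0, D2 =< $9, D3 >= $0, D3 =< $9 ->
    {error, list_to_integer([D1, D2, D3]), Rest};
check(Bin) ->
    {more, Bin}.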

%% Every command is terminated by CRLF
write(Socket, Msg) ->
    gen_tcp:send(Socket, [Msg, <<"\r\n">>] ).

%% Names may be strings or binaries: both are valid iolist parts
helo(Socket, Name) ->
    Msg = [ <<"HELO ">>, Name ],
    write(Socket, iolist_to_binary(Msg)).

ehlo(Socket, Name) ->
    Msg = [ <<"EHLO ">>, Name ],
    write(Socket, Msg).

rcptto(Socket, Name) ->
    Msg = iolist_to_binary([ <<"RCPT TO:<">>, Name, <<">">> ]),
    write(Socket, Msg).

mailfrom(Socket, Name) ->
    Msg = [ <<"MAIL FROM:<">>, Name, <<">">> ],
    write(Socket, Msg).

help(Socket) ->
    Msg = <<"HELP">>,
    write(Socket, Msg).

noop(Socket) ->
    Msg = <<"NOOP">>,
    write(Socket, Msg).

quit(Socket) ->
    Msg = <<"QUIT">>,
    write(Socket, Msg).

rset(Socket) ->
    Msg = <<"RSET">>,
    write(Socket, Msg).

data(Socket) ->
    Msg = <<"DATA">>,
    write(Socket, Msg).
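check/1 looks only at the first bytes of each packet, and the {more, _} branch of loop/2 drops what it received, so a reply split across TCP packets, or a multi-line "250-" reply, may never be recognized as complete. Below is a sketch of an alternative, under the assumption that loop/2 would keep a byte buffer in its state (it does not today): accumulate data until the buffer ends with CRLF and its last line is a final "NNN" or "NNN text" line, then classify the code by its first digit as RFC 5321 does. The reply_buffer module is hypothetical.

```erlang
-module(reply_buffer).
-export([feed/2, class/1]).

%% Append newly received Data to Buf. Returns {complete, Code, Reply} once
%% the buffer ends with CRLF and its last line is a final reply line;
%% returns {more, Buffer} otherwise, so the caller keeps the bytes.
feed(Buf, Data) ->
    All = <<Buf/binary, Data/binary>>,
    Size = byte_size(All),
    case Size >= 2 andalso binary:part(All, Size - 2, 2) =:= <<"\r\n">> of
        false ->
            {more, All};
        true ->
            Lines = binary:split(binary:part(All, 0, Size - 2), <<"\r\n">>, [global]),
            case final_code(lists:last(Lines)) of
                {ok, Code} -> {complete, Code, All};
                nomatch    -> {more, All}
            end
    end.

%% Continuation lines look like "250-PIPELINING" and do not end a reply.
final_code(<<D1, D2, D3, Rest/binary>>)
  when D1 >= $2, D1 =< $5,
       D2 >= $0, D2 =< $9,
       D3 >= $0, D3 =< $9,
       (Rest =:= <<>> orelse binary_part(Rest, 0, 1) =:= <<" ">>) ->
    {ok, list_to_integer([D1, D2, D3])};
final_code(_Line) ->
    nomatch.

%% Reply classes by first digit (RFC 5321, section 4.2.1).
class(Code) when Code >= 200, Code < 400 -> ok;         % 2yz done, 3yz go on (354)
class(Code) when Code >= 400, Code < 500 -> transient;  % 4yz: retry later
class(Code) when Code >= 500, Code < 600 -> permanent.  % 5yz: give up
```

Greylisting replies such as the 421 handled in loop/2 fall in the transient class, which is where a retry through the greylist module makes sense.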



But wait! There's more!
BONUS: the greylist client
(where you can see the smtp_client in action):

-module(greylist).

-export([start/5, stop/1, flush/1]).
-define(TIMEOUT, 100000).

start(Sleep,ServerName,MailFrom,To,Data) ->
    receive
        {'$c', flush} ->
            ok;

        {'$c', stop} ->
            error_logger:info_msg("~p: Manual stop~n", [?MODULE]),
            exit(normal)
    after Sleep ->
        ok
    end,
    [ _, _, Email ] = To,
    [ _, Domain ] = string:tokens( Email, "@"),
    List = mmailer:get_mx(Domain),
    {_, Servers} = lists:unzip( lists:keysort(1, List) ),
    %% smtp_client always exits with a non-normal reason ({ok, Email},
    %% {error, ...}), so trap exits or this process dies along with it
    process_flag(trap_exit, true),
    Pid = smtp_client:start_link(Servers, 25, ServerName,MailFrom,To,Data),
    loop(Pid).

flush(Pid) ->
    Pid ! {'$c', flush}.

stop(Pid) ->
    Pid ! {'$c', stop}.

loop(Child) ->
    receive
        {'$c', stop} ->
            smtp_client:stop(Child),
            error_logger:info_msg("Greylist: Manual stop~n");

        {'EXIT', Child, Reason} ->
            error_logger:info_msg("~p child ~p died : ~p~n", [?MODULE, Child, Reason]);

        {'EXIT', Pid, Reason} ->
            error_logger:info_msg("~p not own child ~p died : ~p~n", [?MODULE, Pid, Reason]),
            loop(Child);

        Msg ->
            error_logger:error_msg("~p: Unhandled message received: '~p'~n", [?MODULE, Msg]),
            loop(Child)

    after ?TIMEOUT ->
        smtp_client:stop(Child)
    end.
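greylist's loop/1 expects {'EXIT', Child, Reason} messages. Those are only delivered as messages to a process that traps exits; otherwise, because smtp_client ends with a non-normal reason such as {ok, Email}, the linked parent is killed along with it. A small hypothetical sketch (exit_demo is not part of the post) showing what a trapping parent observes:

```erlang
-module(exit_demo).
-export([run/1]).

%% Start a trapping "parent" that spawn_links a child which exits with
%% Reason (the way smtp_client ends with exit({ok, Email})), and return
%% the reason the parent saw in its {'EXIT', ...} message.
run(Reason) ->
    Caller = self(),
    spawn(fun() ->
              process_flag(trap_exit, true),
              Child = spawn_link(fun() -> exit(Reason) end),
              receive
                  {'EXIT', Child, Why} -> Caller ! {observed, Why}
              after 5000 ->
                  Caller ! {observed, timeout}
              end
          end),
    receive
        {observed, Result} -> Result
    after 6000 ->
        no_reply
    end.
```

A trapping process also receives {'EXIT', Child, normal} when the child finishes normally, so the same receive covers both cases.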
