Thursday, July 19, 2007

Parallelizing simple external commands ... Part II

Our loop/3 fun looks like this:

loop(_Max, 0, []) ->
unregister(computing_master),
exit(normal);

Whenever our list of jobs is empty, we deregister the 'computing_master' process and quit normally.

loop(Max, Current, []) ->
receive
stop ->
unregister(computing_master),
exit(normal);

{exited, _Result} ->
io:format("Still ~p childs~n", [Current]),
loop(Max, Current - 1, []);

E ->
io:format("Unhandled message: ~p~n", [E])

after 60000 ->
io:format("~p: Waiting for the last process ~p/~p~n", [erlang:now(), Max, Current]),
loop(Max, Current, [])
end;

In this case, we have are computing the last external process since our job list is empty.
And finally this version of loop/3 is the main one:

loop(Max, Current, List) ->
receive
stop ->
unregister(computing_master),
exit(normal);

{update, NewMax} ->
upto(NewMax, Max, List);

{exited, _Result} ->
io:format("Still ~p childs~n", [Max]),
upto(Max, Max - 1, List);

E ->
io:format("Unhandled message: ~p~n", [E])

after 60000 ->
io:format("~p: Running ~p processes~n", [erlang:now(), Max]),
upto(Max, Current, List)
end.


Here we have a non empty list of job and a number of job to start.
  • Every 60 seconds we write how many processes are running.
  • The message {update, NewMax} let's you alter the number max of concurrent tasks
  • The message {exited, _Result} is received whenever a child process dies, so we restart another job...


Bonus Code, a simple function to test the code:

sleep(Ident) ->
io:format("Waiting ~p~n", [Ident]),
Delay = [ "5", "3", "15", "8" ],
Time = lists:nth(random:uniform(4), Delay),
Cmd = [ "sleep ", Time ],
io:format("Starting: ~p~n", [Cmd]),
Status = os:cmd(Cmd),
computing_master ! {exited, Status}.

This code just calls the 'sleep' command with various arguments picked randomly... Once a process stops the 'os:cmd/1' fun exits and 'computing_master' will receive the {exited, Status} message (explained above)

No comments:

Sticky