Monday, November 19, 2007

NAGIOS (yuck) NRPE support for Erlang

NAGIOS, a pretty bad piece of software, uses a pretty bad protocol, but it seems to be installed everywhere...
I needed a way to bypass its really poor scheduling process, and naturally Erlang came to my rescue... but nothing is ever that simple.

NRPE, this horrible protocol, uses fixed-length packets (looking at the code, the last 2 characters are never set to 0; sizeof seems to be really misunderstood by the NAGIOS developers :p).

On top of that NRPE uses yet another crappy CRC32 implementation, and to save time I didn't want to reimplement it in Erlang, so I wrapped it in a crc32 port driver...

Here's the crc32 code:


#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>      /* int16_t */
#include <sys/types.h>   /* u_int32_t */

#define MAX_PACKETBUFFER_LENGTH 1024

static unsigned long crc32_table[256];

typedef struct packet_struct
{
        int16_t packet_version;
        int16_t packet_type;
        u_int32_t crc32_value;
        int16_t result_code;
        char buffer[MAX_PACKETBUFFER_LENGTH];
} packet;

/* build the crc table - must be called before calculating the crc value */
void generate_crc32_table(void){
        unsigned long crc, poly;
        int i, j;

        poly=0xEDB88320L;
        for(i=0;i<256;i++){
                crc=i;
                for(j=8;j>0;j--){
                        if(crc & 1)
                                crc=(crc>>1)^poly;
                        else
                                crc>>=1;
                }
                crc32_table[i]=crc;
        }

        return;
}

/* calculates the CRC 32 value for a buffer */
unsigned long calculate_crc32(char *buffer, unsigned int buffer_size){
        register unsigned long crc;
        int this_char;
        unsigned int current_index;

        crc=0xFFFFFFFF;

        for(current_index=0;current_index<buffer_size;current_index++){
                this_char=(int)buffer[current_index];
                crc=((crc>>8) & 0x00FFFFFF) ^ crc32_table[(crc ^ this_char) & 0xFF];
        }

        return (crc ^ 0xFFFFFFFF);
}

unsigned long test(const char *value)
{
        return calculate_crc32((char *) value, strlen(value));
}


The port_driver:

/* port_driver.c */

#include <sys/types.h>   /* u_int32_t */
#include <arpa/inet.h>   /* htonl() */

#include "erl_driver.h"

extern void generate_crc32_table(void);
extern unsigned long calculate_crc32(char *, unsigned int);

typedef struct {
    ErlDrvPort port;
} crc32_data;

static ErlDrvData crc32_drv_start(ErlDrvPort port, char *buff)
{
    crc32_data* d = (crc32_data*)driver_alloc(sizeof(crc32_data));
    d->port = port;

    /* init crc32 table */
    generate_crc32_table();
    return (ErlDrvData) d;
}

static void crc32_drv_stop(ErlDrvData handle)
{
    driver_free((char*)handle);
}

static void crc32_drv_output(ErlDrvData handle, char *buff, int bufflen)
{
    crc32_data* d = (crc32_data*)handle;

    char fn = buff[0];
    char *arg = &buff[1];
    u_int32_t res;

    switch (fn) {
    case 1:
        /* send the CRC back as a 32-bit value in network byte order,
           so the Erlang side can decode it as <<Crc:32/big-unsigned>> */
        res = htonl((u_int32_t) calculate_crc32(arg, bufflen - 1));
        driver_output(d->port, (char *) &res, sizeof(res));
        break;
    default:
        break;
    }
}

ErlDrvEntry crc32_driver_entry = {
    NULL,             /* F_PTR init, N/A */
    crc32_drv_start,  /* L_PTR start, called when port is opened */
    crc32_drv_stop,   /* F_PTR stop, called when port is closed */
    crc32_drv_output, /* F_PTR output, called when erlang has sent */
    NULL,             /* F_PTR ready_input, called when input descriptor ready */
    NULL,             /* F_PTR ready_output, called when output descriptor ready */
    "crc32_drv",      /* char *driver_name, the argument to open_port */
    NULL,             /* F_PTR finish, called when unloaded */
    NULL,             /* F_PTR control, port_command callback */
    NULL,             /* F_PTR timeout, reserved */
    NULL              /* F_PTR outputv, reserved */
};

DRIVER_INIT(crc32_drv) /* must match name in driver_entry */
{
    return &crc32_driver_entry;
}


The crc32 module, which initializes the library and exposes the crc32 computation:


-module(crc32).

-export([start/0, init/1, compute/1]).

start() ->
    start("crc32_drv").

start(SharedLib) ->
    case erl_ddll:load_driver(".", SharedLib) of
        ok -> ok;
        {error, already_loaded} -> ok;
        _E -> io:format("Error: ~p~n", [_E]),
              exit({error, could_not_load_driver})
    end,
    spawn(?MODULE, init, [SharedLib]).

init(SharedLib) ->
    register(?MODULE, self()),
    Port = open_port({spawn, SharedLib}, [binary]),
    loop(Port).


compute(X) ->
    Bin = iolist_to_binary(X),
    call_port(<<1, Bin/binary>>).

call_port(Msg) ->
    ?MODULE ! {call, self(), Msg},
    receive
        {?MODULE, Result} ->
            Result
    end.

loop(Port) ->
    receive
        {call, Caller, Msg} ->
            Port ! {self(), {command, Msg}},
            receive
                {Port, {data, Data}} ->
                    Caller ! {?MODULE, decode(Data)}
            end,
            loop(Port);

        stop ->
            Port ! {self(), close},
            receive
                {Port, closed} ->
                    exit(normal)
            end;

        {'EXIT', Port, Reason} ->
            io:format("~p ~n", [Reason]),
            exit(port_terminated)
    end.

%% The driver returns the CRC as a 32-bit value in network byte order
decode(<<U:32/big-unsigned>>) ->
    U;
decode(X) ->
    X.
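
A minimal usage sketch, assuming the shared library has been compiled as crc32_drv.so in the current directory (the timer:sleep just gives init/1 time to register the process):

%% compute the CRC32 of a binary through the port driver
crc32:start(),
timer:sleep(100),
Crc = crc32:compute(<<"The quick brown fox">>),
io:format("crc32 = ~.16B~n", [Crc]).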



Now the nrpe module; there you'll see why NRPE is pure crap: fixed-length packets for this kind of tool are nonsense...


-module(nrpe).

-export([encode/1, request/1, crc32/1, connect/1, connect/2]).


encode(Bin) ->
    Crc = crc32:compute(Bin),
    <<Crc:32, Bin/binary>>.

request(Query) ->
    Version = 2,
    Type = 1,
    Crc = 0,
    Code = 0,
    Blank = <<0:32/unit:256>>, % 1024 bytes of zeros
    Q = iolist_to_binary(Query),
    Padlen = 1024 - size(Q),
    C = crc32:compute(
          <<Version:16, Type:16, Crc:32, Code:16, Q/binary, 0, 0, Blank:Padlen/binary>>),
    <<Version:16, Type:16, C:32, Code:16, Q/binary, 0, 0, Blank:Padlen/binary>>.


Building two binaries only to send one is completely dumb, but this is required since the CRC has to be computed over the whole packet with the crc32_value field set to 0... Thanks to nrpe...
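
If that really bothers you, one possible variant (an untested sketch; request2 is a hypothetical name, same 1036-byte layout as above) is to build the packet once with a zeroed CRC field and splice the computed value in afterwards:

%% hypothetical alternative to request/1: build once with CRC=0, then splice it in
request2(Query) ->
    Q = iolist_to_binary(Query),
    Pad = 1026 - size(Q),          %% 1024-byte buffer + 2 filler bytes
    Packet0 = <<2:16, 1:16, 0:32, 0:16, Q/binary, 0:Pad/unit:8>>,
    Crc = crc32:compute(Packet0),
    <<Head:4/binary, _Zero:32, Tail/binary>> = Packet0,
    <<Head/binary, Crc:32, Tail/binary>>.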



crc32(Bin) ->
    Crc = crc32:compute(Bin),
    {Crc, Bin}.

%
% send_packet.packet_version=(int16_t)htons(NRPE_PACKET_VERSION_2);
% send_packet.packet_type=(int16_t)htons(QUERY_PACKET);
% strncpy(&send_packet.buffer[0],query,MAX_PACKETBUFFER_LENGTH);
% send_packet.buffer[MAX_PACKETBUFFER_LENGTH-1]='\x0';
%
% send_packet.crc32_value=(u_int32_t)0L;
% calculated_crc32=calculate_crc32((char *)&send_packet,sizeof(send_packet));
% send_packet.crc32_value=(u_int32_t)htonl(calculated_crc32);

%% #define QUERY_PACKET 1 /* id code for a packet containing a query */
%% #define RESPONSE_PACKET 2 /* id code for a packet containing a response */
%%
%% #define NRPE_PACKET_VERSION_2 2 /* packet version identifier */
%% #define NRPE_PACKET_VERSION_1 1 /* older packet version identifiers (no longer supported) */
%%
%% #define MAX_PACKETBUFFER_LENGTH 1024 /* max amount of data we'll send in one query/response */

%% typedef struct packet_struct{
%% int16_t packet_version;
%% int16_t packet_type;
%% u_int32_t crc32_value;
%% int16_t result_code;
%% char buffer[MAX_PACKETBUFFER_LENGTH];
%% }packet;

connect(Host) ->
    connect(Host, 5666).

connect(Host, Port) ->
    case gen_tcp:connect(Host, Port, [binary, {active, false}]) of
        {ok, Sock} ->
            Query = request("test"),
            send(Sock, Query),
            io:format("Response: '~s'~n", [recv(Sock)]),
            close(Sock);

        {error, Error} ->
            io:format("Connect-error: ~p~n", [Error])
    end.

send(Sock, Data) ->
    case gen_tcp:send(Sock, Data) of
        ok ->
            ok;

        {error, Error} ->
            io:format("send-error: ~p~n", [Error])
    end.

recv(Sock) ->
    case gen_tcp:recv(Sock, 0, 2000) of
        {ok, Packet} ->
            io:format("read: ~p~n", [Packet]),
            decode(Packet);

        {error, Error} ->
            io:format("recv-error: ~p~n", [Error])
    end.


close(Sock) ->
    gen_tcp:close(Sock).


decode(<<Version:16, Type:16, Crc:32, Code:16, Rest/binary>>) ->
    io:format("Version: ~p, Type: ~p, Crc: ~p, Code: ~p~n", [Version, Type, Crc, Code]),
    decode_response(Rest).

decode_response(Bin) ->
    Len = msg_len(Bin, 0),
    {Msg, _} = split_binary(Bin, Len),
    Msg.

%% length of the NUL-terminated message at the start of the buffer
msg_len(<<>>, Len) ->
    Len;
msg_len(<<0, _Rest/binary>>, Len) ->
    Len;
msg_len(<<_, Rest/binary>>, Len) ->
    msg_len(Rest, Len + 1).
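
A quick test against a box running the NRPE daemon then looks like this (hypothetical hostname, default port 5666); it prints the raw packet received and the decoded response buffer:

nrpe:connect("monitored-host.example.com").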


I hope someone will find this interesting :p

Sunday, November 18, 2007

Treregex-0.7 download now

You can download treregex-0.7 from here.

To test it you need to install the libtre library; normally you can do this with apt-get:

apt-cache search libtre
libtre-dev - development package for the libtre4 regexp matching library
libtre4 - regexp matching library with approximate matching


Install libtre4 and libtre-dev, then try to ./configure treregex. You may need to edit the Makefile to adjust the paths for your setup...

And as usual, feedback is welcome :p

Digraph and your network, too easy

The digraph module can help you build directed (or undirected) graphs very easily. I need to know the status of all the hosts on my network, and I build statistics about service availability.
With the digraph module I can describe the links between my hosts; my hosts are: karoten, ultraten, muloten, arsen, masculen, colen, pollen.
So I define them first:

(beta@karoten)425> f(D), D = digraph:new(). % a new digraph
{graph,22,23,24,true}
(beta@karoten)426> servers:add(D, [karoten,ultraten,muloten,arsen,masculen,colen,pollen]).
ok

Now I can manipulate my nodes (my servers):

(beta@karoten)427> servers:connect(D, karoten, [ultraten,{muloten,http}, arsen, {masculen,ssh}]).
ok

karoten can reach ultraten, muloten with http, arsen, and masculen with ssh.

(beta@karoten)428> servers:connect(D, colen, [{muloten,http}, arsen, {pollen,ssh}]).
ok

colen can reach muloten with http, arsen, and pollen with ssh.

So let's find colen's links:

(beta@karoten)429> servers:links(D, colen).
[{colen,muloten,http},{colen,arsen,[]},{colen,pollen,ssh}]

This is exactly what I wrote above, good...

And muloten's links:

(beta@karoten)430> servers:links(D, muloten).
[{karoten,muloten,http},{colen,muloten,http}]

This is deduced from what I described above...

Now let's imagine we want to find a way to reach one node from another:

(beta@karoten)431> digraph:get_path(D, karoten, arsen).
[karoten,arsen]

karoten is directly connected to arsen.

Let's create a new link, between ultraten and colen:

(beta@karoten)434> servers:connect(D, ultraten, colen).
['$e'|7]

Let's try to reach pollen from karoten:

(beta@karoten)435> digraph:get_path(D, karoten, pollen).
[karoten,ultraten,colen,pollen]

So the path is: through ultraten and colen, karoten can reach pollen...

Now let's model a more web-oriented setup, with a firewall, a load balancer (lb), various httpd and application servers, and finally databases:

(beta@karoten)436> servers:add(D, [firewall,lb,http1,http2,http3,app1,app2,app3,app4,db1,db2]).
ok

The firewall is directly connected to the load balancer:

(beta@karoten)437> servers:connect(D, firewall, lb).
ok


The load balancer distributes the load to three httpds:

(beta@karoten)438> servers:connect(D, lb, [http1,http2,http3]).
ok
(beta@karoten)439> servers:connect(D, http1, [app1,app2,app3]).
ok
(beta@karoten)440> servers:connect(D, http2, [app2,app3]).
ok
(beta@karoten)441> servers:connect(D, http3, [app3]).
ok
(beta@karoten)442> servers:connect(D, app3,[db1,db2]).
ok
(beta@karoten)443> servers:connect(D, app2, [db1]).
ok
(beta@karoten)444> servers:connect(D, app1, db2).
['$e'|21]

Finally I can find a path between the firewall and database db2:

(beta@karoten)445> digraph:get_path(D, firewall, db2).
[firewall,lb,http3,app3,db2]
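
And since the servers are plain digraph vertices, simulating a failure is just a matter of deleting one and asking for a path again (a quick sketch continuing the session above; the exact path returned depends on edge order):

%% if http3 dies, drop it and let digraph find another route
servers:del(D, http3),
digraph:get_path(D, firewall, db2).
%% the path now has to go through http1 or http2 (e.g. [firewall,lb,http1,app1,db2])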


Now the code:

-module(servers).

-export([add/2, del/2, connect/3, links/2, reachable/2]).

add(Graph, Servers) when is_list(Servers) ->
    lists:foreach(fun(X) -> digraph:add_vertex(Graph, X) end, Servers);

add(Graph, Server) ->
    digraph:add_vertex(Graph, Server).

del(Graph, Servers) when is_list(Servers) ->
    lists:foreach(fun(X) -> digraph:del_vertex(Graph, X) end, Servers);

del(Graph, Server) ->
    digraph:del_vertex(Graph, Server).

connect(_Graph, _Server, []) ->
    ok;
connect(Graph, Server, [ {S, L} | Servers ]) ->
    digraph:add_edge(Graph, Server, S, L),
    connect(Graph, Server, Servers);
connect(Graph, Server, [ S | Servers ]) ->
    digraph:add_edge(Graph, Server, S),
    connect(Graph, Server, Servers);

%% connect(Graph, Server, Servers) when is_list(Servers) ->
%%     lists:foreach(fun(X) -> digraph:add_edge(Graph, Server, X) end, Servers);

connect(Graph, Server, S) ->
    digraph:add_edge(Graph, Server, S).

links(Graph, Server) ->
    lists:map(fun(X) ->
                      {_, S1, S2, Label} = digraph:edge(Graph, X),
                      {S1, S2, Label}
              end, digraph:edges(Graph, Server)).

reachable(Graph, Servers) when is_list(Servers) ->
    digraph_utils:reachable(Servers, Graph);
reachable(Graph, Server) ->
    digraph_utils:reachable([Server], Graph).
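
The reachable/2 wrapper doesn't appear in the session above; it answers "what can this host reach" in one call (the result is a list of vertices, order unspecified):

%% every server reachable from karoten, karoten included
servers:reachable(D, karoten).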

Saturday, November 17, 2007

Experimenting with the Erlang SSH support, or a remote 'tail' with SSH...

While designing my monitoring tool and working on treregex, I found the ssh documentation and realized that it could be very useful for my tool.

A simple question needed to be answered: can the ssh module easily spawn a remote process for me?
To find out, I tried to build a remote tail module called ssh_tail :)

-module(ssh_tail).

-export([tail/3]).
-define(TIMEOUT, 5000).

tail(Host, User, Pass) ->
    case ssh_cm:connect(Host, 22, [{user_dir, "/var/tmp/ssh"},
                                   {user, User}, {password, Pass}]) of
        {ok, CM} ->
            session(CM, fun(X) -> io:format("-ssh: ~p~n", [X]) end);

        Error ->
            Error
    end.

According to the ssh documentation, user_dir lets you decide where the keys are stored; from my experience it's better to use a directory separate from ~/.ssh.
It happens that recent versions of OpenSSH add meta information to their files that the ssh module can't handle (more on this in another post).

For the test I wanted to do a "tail -f" on a specific file, i.e. "/var/log/syslog".

session(CM, Callback) ->
    case ssh_cm:session_open(CM, ?TIMEOUT) of
        {ok, Channel} ->
            case ssh_cm:shell(CM, Channel) of
                ok ->
                    ssh_cm:send(CM, Channel, "tail --follow=name /var/log/syslog\n"),
                    ssh_loop(CM, Channel, Callback);

                Error ->
                    error_logger:error_msg("Error: ~p~n", [Error])
            end;
        Error ->
            error_logger:error_msg("Session Error: ~p~n", [Error])
    end.

ssh_cm is responsible for opening the session, starting a shell, and sending commands to the remote shell process. I send:

tail --follow=name /var/log/syslog\n

Don't forget the final '\n' character, you won't get any results if you don't send it :p
(I forgot it the first time I tested and thought the code didn't work at all...)

ssh_loop(CM, Channel, Callback) ->
    receive
        stop ->
            %% Closing the channel
            %% ssh_cm:detach(CM, ?TIMEOUT),
            ssh_cm:close(CM, Channel);

        {ssh_cm, CM, {data, _Channel, 0, Data}} ->
            Callback(Data),
            ssh_loop(CM, Channel, Callback);

        {ssh_cm, CM, {data, Channel, Type, Data}} ->
            io:format("extended (~p): ~p~n", [Type, Data]),
            ssh_loop(CM, Channel, Callback);

        {ssh_cm, CM, {closed, _Channel}} ->
            ssh_cm:detach(CM, ?TIMEOUT);

        E ->
            error_logger:info_msg("[~p] Received: ~p~n", [?MODULE, E]),
            ssh_loop(CM, Channel, Callback)
    end.

ssh_cm sends various messages to the calling process; the most important tuple is:

{ssh_cm, CM, {data, _Channel, 0, Data}}

Data holds what we want, in our case a line sent by the tail process...
The callback defined at the beginning is then applied:

tail(Host, User, Pass) ->
    case ssh_cm:connect(Host, 22, [{user_dir, "/var/tmp/ssh"},
                                   {user, User}, {password, Pass}]) of
        {ok, CM} ->
            session(CM,
                    fun(X) ->                            % our callback
                            io:format("-ssh: ~p~n", [X]) % simple display...
                    end);

        Error ->
            Error
    end.


To conclude, this simple module is able to spawn a remote "tail -f" over an SSH connection and run a callback function on every chunk of data received.
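
For example (hypothetical host and credentials):

%% prints every new syslog line as it arrives:  -ssh: <<"...">>
ssh_tail:tail("server1.example.com", "monitor", "secret").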

The code was modelled on the ssh_ssh module that you can find in the ssh application's source code, because the ssh documentation is really sparse for now...

Thursday, November 15, 2007

A gen_server for mass regexp computing... (LibTre)

This is the first test session of my 'tregex_srv', which provides some nice regexp features:

266> l(tregex_srv).
{module,tregex_srv}
267> tregex_srv:start_link().
{ok,<0.4045.0>}
268> tregex_srv:store( [<<"[0-9+] pid">>, <<"[a-z]+.tmp">>]).
ok
269> tregex_srv:grep(<<"test 9405904.tmp acuu.tmpmulaor 10+ pid">>).
[[[{34,39,<<"+ pid">>}],[{17,25,<<"acuu.tmp">>}]]]
270> tregex_srv:store( [{ <<"test">>, fun(X) -> io:format("found: ~p~n", [X]) end}, <<"[0-9][0-9]">>]).
ok
271> tregex_srv:grep(<<"test 9405904.tmp acuu.tmpmulaor 10+ pid">>).
found: [{0,4,<<"test">>}]
[[[{34,39,<<"+ pid">>}],[{17,25,<<"acuu.tmp">>}]],
[[{0,4,<<"test">>}],[{5,7,<<"94">>}]]]
272> tregex_srv:store( [{ <<"SRC=[^ ]+">>, fun(X) ->
[{_,_,M}] = X, io:format("Source: ~p~n", [M])
end}]).
ok
273> tregex_srv:grep(<<"test 9405904.tmp acuu.tmpmulaor 10+ pid">>).
found: [{0,4,<<"test">>}]
[[[{34,39,<<"+ pid">>}],[{17,25,<<"acuu.tmp">>}]],
[[{0,4,<<"test">>}],[{5,7,<<"94">>}]],
[]]
274> tregex_srv:grep(<<"tst SRC=192.135.15.1 pid">>).
Source: <<"SRC=192.135.15.1">>
[[[{19,24,<<"1 pid">>}]],
[[{8,10,<<"19">>}]],
[[{4,20,<<"SRC=192.135.15.1">>}]]]
275> tregex_srv:store( [{ <<"SRC=([^ ]+)">>, fun(X) ->
[{_,_,_}, {_,_,M}] = X, io:format("Source IP: ~p~n", [M])
end}]).
ok
276> tregex_srv:grep(<<"tst SRC=192.135.15.1 pid">>).
Source IP: <<"192.135.15.1">>
Source: <<"SRC=192.135.15.1">>
[[[{19,24,<<"1 pid">>}]],
[[{8,10,<<"19">>}]],
[[{4,20,<<"SRC=192.135.15.1">>}]],
[[{4,20,<<"SRC=192.135.15.1">>},{8,20,<<"192.135.15.1">>}]]]

As you can see, you can associate Funs with regexp matches. This means that you can bind an action to a regexp...
First we store (in fact add to the existing regexp list) new {RE, Fun} tuples:

275> tregex_srv:store( [{ <<"SRC=([^ ]+)">>, fun(X) ->
[{_,_,_}, {_,_,M}] = X, io:format("Source IP: ~p~n", [M])
end}]).
ok

Now the exec still calls the already registered funs, but also calls the new one since our regexp matches; you can see that only the IP address is printed, so the "submatches" feature works as expected:

276> tregex_srv:grep(<<"tst SRC=192.135.15.1 pid">>).
Source IP: <<"192.135.15.1">>
Source: <<"SRC=192.135.15.1">>
...

The gen_server state is the following:

-record(state, {
          requests,
          reindex,
          re = [],
          pids = []
         }).

Its init function is:

init(_Args) ->
    process_flag(trap_exit, true),
    {ok, #state{
            re = ets:new(?MODULE, [set, private]),
            requests = 0,
            reindex = 1 }}.

Internally the module calls 'treregex:compile' to compile each regexp and collects the resulting ports into a list that is stored in the 'ets' table. Every call to 'tregex_srv:store' creates a new entry in the ets table.

%% Storing RE and Funs
%% Creating a simple fun when there's none provided...
%%
store([], Res, State) ->
    ets:insert(State#state.re, { State#state.reindex, Res });
store([ { Regexp, Fun } | List ], Res, State) ->
    {ok, Re} = treregex:compile(iolist_to_binary(Regexp), [extended]),
    store(List, [ { Re, Fun } | Res ], State);
store([ Regexp | List ], Res, State) ->
    {ok, Re} = treregex:compile(iolist_to_binary(Regexp), [extended]),
    store(List, [ { Re, fun(_) -> false end } | Res ], State).

The 'tregex_srv:grep' just uses 'ets:foldl' to compute results:

handle_call({grep, Line}, _Node, State) ->
    Requests = State#state.requests,
    Grep = fun({_Reindex, ReList}, Acc) ->
                   [ exec(ReList, Line, []) | Acc ]
           end,
    {reply, ets:foldl(Grep, [], State#state.re),
     State#state{ requests = Requests + 1 }}.

%% exec, using a list of {Re, Fun}
exec([], _Line, Acc) ->
    Acc;
exec([ { Re, Fun } | ReList ], Line, Acc) ->
    case treregex:exec(Re, Line) of
        {ok, Matches} ->
            Fun(Matches),
            exec(ReList, Line, [ Matches | Acc ]);

        {error, nomatch} ->
            exec(ReList, Line, Acc)
    end;
exec([ _Any | ReList ], Line, Acc) ->
    exec(ReList, Line, Acc).
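
The client-side API isn't shown here; it's just thin gen_server wrappers, presumably something along these lines (the exact {store, ...} request and its handle_call clause are assumed):

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

store(ReList) ->
    gen_server:call(?MODULE, {store, ReList}).

grep(Line) ->
    gen_server:call(?MODULE, {grep, Line}).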

The code is still young, but seems to work.

The main purpose here is to be able to massively process lines of logs. I want to be able to
spawn multiple processes on multiple nodes that will extract valuable content from
various lines. This is the first step :-)

I may clean up the 'grep' fun, since it returns an empty list whenever a regexp set didn't match anything in the supplied line...
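
One possible cleanup is to skip the sets that matched nothing directly in the fold, a sketch:

handle_call({grep, Line}, _Node, State) ->
    Requests = State#state.requests,
    Grep = fun({_Reindex, ReList}, Acc) ->
                   case exec(ReList, Line, []) of
                       []      -> Acc;              %% nothing matched: drop it
                       Matches -> [ Matches | Acc ]
                   end
           end,
    {reply, ets:foldl(Grep, [], State#state.re),
     State#state{ requests = Requests + 1 }}.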

I'm really excited at the idea of being able to use 'gen_server:multi_call' with this module :)
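
That would look something like this (assuming tregex_srv is started and registered locally on every node in Nodes):

%% run the same grep on every node at once; the second element lists the nodes that didn't answer
Nodes = [node() | nodes()],
{Replies, _BadNodes} = gen_server:multi_call(Nodes, tregex_srv, {grep, <<"tst SRC=192.135.15.1 pid">>}).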
