mlm_server(3) ============= NAME ---- mlm_server - Malamute Server SYNOPSIS -------- ---- // To work with mlm_server, use the CZMQ zactor API: // // Create new mlm_server instance, passing logging prefix: // // zactor_t *mlm_server = zactor_new (mlm_server, "myname"); // // Destroy mlm_server instance // // zactor_destroy (&mlm_server); // // Enable verbose logging of commands and activity: // // zstr_send (mlm_server, "VERBOSE"); // // Bind mlm_server to specified endpoint. TCP endpoints may specify // the port number as "*" to aquire an ephemeral port: // // zstr_sendx (mlm_server, "BIND", endpoint, NULL); // // Return assigned port number, specifically when BIND was done using an // an ephemeral port: // // zstr_sendx (mlm_server, "PORT", NULL); // char *command, *port_str; // zstr_recvx (mlm_server, &command, &port_str, NULL); // assert (streq (command, "PORT")); // // Specify configuration file to load, overwriting any previous loaded // configuration file or options: // // zstr_sendx (mlm_server, "LOAD", filename, NULL); // // Set configuration path value: // // zstr_sendx (mlm_server, "SET", path, value, NULL); // // Save configuration data to config file on disk: // // zstr_sendx (mlm_server, "SAVE", filename, NULL); // // Send zmsg_t instance to mlm_server: // // zactor_send (mlm_server, &msg); // // Receive zmsg_t instance from mlm_server: // // zmsg_t *msg = zactor_recv (mlm_server); // // This is the mlm_server constructor as a zactor_fn: // MLM_EXPORT void mlm_server (zsock_t *pipe, void *args); // Self test of this class MLM_EXPORT void mlm_server_test (bool verbose); Receive names of connected clients as multipart string message, where first frame (string) is repeated command (CLIENTLIST in this case): zstr_sendx (mlm_server, "CLIENTLIST", NULL); Receive names of iknown streams to the broker as multipart string message, where first frame (string) is repeated command (STREAMLIST in this case): zstr_sendx (mlm_server, "STREAMLIST", NULL); ---- DESCRIPTION ----------- This actor implements the Malamute service. The actor uses the CZMQ socket command interface, rather than a classic C API. You can start as many instances of the Malamute service as you like. Each will run in its own namespace, as virtual hosts. This class is wrapped as a main program via the malamute.c application, and can be wrapped in other languages in the same way as any C API. This is a minimal, incomplete implementation of Malamute. It does however not have any known bugs. EXAMPLE ------- .From mlm_server_test method ---- zactor_t *server = zactor_new (mlm_server, "mlm_server_test"); if (verbose) zstr_send (server, "VERBOSE"); zstr_sendx (server, "BIND", "tcp://127.0.0.1:9999", NULL); zsock_t *reader = zsock_new (ZMQ_DEALER); assert (reader); zsock_connect (reader, "tcp://127.0.0.1:9999"); zsock_set_rcvtimeo (reader, 500); mlm_proto_t *proto = mlm_proto_new (); // Server insists that connection starts properly mlm_proto_set_id (proto, MLM_PROTO_STREAM_WRITE); mlm_proto_send (proto, reader); zclock_sleep (500); // to calm things down && make memcheck pass. Thanks @malanka mlm_proto_recv (proto, reader); zclock_sleep (500); // detto as above assert (mlm_proto_id (proto) == MLM_PROTO_ERROR); assert (mlm_proto_status_code (proto) == MLM_PROTO_COMMAND_INVALID); // Now do a stream publish-subscribe test zsock_t *writer = zsock_new (ZMQ_DEALER); assert (writer); zsock_connect (writer, "tcp://127.0.0.1:9999"); zsock_set_rcvtimeo (reader, 500); // Open connections from both reader and writer mlm_proto_set_id (proto, MLM_PROTO_CONNECTION_OPEN); mlm_proto_send (proto, reader); mlm_proto_recv (proto, reader); assert (mlm_proto_id (proto) == MLM_PROTO_OK); mlm_proto_set_id (proto, MLM_PROTO_CONNECTION_OPEN); mlm_proto_send (proto, writer); mlm_proto_recv (proto, writer); assert (mlm_proto_id (proto) == MLM_PROTO_OK); // Prepare to write and read a "weather" stream mlm_proto_set_id (proto, MLM_PROTO_STREAM_WRITE); mlm_proto_set_stream (proto, "weather"); mlm_proto_send (proto, writer); mlm_proto_recv (proto, writer); assert (mlm_proto_id (proto) == MLM_PROTO_OK); mlm_proto_set_id (proto, MLM_PROTO_STREAM_READ); mlm_proto_set_pattern (proto, "temp.*"); mlm_proto_send (proto, reader); mlm_proto_recv (proto, reader); assert (mlm_proto_id (proto) == MLM_PROTO_OK); // Now send some weather data, with null contents mlm_proto_set_id (proto, MLM_PROTO_STREAM_SEND); mlm_proto_set_subject (proto, "temp.moscow"); mlm_proto_send (proto, writer); mlm_proto_set_subject (proto, "rain.moscow"); mlm_proto_send (proto, writer); mlm_proto_set_subject (proto, "temp.chicago"); mlm_proto_send (proto, writer); mlm_proto_set_subject (proto, "rain.chicago"); mlm_proto_send (proto, writer); mlm_proto_set_subject (proto, "temp.london"); mlm_proto_send (proto, writer); mlm_proto_set_subject (proto, "rain.london"); mlm_proto_send (proto, writer); // We should receive exactly three deliveries, in order mlm_proto_recv (proto, reader); assert (mlm_proto_id (proto) == MLM_PROTO_STREAM_DELIVER); assert (streq (mlm_proto_subject (proto), "temp.moscow")); mlm_proto_recv (proto, reader); assert (mlm_proto_id (proto) == MLM_PROTO_STREAM_DELIVER); assert (streq (mlm_proto_subject (proto), "temp.chicago")); mlm_proto_recv (proto, reader); assert (mlm_proto_id (proto) == MLM_PROTO_STREAM_DELIVER); assert (streq (mlm_proto_subject (proto), "temp.london")); mlm_proto_destroy (&proto); // Finished, we can clean up zsock_destroy (&writer); zsock_destroy (&reader); zactor_destroy (&server); // Test Case: // CLIENTLIST command { const char *endpoint = "inproc://mlm_server_clientlist_test"; zactor_t *server = zactor_new (mlm_server, "mlm_server_clientlist_test"); if (verbose) zstr_send (server, "VERBOSE"); zstr_sendx (server, "BIND", endpoint, NULL); mlm_client_t *client_1 = mlm_client_new (); int rv = mlm_client_connect (client_1, endpoint, 1000, "Karol"); assert (rv >= 0); mlm_client_t *client_2 = mlm_client_new (); rv = mlm_client_connect (client_2, endpoint, 1000, "Tomas"); assert (rv >= 0); mlm_client_t *client_3 = mlm_client_new (); rv = mlm_client_connect (client_3, endpoint, 1000, "Alenka"); assert (rv >= 0); zclock_sleep (500); zstr_sendx (server, "CLIENTLIST", NULL); zmsg_t *message = zmsg_recv (server); assert (message); assert (zmsg_size (message) == 4); char *pop = zmsg_popstr (message); assert (streq (pop, "CLIENTLIST")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "Karol") || streq (pop, "Tomas") || streq (pop, "Alenka")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "Karol") || streq (pop, "Tomas") || streq (pop, "Alenka")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "Karol") || streq (pop, "Tomas") || streq (pop, "Alenka")); zstr_free (&pop); pop = zmsg_popstr (message); assert (pop == NULL); zmsg_destroy (&message); // remove a client Karol mlm_client_destroy (&client_1); zstr_sendx (server, "CLIENTLIST", NULL); zclock_sleep (500); message = zmsg_recv (server); assert (message); assert (zmsg_size (message) == 3); pop = zmsg_popstr (message); assert (streq (pop, "CLIENTLIST")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "Tomas") || streq (pop, "Alenka")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "Tomas") || streq (pop, "Alenka")); zstr_free (&pop); zmsg_destroy (&message); mlm_client_destroy (&client_2); mlm_client_destroy (&client_3); zactor_destroy (&server); } // Test Case: // STREAMLIST command { const char *endpoint = "inproc://mlm_server_streamlist_test"; zactor_t *server = zactor_new (mlm_server, "mlm_server_streamlist_test"); if (verbose) zstr_send (server, "VERBOSE"); zstr_sendx (server, "BIND", endpoint, NULL); mlm_client_t *client_1 = mlm_client_new (); int rv = mlm_client_connect (client_1, endpoint, 1000, "Karol"); assert (rv != -1); rv = mlm_client_set_producer (client_1, "STREAM_1"); assert (rv != -1); mlm_client_t *client_2 = mlm_client_new (); rv = mlm_client_connect (client_2, endpoint, 1000, "Tomas"); assert (rv != -1); rv = mlm_client_set_producer (client_2, "STREAM_2"); assert (rv != -1); mlm_client_t *client_3 = mlm_client_new (); rv = mlm_client_connect (client_3, endpoint, 1000, "Alenka"); assert (rv != -1); rv = mlm_client_set_consumer (client_3, "STREAM_2", ".*"); assert (rv != -1); zclock_sleep (500); zstr_sendx (server, "STREAMLIST", NULL); zmsg_t *message = zmsg_recv (server); assert (message); assert (zmsg_size (message) == 3); char *pop = zmsg_popstr (message); assert (streq (pop, "STREAMLIST")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "STREAM_1") || streq (pop, "STREAM_2")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "STREAM_1") || streq (pop, "STREAM_2")); zstr_free (&pop); zmsg_destroy (&message); // remove a client Karol mlm_client_destroy (&client_1); zstr_sendx (server, "STREAMLIST", NULL); zclock_sleep (500); message = zmsg_recv (server); assert (message); assert (zmsg_size (message) == 3); pop = zmsg_popstr (message); assert (streq (pop, "STREAMLIST")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "STREAM_1") || streq (pop, "STREAM_2")); zstr_free (&pop); pop = zmsg_popstr (message); assert (streq (pop, "STREAM_1") || streq (pop, "STREAM_2")); zstr_free (&pop); zmsg_destroy (&message); mlm_client_destroy (&client_2); mlm_client_destroy (&client_3); zactor_destroy (&server); } ----