mlm_client(3) ============= NAME ---- mlm_client - Malamute client stack SYNOPSIS -------- ---- // Create a new mlm_client, return the reference if successful, or NULL // if construction failed due to lack of available memory. MLM_EXPORT mlm_client_t * mlm_client_new (void); // Destroy the mlm_client and free all memory used by the object. MLM_EXPORT void mlm_client_destroy (mlm_client_t **self_p); // Return actor, when caller wants to work with multiple actors and/or // input sockets asynchronously. MLM_EXPORT zactor_t * mlm_client_actor (mlm_client_t *self); // Return message pipe for asynchronous message I/O. In the high-volume case, // we send methods and get replies to the actor, in a synchronous manner, and // we send/recv high-volume message data to a second pipe, the msgpipe. In // the low-volume case we can do everything over the actor pipe, if traffic // is never ambiguous. MLM_EXPORT zsock_t * mlm_client_msgpipe (mlm_client_t *self); // Return true if client is currently connected, else false. Note that the // client will automatically re-connect if the server dies and restarts after // a successful first connection. MLM_EXPORT bool mlm_client_connected (mlm_client_t *self); // Set PLAIN authentication username and password. If you do not call this, the // client will use NULL authentication. TODO: add "set curve auth". // Returns >= 0 if successful, -1 if interrupted. MLM_EXPORT int mlm_client_set_plain_auth (mlm_client_t *self, const char *username, const char *password); // Connect to server endpoint, with specified timeout in msecs (zero means wait // forever). The call succeeds if the connection is successful. The caller may // specify its address. // Returns >= 0 if successful, -1 if interrupted. MLM_EXPORT int mlm_client_connect (mlm_client_t *self, const char *endpoint, uint32_t timeout, const char *address); // Prepare to publish to a specified stream. After this, all messages are sent to // this stream exclusively. 
// Returns >= 0 if successful, -1 if interrupted. MLM_EXPORT int mlm_client_set_producer (mlm_client_t *self, const char *stream); // Consume messages with matching subjects. The pattern is a regular expression // using the CZMQ zrex syntax. The most useful elements are: ^ and $ to match the // start and end, . to match any character, \s and \S to match whitespace and // non-whitespace, \d and \D to match a digit and non-digit, \a and \A to match // alphabetic and non-alphabetic, \w and \W to match alphanumeric and // non-alphanumeric, + for one or more repetitions, * for zero or more repetitions, // and ( ) to create groups. Returns 0 if subscription was successful, else -1. // Returns >= 0 if successful, -1 if interrupted. MLM_EXPORT int mlm_client_set_consumer (mlm_client_t *self, const char *stream, const char *pattern); // Offer a particular named service, where the pattern matches request subjects // using the CZMQ zrex syntax. // Returns >= 0 if successful, -1 if interrupted. MLM_EXPORT int mlm_client_set_worker (mlm_client_t *self, const char *address, const char *pattern); // Send STREAM SEND message to server, takes ownership of message // and destroys message when done sending it. MLM_EXPORT int mlm_client_send (mlm_client_t *self, const char *subject, zmsg_t **content_p); // Send MAILBOX SEND message to server, takes ownership of message // and destroys message when done sending it. MLM_EXPORT int mlm_client_sendto (mlm_client_t *self, const char *address, const char *subject, const char *tracker, uint32_t timeout, zmsg_t **content_p); // Send SERVICE SEND message to server, takes ownership of message // and destroys message when done sending it. 
MLM_EXPORT int mlm_client_sendfor (mlm_client_t *self, const char *address, const char *subject, const char *tracker, uint32_t timeout, zmsg_t **content_p); // Receive message from server; caller destroys message when done MLM_EXPORT zmsg_t * mlm_client_recv (mlm_client_t *self); // Return last received command. Can be one of these values: // "STREAM DELIVER" // "MAILBOX DELIVER" // "SERVICE DELIVER" MLM_EXPORT const char * mlm_client_command (mlm_client_t *self); // Return last received status MLM_EXPORT int mlm_client_status (mlm_client_t *self); // Return last received reason MLM_EXPORT const char * mlm_client_reason (mlm_client_t *self); // Return last received address MLM_EXPORT const char * mlm_client_address (mlm_client_t *self); // Return last received sender MLM_EXPORT const char * mlm_client_sender (mlm_client_t *self); // Return last received subject MLM_EXPORT const char * mlm_client_subject (mlm_client_t *self); // Return last received content MLM_EXPORT zmsg_t * mlm_client_content (mlm_client_t *self); // Return last received tracker MLM_EXPORT const char * mlm_client_tracker (mlm_client_t *self); // Send multipart string message to stream, end list with NULL // Returns 0 if OK, -1 if failed due to lack of memory or other error. MLM_EXPORT int mlm_client_sendx (mlm_client_t *self, const char *subject, const char *content, ...); // Send multipart string to mailbox, end list with NULL // Returns 0 if OK, -1 if failed due to lack of memory or other error. MLM_EXPORT int mlm_client_sendtox (mlm_client_t *self, const char *address, const char *subject, const char *content, ...); // Send multipart string to service, end list with NULL // Returns 0 if OK, -1 if failed due to lack of memory or other error. MLM_EXPORT int mlm_client_sendforx (mlm_client_t *self, const char *address, const char *subject, const char *content, ...); // Receive a subject and string content from the server. The content may be // 1 or more string frames. 
This method is complementary to the sendx methods. // End the string arguments with NULL. If there are not enough frames in // the received message, remaining strings are set to NULL. Returns number // of string contents received, or -1 in case of error. Free the returned // subject and content strings when finished with them. To get the type of // the command, use mlm_client_command (). MLM_EXPORT int mlm_client_recvx (mlm_client_t *self, char **subject_p, char **string_p, ...); // Self test of this class MLM_EXPORT void mlm_client_test (bool verbose); // To enable verbose tracing (animation) of mlm_client instances, set // this to true. This lets you trace from and including construction. MLM_EXPORT extern volatile int mlm_client_verbose; ---- DESCRIPTION ----------- Provides an async client API to the Malamute Protocol. Please add @discuss section in ../src/mlm_client.c. EXAMPLE ------- .From mlm_client_test method ---- mlm_client_verbose = verbose; // Start a server to test against, and bind to endpoint zactor_t *server = zactor_new (mlm_server, "mlm_client_test"); if (verbose) zstr_send (server, "VERBOSE"); zstr_sendx (server, "LOAD", "src/mlm_client.cfg", NULL); // Install authenticator to test PLAIN access zactor_t *auth = zactor_new (zauth, NULL); assert (auth); if (verbose) { zstr_sendx (auth, "VERBOSE", NULL); zsock_wait (auth); } zstr_sendx (auth, "PLAIN", "src/passwords.cfg", NULL); zsock_wait (auth); // Test stream pattern mlm_client_t *writer = mlm_client_new (); assert (writer); int rc = mlm_client_set_plain_auth (writer, "writer", "secret"); assert (rc == 0); assert (mlm_client_connected (writer) == false); rc = mlm_client_connect (writer, "tcp://127.0.0.1:9999", 1000, "writer"); assert (rc == 0); assert (mlm_client_connected (writer) == true); mlm_client_t *reader = mlm_client_new (); assert (reader); rc = mlm_client_set_plain_auth (reader, "reader", "secret"); assert (rc == 0); rc = mlm_client_connect (reader, "tcp://127.0.0.1:9999", 1000, ""); 
assert (rc == 0); mlm_client_set_producer (writer, "weather"); mlm_client_set_consumer (reader, "weather", "temp.*"); mlm_client_sendx (writer, "temp.moscow", "1", NULL); mlm_client_sendx (writer, "rain.moscow", "2", NULL); mlm_client_sendx (writer, "temp.madrid", "3", NULL); mlm_client_sendx (writer, "rain.madrid", "4", NULL); mlm_client_sendx (writer, "temp.london", "5", NULL); mlm_client_sendx (writer, "rain.london", "6", NULL); char *subject, *content; mlm_client_recvx (reader, &subject, &content, NULL); assert (streq (subject, "temp.moscow")); assert (streq (content, "1")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); zstr_free (&subject); zstr_free (&content); mlm_client_recvx (reader, &subject, &content, NULL); assert (streq (subject, "temp.madrid")); assert (streq (content, "3")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_subject (reader), "temp.madrid")); assert (streq (mlm_client_sender (reader), "writer")); zstr_free (&subject); zstr_free (&content); mlm_client_recvx (reader, &subject, &content, NULL); assert (streq (subject, "temp.london")); assert (streq (content, "5")); assert (streq (mlm_client_command (reader), "STREAM DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); zstr_free (&subject); zstr_free (&content); mlm_client_destroy (&reader); // Test mailbox pattern reader = mlm_client_new (); assert (reader); rc = mlm_client_set_plain_auth (reader, "reader", "secret"); assert (rc == 0); rc = mlm_client_connect (reader, "tcp://127.0.0.1:9999", 1000, "mailbox"); assert (rc == 0); mlm_client_sendtox (writer, "mailbox", "subject 1", "Message 1", "attachment", NULL); char *attach; mlm_client_recvx (reader, &subject, &content, &attach, NULL); assert (streq (subject, "subject 1")); assert (streq (content, "Message 1")); assert (streq (attach, "attachment")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); 
assert (streq (mlm_client_subject (reader), "subject 1")); assert (streq (mlm_client_sender (reader), "writer")); zstr_free (&subject); zstr_free (&content); zstr_free (&attach); // Now test that mailbox survives reader disconnect mlm_client_destroy (&reader); mlm_client_sendtox (writer, "mailbox", "subject 2", "Message 2", NULL); mlm_client_sendtox (writer, "mailbox", "subject 3", "Message 3", NULL); reader = mlm_client_new (); assert (reader); rc = mlm_client_set_plain_auth (reader, "reader", "secret"); assert (rc == 0); rc = mlm_client_connect (reader, "tcp://127.0.0.1:9999", 500, "mailbox"); assert (rc == 0); mlm_client_recvx (reader, &subject, &content, &attach, NULL); assert (streq (subject, "subject 2")); assert (streq (content, "Message 2")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); zstr_free (&subject); zstr_free (&content); mlm_client_recvx (reader, &subject, &content, &attach, NULL); assert (streq (subject, "subject 3")); assert (streq (content, "Message 3")); assert (streq (mlm_client_command (reader), "MAILBOX DELIVER")); zstr_free (&subject); zstr_free (&content); // Test service pattern mlm_client_set_worker (reader, "printer", "bw.*"); mlm_client_set_worker (reader, "printer", "color.*"); mlm_client_sendforx (writer, "printer", "bw.A4", "Important contract", NULL); mlm_client_sendforx (writer, "printer", "bw.A5", "Special conditions", NULL); mlm_client_recvx (reader, &subject, &content, NULL); assert (streq (subject, "bw.A4")); assert (streq (content, "Important contract")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); zstr_free (&subject); zstr_free (&content); mlm_client_recvx (reader, &subject, &content, NULL); assert (streq (subject, "bw.A5")); assert (streq (content, "Special conditions")); assert (streq (mlm_client_command (reader), "SERVICE DELIVER")); assert (streq (mlm_client_sender (reader), "writer")); zstr_free (&subject); 
zstr_free (&content); // Test that writer shutdown does not cause message loss mlm_client_set_consumer (reader, "weather", "temp.*"); mlm_client_sendx (writer, "temp.brussels", "7", NULL); mlm_client_destroy (&writer); mlm_client_recvx (reader, &subject, &content, NULL); assert (streq (subject, "temp.brussels")); assert (streq (content, "7")); zstr_free (&subject); zstr_free (&content); mlm_client_destroy (&reader); // Test multiple readers and multiple writers mlm_client_t *writer1 = mlm_client_new (); assert (writer1); rc = mlm_client_set_plain_auth (writer1, "writer", "secret"); assert (rc == 0); rc = mlm_client_connect (writer1, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); mlm_client_t *writer2 = mlm_client_new (); assert (writer2); rc = mlm_client_set_plain_auth (writer2, "writer", "secret"); assert (rc == 0); rc = mlm_client_connect (writer2, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); mlm_client_t *reader1 = mlm_client_new (); assert (reader1); rc = mlm_client_set_plain_auth (reader1, "reader", "secret"); assert (rc == 0); rc = mlm_client_connect (reader1, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); mlm_client_t *reader2 = mlm_client_new (); assert (reader2); rc = mlm_client_set_plain_auth (reader2, "reader", "secret"); assert (rc == 0); rc = mlm_client_connect (reader2, "tcp://127.0.0.1:9999", 1000, ""); assert (rc == 0); mlm_client_set_producer (writer1, "weather"); mlm_client_set_producer (writer2, "traffic"); mlm_client_set_consumer (reader1, "weather", "newyork"); mlm_client_set_consumer (reader1, "traffic", "newyork"); mlm_client_set_consumer (reader2, "weather", "newyork"); mlm_client_set_consumer (reader2, "traffic", "newyork"); mlm_client_sendx (writer1, "newyork", "8", NULL); mlm_client_recvx (reader1, &subject, &content, NULL); assert (streq (mlm_client_address (reader1), "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); zstr_free (&subject); zstr_free (&content); mlm_client_recvx (reader2, 
&subject, &content, NULL); assert (streq (mlm_client_address (reader2), "weather")); assert (streq (subject, "newyork")); assert (streq (content, "8")); zstr_free (&subject); zstr_free (&content); mlm_client_sendx (writer2, "newyork", "85", NULL); mlm_client_recvx (reader1, &subject, &content, NULL); assert (streq (mlm_client_address (reader1), "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); zstr_free (&subject); zstr_free (&content); mlm_client_recvx (reader2, &subject, &content, NULL); assert (streq (mlm_client_address (reader2), "traffic")); assert (streq (subject, "newyork")); assert (streq (content, "85")); zstr_free (&subject); zstr_free (&content); mlm_client_destroy (&writer1); mlm_client_destroy (&writer2); mlm_client_destroy (&reader1); mlm_client_destroy (&reader2); // Done, shut down zactor_destroy (&auth); zactor_destroy (&server); ----