zmsg(3) ======= NAME ---- zmsg - working with multipart messages SYNOPSIS -------- ---- // This is a stable class, and may not change except for emergencies. It // is provided in stable builds. // This class has draft methods, which may change over time. They are not // in stable releases, by default. Use --enable-drafts to enable. // Create a new empty message object CZMQ_EXPORT zmsg_t * zmsg_new (void); // Receive message from socket, returns zmsg_t object or NULL if the recv // was interrupted. Does a blocking recv. If you want to not block then use // the zloop class or zmsg_recv_nowait or zmq_poll to check for socket input // before receiving. CZMQ_EXPORT zmsg_t * zmsg_recv (void *source); // Load/append an open file into new message, return the message. // Returns NULL if the message could not be loaded. CZMQ_EXPORT zmsg_t * zmsg_load (FILE *file); // Decodes a serialized message buffer created by zmsg_encode () and returns // a new zmsg_t object. Returns NULL if the buffer was badly formatted or // there was insufficient memory to work. CZMQ_EXPORT zmsg_t * zmsg_decode (const byte *buffer, size_t buffer_size); // Generate a signal message encoding the given status. A signal is a short // message carrying a 1-byte success/failure code (by convention, 0 means // OK). Signals are encoded to be distinguishable from "normal" messages. CZMQ_EXPORT zmsg_t * zmsg_new_signal (byte status); // Destroy a message object and all frames it contains CZMQ_EXPORT void zmsg_destroy (zmsg_t **self_p); // Send message to destination socket, and destroy the message after sending // it successfully. If the message has no frames, sends nothing but destroys // the message anyhow. Nullifies the caller's reference to the message (as // it is a destructor). CZMQ_EXPORT int zmsg_send (zmsg_t **self_p, void *dest); // Send message to destination socket as part of a multipart sequence, and // destroy the message after sending it successfully. 
Note that after a // zmsg_sendm, you must call zmsg_send or another method that sends a final // message part. If the message has no frames, sends nothing but destroys // the message anyhow. Nullifies the caller's reference to the message (as // it is a destructor). CZMQ_EXPORT int zmsg_sendm (zmsg_t **self_p, void *dest); // Return size of message, i.e. number of frames (0 or more). CZMQ_EXPORT size_t zmsg_size (zmsg_t *self); // Return total size of all frames in message. CZMQ_EXPORT size_t zmsg_content_size (zmsg_t *self); // Push frame to the front of the message, i.e. before all other frames. // Message takes ownership of frame, will destroy it when message is sent. // Returns 0 on success, -1 on error. Deprecates zmsg_push, which did not // nullify the caller's frame reference. CZMQ_EXPORT int zmsg_prepend (zmsg_t *self, zframe_t **frame_p); // Add frame to the end of the message, i.e. after all other frames. // Message takes ownership of frame, will destroy it when message is sent. // Returns 0 on success. Deprecates zmsg_add, which did not nullify the // caller's frame reference. CZMQ_EXPORT int zmsg_append (zmsg_t *self, zframe_t **frame_p); // Remove first frame from message, if any. Returns frame, or NULL. // Caller owns return value and must destroy it when done. CZMQ_EXPORT zframe_t * zmsg_pop (zmsg_t *self); // Push block of memory to front of message, as a new frame. // Returns 0 on success, -1 on error. CZMQ_EXPORT int zmsg_pushmem (zmsg_t *self, const void *src, size_t size); // Add block of memory to the end of the message, as a new frame. // Returns 0 on success, -1 on error. CZMQ_EXPORT int zmsg_addmem (zmsg_t *self, const void *src, size_t size); // Push string as new frame to front of message. // Returns 0 on success, -1 on error. CZMQ_EXPORT int zmsg_pushstr (zmsg_t *self, const char *string); // Push string as new frame to end of message. // Returns 0 on success, -1 on error. 
CZMQ_EXPORT int zmsg_addstr (zmsg_t *self, const char *string); // Push formatted string as new frame to front of message. // Returns 0 on success, -1 on error. CZMQ_EXPORT int zmsg_pushstrf (zmsg_t *self, const char *format, ...) CHECK_PRINTF (2); // Push formatted string as new frame to end of message. // Returns 0 on success, -1 on error. CZMQ_EXPORT int zmsg_addstrf (zmsg_t *self, const char *format, ...) CHECK_PRINTF (2); // Pop frame off front of message, return as fresh string. If there were // no more frames in the message, returns NULL. // Caller owns return value and must destroy it when done. CZMQ_EXPORT char * zmsg_popstr (zmsg_t *self); // Push encoded message as a new frame. Message takes ownership of // submessage, so the original is destroyed in this call. Returns 0 on // success, -1 on error. CZMQ_EXPORT int zmsg_addmsg (zmsg_t *self, zmsg_t **msg_p); // Remove first submessage from message, if any. Returns zmsg_t, or NULL if // decoding was not successful. // Caller owns return value and must destroy it when done. CZMQ_EXPORT zmsg_t * zmsg_popmsg (zmsg_t *self); // Remove specified frame from list, if present. Does not destroy frame. CZMQ_EXPORT void zmsg_remove (zmsg_t *self, zframe_t *frame); // Set cursor to first frame in message. Returns frame, or NULL, if the // message is empty. Use this to navigate the frames as a list. CZMQ_EXPORT zframe_t * zmsg_first (zmsg_t *self); // Return the next frame. If there are no more frames, returns NULL. To move // to the first frame call zmsg_first(). Advances the cursor. CZMQ_EXPORT zframe_t * zmsg_next (zmsg_t *self); // Return the last frame. If there are no frames, returns NULL. CZMQ_EXPORT zframe_t * zmsg_last (zmsg_t *self); // Save message to an open file, return 0 if OK, else -1. The message is // saved as a series of frames, each with length and data. Note that the // file is NOT guaranteed to be portable between operating systems, nor // versions of CZMQ. 
The file format is at present undocumented and liable // to arbitrary change. CZMQ_EXPORT int zmsg_save (zmsg_t *self, FILE *file); // Serialize multipart message to a single buffer. Use this method to send // structured messages across transports that do not support multipart data. // Allocates and returns a new buffer containing the serialized message. // To decode a serialized message buffer, use zmsg_decode (). CZMQ_EXPORT size_t zmsg_encode (zmsg_t *self, byte **buffer); // Create copy of message, as new message object. Returns a fresh zmsg_t // object. If message is null, or memory was exhausted, returns null. // Caller owns return value and must destroy it when done. CZMQ_EXPORT zmsg_t * zmsg_dup (zmsg_t *self); // Send message to zsys log sink (may be stdout, or system facility as // configured by zsys_set_logstream). CZMQ_EXPORT void zmsg_print (zmsg_t *self); // Return true if the two messages have the same number of frames and each // frame in the first message is identical to the corresponding frame in the // other message. As with zframe_eq, return false if either message is NULL. CZMQ_EXPORT bool zmsg_eq (zmsg_t *self, zmsg_t *other); // Return signal value, 0 or greater, if message is a signal, -1 if not. CZMQ_EXPORT int zmsg_signal (zmsg_t *self); // Probe the supplied object, and report if it looks like a zmsg_t. CZMQ_EXPORT bool zmsg_is (void *self); // Self test of this class. CZMQ_EXPORT void zmsg_test (bool verbose); #ifdef CZMQ_BUILD_DRAFT_API // *** Draft method, for development use, may change without warning *** // Return message routing ID, if the message came from a ZMQ_SERVER socket. // Else returns zero. CZMQ_EXPORT uint32_t zmsg_routing_id (zmsg_t *self); // *** Draft method, for development use, may change without warning *** // Set routing ID on message. This is used if/when the message is sent to a // ZMQ_SERVER socket. 
CZMQ_EXPORT void zmsg_set_routing_id (zmsg_t *self, uint32_t routing_id); #endif // CZMQ_BUILD_DRAFT_API ---- DESCRIPTION ----------- The zmsg class provides methods to send and receive multipart messages across 0MQ sockets. This class provides a list-like container interface, with methods to work with the overall container. zmsg_t messages are composed of zero or more zframe_t frames. Please add @discuss section in ../src/zmsg.c. EXAMPLE ------- .From zmsg_test method ---- // Create two PAIR sockets and connect over inproc zsock_t *output = zsock_new_pair ("@inproc://zmsg.test"); assert (output); zsock_t *input = zsock_new_pair (">inproc://zmsg.test"); assert (input); // Test send and receive of single-frame message zmsg_t *msg = zmsg_new (); assert (msg); zframe_t *frame = zframe_new ("Hello", 5); assert (frame); zmsg_prepend (msg, &frame); assert (zmsg_size (msg) == 1); assert (zmsg_content_size (msg) == 5); rc = zmsg_send (&msg, output); assert (msg == NULL); assert (rc == 0); msg = zmsg_recv (input); assert (msg); assert (zmsg_size (msg) == 1); assert (zmsg_content_size (msg) == 5); zmsg_destroy (&msg); // Test send and receive of multi-frame message msg = zmsg_new (); assert (msg); rc = zmsg_addmem (msg, "Frame0", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame1", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame2", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame3", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame4", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame5", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame6", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame7", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame8", 6); assert (rc == 0); rc = zmsg_addmem (msg, "Frame9", 6); assert (rc == 0); zmsg_t *copy = zmsg_dup (msg); assert (copy); rc = zmsg_send (&copy, output); assert (rc == 0); rc = zmsg_send (&msg, output); assert (rc == 0); copy = zmsg_recv (input); assert (copy); assert (zmsg_size (copy) == 10); assert (zmsg_content_size 
(copy) == 60); zmsg_destroy (&copy); msg = zmsg_recv (input); assert (msg); assert (zmsg_size (msg) == 10); assert (zmsg_content_size (msg) == 60); // Save to a file, read back FILE *file = fopen ("zmsg.test", "w"); assert (file); rc = zmsg_save (msg, file); assert (rc == 0); fclose (file); file = fopen ("zmsg.test", "r"); rc = zmsg_save (msg, file); assert (rc == -1); fclose (file); zmsg_destroy (&msg); file = fopen ("zmsg.test", "r"); msg = zmsg_load (file); assert (msg); fclose (file); remove ("zmsg.test"); assert (zmsg_size (msg) == 10); assert (zmsg_content_size (msg) == 60); // Remove all frames except first and last int frame_nbr; for (frame_nbr = 0; frame_nbr < 8; frame_nbr++) { zmsg_first (msg); frame = zmsg_next (msg); zmsg_remove (msg, frame); zframe_destroy (&frame); } // Test message frame manipulation assert (zmsg_size (msg) == 2); frame = zmsg_last (msg); assert (zframe_streq (frame, "Frame9")); assert (zmsg_content_size (msg) == 12); frame = zframe_new ("Address", 7); assert (frame); zmsg_prepend (msg, &frame); assert (zmsg_size (msg) == 3); rc = zmsg_addstr (msg, "Body"); assert (rc == 0); assert (zmsg_size (msg) == 4); frame = zmsg_pop (msg); zframe_destroy (&frame); assert (zmsg_size (msg) == 3); char *body = zmsg_popstr (msg); assert (streq (body, "Frame0")); free (body); zmsg_destroy (&msg); // Test encoding/decoding msg = zmsg_new (); assert (msg); byte *blank = (byte *) zmalloc (100000); assert (blank); rc = zmsg_addmem (msg, blank, 0); assert (rc == 0); rc = zmsg_addmem (msg, blank, 1); assert (rc == 0); rc = zmsg_addmem (msg, blank, 253); assert (rc == 0); rc = zmsg_addmem (msg, blank, 254); assert (rc == 0); rc = zmsg_addmem (msg, blank, 255); assert (rc == 0); rc = zmsg_addmem (msg, blank, 256); assert (rc == 0); rc = zmsg_addmem (msg, blank, 65535); assert (rc == 0); rc = zmsg_addmem (msg, blank, 65536); assert (rc == 0); rc = zmsg_addmem (msg, blank, 65537); assert (rc == 0); free (blank); assert (zmsg_size (msg) == 9); byte *buffer; size_t 
buffer_size = zmsg_encode (msg, &buffer); zmsg_destroy (&msg); msg = zmsg_decode (buffer, buffer_size); assert (msg); free (buffer); zmsg_destroy (&msg); // Test submessages msg = zmsg_new (); assert (msg); zmsg_t *submsg = zmsg_new (); zmsg_pushstr (msg, "matr"); zmsg_pushstr (submsg, "joska"); rc = zmsg_addmsg (msg, &submsg); assert (rc == 0); assert (submsg == NULL); submsg = zmsg_popmsg (msg); assert (submsg == NULL); // string "matr" is not encoded zmsg_t, so was discarded submsg = zmsg_popmsg (msg); assert (submsg); body = zmsg_popstr (submsg); assert (streq (body, "joska")); free (body); zmsg_destroy (&submsg); frame = zmsg_pop (msg); assert (frame == NULL); zmsg_destroy (&msg); // Test comparison of two messages msg = zmsg_new (); zmsg_addstr (msg, "One"); zmsg_addstr (msg, "Two"); zmsg_addstr (msg, "Three"); zmsg_t *msg_other = zmsg_new (); zmsg_addstr (msg_other, "One"); zmsg_addstr (msg_other, "Two"); zmsg_addstr (msg_other, "One-Hundred"); zmsg_t *msg_dup = zmsg_dup (msg); zmsg_t *empty_msg = zmsg_new (); zmsg_t *empty_msg_2 = zmsg_new (); assert (zmsg_eq (msg, msg_dup)); assert (!zmsg_eq (msg, msg_other)); assert (zmsg_eq (empty_msg, empty_msg_2)); assert (!zmsg_eq (msg, NULL)); assert (!zmsg_eq (NULL, empty_msg)); assert (!zmsg_eq (NULL, NULL)); zmsg_destroy (&msg); zmsg_destroy (&msg_other); zmsg_destroy (&msg_dup); zmsg_destroy (&empty_msg); zmsg_destroy (&empty_msg_2); // Test signal messages msg = zmsg_new_signal (0); assert (zmsg_signal (msg) == 0); zmsg_destroy (&msg); msg = zmsg_new_signal (-1); assert (zmsg_signal (msg) == 255); zmsg_destroy (&msg); // Now try methods on an empty message msg = zmsg_new (); assert (msg); assert (zmsg_size (msg) == 0); assert (zmsg_unwrap (msg) == NULL); assert (zmsg_first (msg) == NULL); assert (zmsg_last (msg) == NULL); assert (zmsg_next (msg) == NULL); assert (zmsg_pop (msg) == NULL); // Sending an empty message is valid and destroys the message assert (zmsg_send (&msg, output) == 0); assert (!msg); 
zsock_destroy (&input); zsock_destroy (&output); #if defined (ZMQ_SERVER) // Create server and client sockets and connect over inproc zsock_t *server = zsock_new_server ("inproc://zmsg-test-routing"); assert (server); zsock_t *client = zsock_new_client ("inproc://zmsg-test-routing"); assert (client); // Send request from client to server zmsg_t *request = zmsg_new (); assert (request); zmsg_addstr (request, "Hello"); rc = zmsg_send (&request, client); assert (rc == 0); assert (!request); // Read request and send reply request = zmsg_recv (server); assert (request); char *string = zmsg_popstr (request); assert (streq (string, "Hello")); assert (zmsg_routing_id (request)); zstr_free (&string); zmsg_t *reply = zmsg_new (); assert (reply); zmsg_addstr (reply, "World"); zmsg_set_routing_id (reply, zmsg_routing_id (request)); rc = zmsg_send (&reply, server); assert (rc == 0); zmsg_destroy (&request); // Read reply reply = zmsg_recv (client); string = zmsg_popstr (reply); assert (streq (string, "World")); assert (zmsg_routing_id (reply) == 0); zmsg_destroy (&reply); zstr_free (&string); // Client and server disallow multipart msg = zmsg_new (); zmsg_addstr (msg, "One"); zmsg_addstr (msg, "Two"); rc = zmsg_send (&msg, client); assert (rc == -1); assert (zmsg_size (msg) == 2); rc = zmsg_send (&msg, server); assert (rc == -1); assert (zmsg_size (msg) == 2); zmsg_destroy (&msg); zsock_destroy (&client); zsock_destroy (&server); #endif ----