This is the third and last part of this series of blog posts. At this point, you have already learnt about each ZMQ socket type and how to use it. If not, you might want to read Who Transports your Messages? Part 1/3 and Who Transports your Messages? Part 2/3 of this blog post series first.
ZMQ delivers blobs of data (messages) to nodes. A node can be anything: a thread, a process, or even a host. ZMQ gives you a single socket API to work with, no matter which transport you choose (in-process, inter-process, TCP, or multicast). Reconnections are transparent to the developer. Messages are queued at the sender or at the receiver as needed, according to resource availability. All I/O is done in background threads, and nodes talk to each other using lock-free techniques, so there are never locks, waits, semaphores, or deadlocks.
Essentially, ZMQ routes and queues messages according to precise recipes called patterns, which encapsulate the ZMQ community's hard-earned experience of the best ways to distribute data and work.
Patterns: The Ones You’ll Love
ZMQ provides a few predefined patterns that make the implementation easier when they fit your needs:
- Request/Reply pattern
- Publish/Subscribe pattern
- Pipeline pattern
- Exclusive pair pattern
These patterns are predefined, or hard-coded as ZMQ likes to call them, but there are plans to add a feature that allows developers to define their own patterns.
Request/Reply Pattern
This pattern involves the REQ and REP socket types. It is intended to provide reliable communication between two nodes in a synchronous manner. The client node uses a REQ socket and the server node uses a REP socket.
This pattern is synchronous: the client sends a request and then waits for the reply. Trying to send two consecutive requests without receiving a reply in between results in an error (the send call fails with EFSM).
Newer versions of the library (4.x and later) allow this rule to be relaxed. With the ZMQ_REQ_RELAXED socket option set, the client can send a new request without having received a reply to the previous one. The option was introduced to cope with node failures: if a client sends a request to a dead server, a strict REQ socket cannot continue, because it has to wait for a reply that will never arrive. (Of course, the application must still detect dead nodes and do whatever it takes to recover.) The option is only useful when the client is connected to more than one server: a request sent before the previous one has been answered is routed to the next available server in round-robin order.
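To make the lockstep rule concrete, here is a toy model written for illustration; it is not ZMQ or cppzmq code, and the ReqModel class, its send() and receive() methods, and the server names are all hypothetical. A strict model refuses a second send() while a reply is pending (the real library reports EFSM), while a relaxed model lets the next request go to the next server in round-robin order.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Toy model of a REQ socket's send/receive state machine.
// Hypothetical class, not the ZMQ API.
class ReqModel {
public:
    ReqModel(std::vector<std::string> servers, bool relaxed)
        : servers_(std::move(servers)), relaxed_(relaxed) {
        if (servers_.empty())
            throw std::invalid_argument("need at least one server");
    }

    // Models sending a request; returns the server it is routed to.
    std::string send() {
        // A strict REQ socket refuses a new request while a reply is
        // pending; real ZMQ reports this as the EFSM error.
        if (awaiting_reply_ && !relaxed_)
            throw std::logic_error("EFSM: a reply is still pending");
        std::string target = servers_[next_];
        next_ = (next_ + 1) % servers_.size();  // round-robin over servers
        awaiting_reply_ = true;
        return target;
    }

    // Models receiving the reply: clears the pending-reply state.
    void receive() {
        if (!awaiting_reply_)
            throw std::logic_error("EFSM: no request has been sent");
        awaiting_reply_ = false;
    }

private:
    std::vector<std::string> servers_;
    bool relaxed_;
    std::size_t next_ = 0;
    bool awaiting_reply_ = false;
};
```

With two servers "A" and "B", a strict model throws on the second consecutive send() and only continues after receive(); a relaxed model simply sends to "A", then "B", then "A" again.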
You can read more about this pattern in the section Ask and Ye Shall Receive from the ZMQ guide.
Publish/Subscribe Pattern
This pattern involves the PUB and SUB socket types. It is intended to provide a push notification mechanism between a server and a bunch of clients (note that I omitted the word reliable on purpose). The client nodes use SUB sockets and the server node uses a PUB socket. This is a fully asynchronous pattern: publishing a message does not require a previous request or a subsequent reply.
This is a one-way pattern: messages only flow in the direction PUB → SUB. If a client tries to send a message through its SUB socket, it will get an error.
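This pattern works with subscriptions. Publishers offer a set of topics for message categorization, and clients choose which messages to receive by subscribing to the corresponding topics. A client can subscribe to several topics. A subscription is a prefix match: a message is delivered if it starts with one of the client's subscribed topics, so subscribing to the empty string means receiving everything. If you do not subscribe to any topic, you will get no messages at all. The following code snippet shows some subscription examples: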
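```cpp
// --- The client side ---
#include <zmq.hpp>
#include <cstring>

int main () {
    // Create the ZMQ context
    zmq::context_t context (1);
    // Socket to talk to server (example endpoint, must match the server's bind)
    zmq::socket_t subscriber (context, ZMQ_SUB);
    subscriber.connect ("tcp://localhost:5556");
    // Subscribe to zipcodes C1427DCC and C1427DMZ
    const char* filter1 = "C1427DCC";
    subscriber.setsockopt (ZMQ_SUBSCRIBE, filter1, std::strlen (filter1));
    const char* filter2 = "C1427DMZ";
    subscriber.setsockopt (ZMQ_SUBSCRIBE, filter2, std::strlen (filter2));
    // Unsubscribe from zipcode C1427MMM
    const char* filter3 = "C1427MMM";
    subscriber.setsockopt (ZMQ_UNSUBSCRIBE, filter3, std::strlen (filter3));
    // Subscribe to everything: the empty prefix matches every message
    const char* filter4 = "";
    subscriber.setsockopt (ZMQ_SUBSCRIBE, filter4, std::strlen (filter4));
    // Wait for a message
    zmq::message_t update;
    subscriber.recv (&update);
    return 0;
}
```
```cpp
// --- The server side ---
#include <zmq.hpp>
#include <cstdio>

int main () {
    // Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);
    publisher.bind ("tcp://*:5556");
    // Generate some weather data
    const char* zipcode = "C1427DCC";   // <-- This is the topic.
    int temperature = 20;
    int relhumidity = 50;
    // Send message to all subscribers. Keep publishing: messages sent
    // before a subscriber has connected are simply dropped.
    while (true) {
        zmq::message_t message (20);
        std::snprintf ((char*) message.data (), 20, "%s %d %d", zipcode, temperature, relhumidity);
        publisher.send (message);
    }
}
```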
The messages are filtered at the server side, so the network is not flooded with messages that a client would discard (this does not apply to ZMQ 2.x, where each subscriber receives all messages and filters them itself). It is important to note that a client never receives messages published before its subscription reaches the server, so if a client subscribes after the server has started, some messages will be lost. That is why I omitted the word reliable at the beginning of the section.
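ZMQ matches subscriptions as prefixes of the message. The following is a minimal sketch of that rule, assuming a hypothetical SubscriptionFilter class that is not part of ZMQ or cppzmq: each subscription is a prefix, the empty prefix matches everything, and an unsubscribe removes a single instance of a subscription, which is how ZMQ_UNSUBSCRIBE behaves when the same topic was subscribed more than once.

```cpp
#include <set>
#include <string>

// Toy model of SUB-side topic matching; hypothetical, not the ZMQ API.
class SubscriptionFilter {
public:
    // Adds one instance of a prefix subscription.
    void subscribe(const std::string& prefix) { subs_.insert(prefix); }

    // Removes a single instance of the prefix, if present.
    void unsubscribe(const std::string& prefix) {
        auto it = subs_.find(prefix);
        if (it != subs_.end()) subs_.erase(it);
    }

    // A message passes if any subscribed prefix matches its beginning.
    bool accepts(const std::string& message) const {
        for (const auto& prefix : subs_)
            if (message.compare(0, prefix.size(), prefix) == 0) return true;
        return false;
    }

private:
    std::multiset<std::string> subs_;  // duplicates count separately
};
```

A filter subscribed to "C1427DCC" accepts "C1427DCC 20 50" but not "C1427DMZ 18 40", and a filter with no subscriptions accepts nothing.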
A very, very important issue to keep in mind is the slow joiner problem: even if the client starts before the server, it will lose the first messages, because connecting and sending the subscription take some time and the server starts publishing right away. A related problem is the slow subscriber, a client that cannot keep up with the rate of incoming messages. Both are well-known issues; the slow subscriber is documented in the section Slow Subscriber Detection (Suicidal Snail Pattern) of the ZMQ guide, along with some possible solutions.
One last comment: the topic can be anything, a readable string or a binary chunk; the choice is yours. It is better, and leads to cleaner code, to send the topic as the first frame of a multiframe message: subscriptions are matched against the first frame only, and the subscriber gets the payload without having to parse it out of a string (you can read Who Transports your Messages? Part 2/3 to learn how to handle messages with more than one frame).
Pipeline Pattern
This pattern involves the PUSH and PULL socket types. It is intended to provide a work distribution workflow with reliable communication. Here we can identify three kinds of nodes:
- Ventilator: a node that uses a PUSH socket. It has a task that can be split into several subtasks, and it sends those subtasks to the workers.
- Worker: a node that uses a PULL socket to receive subtasks from the ventilator and a PUSH socket to send the results.
- Sink: a node that uses a PULL socket to collect the subtask results.
This is a fully asynchronous pattern. It can also be used as two separate patterns, one made of the ventilator/worker part and the other of the worker/sink part; how you use it is up to you.
Note that all workers must execute the same procedure: a subtask must produce the same result no matter which worker receives it. This is required because the ventilator dispatches subtasks in round-robin order rather than choosing a particular worker. For example, with four subtasks (A, B, C, and D) and three workers (W1, W2, and W3), the subtasks are sent as follows:
- A → W1
- B → W2
- C → W3
- D → W1
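The dispatch order in the list above can be reproduced with a few lines of plain C++. This is a toy model rather than ZMQ code: round_robin_dispatch is a hypothetical helper that pairs each subtask with the worker a PUSH socket would pick, assuming all workers are already connected when the first subtask is sent.

```cpp
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Toy model (hypothetical, not the ZMQ API): a PUSH socket hands each
// outgoing message to the next connected peer in round-robin order.
std::vector<std::pair<std::string, std::string>>
round_robin_dispatch(const std::vector<std::string>& subtasks,
                     const std::vector<std::string>& workers) {
    std::vector<std::pair<std::string, std::string>> assignment;
    // With no peers, a real PUSH socket would block; the model gives up.
    if (workers.empty()) return assignment;
    for (std::size_t i = 0; i < subtasks.size(); ++i)
        assignment.emplace_back(subtasks[i], workers[i % workers.size()]);
    return assignment;
}
```

Called with subtasks A to D and workers W1 to W3, it yields exactly the pairs listed above, with D wrapping around to W1.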
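The sink works in a similar manner, in the opposite direction: its PULL socket fair-queues the results, reading from each connected worker in turn.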
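On the receiving side, a PULL socket fair-queues: it takes one message from each connected peer in turn and skips peers that have nothing pending. The hypothetical fair_queue function below is a toy model of that behavior, not part of ZMQ; each deque stands for the results waiting to be read from one worker.

```cpp
#include <deque>
#include <string>
#include <vector>

// Toy model (hypothetical, not the ZMQ API) of PULL-side fair queuing:
// one message per peer per round, skipping peers with nothing pending.
std::vector<std::string>
fair_queue(std::vector<std::deque<std::string>> peers) {
    std::vector<std::string> received;
    bool progress = true;
    while (progress) {
        progress = false;
        for (auto& pending : peers) {
            if (pending.empty()) continue;  // nothing from this worker yet
            received.push_back(pending.front());
            pending.pop_front();
            progress = true;
        }
    }
    return received;
}
```

If W1 has results a1 and a2, W2 has b1, and W3 has c1, c2, and c3, the sink reads a1, b1, c1, a2, c2, c3: no single worker can starve the others.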
This pattern suffers from the slow joiner syndrome: one of your workers can receive far more messages than the others, because its PULL socket connected first and grabbed a lot of messages before the other workers managed to connect. This is a well-known issue, documented in the section The Load Balancing Pattern of the ZMQ guide, along with some possible solutions.
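The slow joiner effect can be reproduced with a toy simulation, again not ZMQ code: task t is sent at time t and goes round-robin to the workers already connected at that moment. The Worker struct, the connect times, and tasks_per_worker are all hypothetical, and the model ignores socket buffers and high-water marks.

```cpp
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical worker description for the simulation.
struct Worker {
    std::string name;
    int connect_time;  // time step at which this worker's PULL socket joins
};

// Toy model (not the ZMQ API) of the slow joiner effect: task t is sent
// at time t, round-robin over the workers connected at that time.
std::map<std::string, int> tasks_per_worker(const std::vector<Worker>& workers,
                                            int num_tasks) {
    std::map<std::string, int> counts;
    std::size_t next = 0;  // round-robin cursor
    for (int t = 0; t < num_tasks; ++t) {
        std::vector<const Worker*> connected;
        for (const auto& w : workers)
            if (w.connect_time <= t) connected.push_back(&w);
        // The model assumes some worker is connected from t = 0;
        // a real PUSH socket would block here instead of skipping.
        if (connected.empty()) continue;
        ++counts[connected[next % connected.size()]->name];
        ++next;
    }
    return counts;
}
```

With W1 connected from time 0 and W2 and W3 joining at time 5, nine tasks split 6, 1, and 2: W1 receives every task sent before the other workers arrive.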
Exclusive Pair Pattern
This pattern involves the PAIR socket type. It is intended to provide a signaling system between threads. Yes, I wrote threads instead of nodes. Its main difference from the other patterns is that it is a one-to-one connection: only two sockets are involved. There is no message filtering or message routing.
In my personal opinion, this is the best way to replace the blocking queues inside an application. It also lets you move each side of the pattern (each step is shown in Figure 4) to a different node with minor changes. This pattern is intended to be used with the inproc transport, that is, for communication between threads inside the same process.
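For comparison, here is a minimal sketch of a typical hand-written blocking queue, built from a mutex and a condition variable. BlockingQueue is a hypothetical class written for illustration; its explicit locking and waiting is exactly what a PAIR-over-inproc channel keeps out of your own code.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical hand-rolled blocking queue: the kind of code a pair of
// PAIR sockets over inproc can replace.
template <typename T>
class BlockingQueue {
public:
    void push(T value) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(value));
        }
        ready_.notify_one();  // wake one waiting consumer
    }

    // Blocks until an item is available, then removes and returns it.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !queue_.empty(); });
        T value = std::move(queue_.front());
        queue_.pop();
        return value;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<T> queue_;
};
```

Every producer and consumer must share this object and its lock; with PAIR sockets, each thread owns its own socket and only messages cross the boundary.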
Well, now it is up to you. The ZMQ guide is the perfect place to start. If you are planning to use ZMQ in your applications, read the guide. It is worth every second you spend on it.