Abstract
In this second part of the blog post series, I’m going to write about ZeroMQ (ZMQ from now on) usage. Although it is not mandatory to read the first part of the series, “Who transports your messages? Part 1/3”, it is highly recommended, especially if you are not familiar with ZMQ.
Working with Sockets
Now that you are familiar with the ZMQ objects and operations (if not, read the first part of this blogpost series), let’s see how we can use them.
Play ball!
The first thing we have to do is create the objects, starting with the Context and then the sockets.
// Create the context.
zmq::context_t context;
// Create a REP socket.
zmq::socket_t socket_rep( context, ZMQ_REP );
// Create a REQ socket.
zmq::socket_t socket_req( context, ZMQ_REQ );
The next step is to connect the sockets. To do so, we should first talk about endpoints. Endpoints are like entry points in compiled library objects: they identify a socket and a way of connecting to it. For the TCP transport, a ZMQ endpoint consists of three parts: a transport definition, an address, and a port number (the IPC and INPROC transports use a path or a name after the transport and have no port):
transport://address:port
ZMQ provides the following transports:
- TCP: Unicast transport using TCP.
- IPC: Local inter-process communication transport.
- INPROC: Local in-process (inter-thread) communication transport.
- PGM, EPGM: Reliable multicast transport using PGM.
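To make the endpoint format concrete, here is a minimal sketch that splits an endpoint string into its parts. It uses only the standard library. The `Endpoint` struct and the `parse_endpoint` function are hypothetical helpers written for illustration and are not part of ZMQ. It assumes that only TCP endpoints carry a numeric port, and it does not handle wildcard ports.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper type (not a ZMQ class): the pieces of an endpoint.
struct Endpoint {
    std::string transport;  // e.g. "tcp", "ipc", "inproc"
    std::string address;    // host, interface, path or name
    int port;               // -1 when the endpoint has no port
};

// Split "transport://address[:port]" into its parts.
// Returns false (and leaves `out` untouched) when the text does not fit.
bool parse_endpoint(const std::string& text, Endpoint& out) {
    const std::string separator = "://";
    const std::size_t sep = text.find(separator);
    if (sep == std::string::npos || sep == 0) return false;

    Endpoint ep;
    ep.transport = text.substr(0, sep);
    ep.address = text.substr(sep + separator.size());
    ep.port = -1;
    if (ep.address.empty()) return false;

    // Assumption of this sketch: only TCP endpoints end in ":port".
    if (ep.transport == "tcp") {
        const std::size_t colon = ep.address.rfind(':');
        if (colon == std::string::npos || colon == 0 ||
            colon + 1 == ep.address.size()) {
            return false;
        }
        int value = 0;
        for (std::size_t i = colon + 1; i < ep.address.size(); ++i) {
            const char c = ep.address[i];
            if (c < '0' || c > '9') return false;  // digits only
            value = value * 10 + (c - '0');
            if (value > 65535) return false;       // outside the port range
        }
        ep.port = value;
        ep.address.erase(colon);  // keep only the part before ":port"
    }
    out = ep;
    return true;
}
```

A helper like this can be handy for validating configuration values before handing them to BIND or CONNECT, but ZMQ performs its own parsing, so it is optional.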
Now that we know about transports, let’s establish a connection between the sockets using TCP over the loopback interface on port 4000.
// Bind the REP socket.
socket_rep.bind( "tcp://127.0.0.1:4000" );
// Connect the REQ socket.
socket_req.connect( "tcp://127.0.0.1:4000" );
Did you notice something odd in the code snippet? The BIND and the CONNECT operations are performed one after the other in the same code, without anything waiting on the other side. This works because ZMQ is an asynchronous framework: BIND and CONNECT return without waiting for the peer, and SEND normally just queues the message and returns. Thanks to this, any socket can perform a BIND or a CONNECT operation; there are no fixed server or client semantics.
Once Socket objects are created and connected, the working procedure is as simple as creating the frames and performing the SEND and RECEIVE operations on each socket.
// Create the request.
std::string message_data = "request_1";
// Create and fill the frame.
zmq::message_t frame( message_data.size() );
memcpy( frame.data(), message_data.data(), message_data.size() );
// Send the request.
socket_req.send( frame );
// Receive the request.
zmq::message_t request;
socket_rep.recv( &request );
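A received frame holds raw bytes plus a length, with no terminating NUL character, so it should be copied by size rather than treated as a C string. The following is a minimal sketch of that copy using only the standard library. `frame_to_string` is a hypothetical helper, and its `data` and `size` parameters stand in for the values returned by the frame's `data()` and `size()` members.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper (not part of ZMQ): copy a frame payload into a string.
// The copy is driven by `size`, so embedded NUL bytes are preserved.
std::string frame_to_string(const void* data, std::size_t size) {
    if (data == nullptr || size == 0) {
        return std::string();  // empty frame
    }
    return std::string(static_cast<const char*>(data), size);
}
```

With the snippet above, the call would look like frame_to_string( request.data(), request.size() ).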
Once again, the SEND operation does not wait for delivery. You can perform the BIND and CONNECT operations in any order; ZMQ takes care of establishing the connection. You also don’t have to wait until the connection is established: you can perform a SEND or a RECEIVE operation as soon as the BIND or CONNECT operation returns.
You can also send a message in several frames: just tell the SEND operation to do so. Take a look at the following code snippet.
std::string message_header = "message header";
std::string message_data = "message data";
// Create the header frame.
zmq::message_t frame_header( message_header.size() );
memcpy( frame_header.data(), message_header.data(), message_header.size() );
// Create the data frame.
zmq::message_t frame_data( message_data.size() );
memcpy( frame_data.data(), message_data.data(), message_data.size() );
// Send the header.
socket_req.send( frame_header, ZMQ_SNDMORE );
// Send the data.
socket_req.send( frame_data );
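The pattern generalizes to any number of frames: every frame except the last one is sent with the "more" flag. Here is a small sketch of that rule, kept independent of ZMQ so it can be tried in isolation. `send_parts` and its `send_frame` callback are hypothetical names introduced for illustration. The callback is where the real SEND operation would go.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper (not part of ZMQ): pass each part to
// send_frame(part, more), where `more` is true for every part except
// the last one. Returns the number of parts handed over.
template <typename SendFrame>
std::size_t send_parts(const std::vector<std::string>& parts,
                       SendFrame send_frame) {
    for (std::size_t i = 0; i < parts.size(); ++i) {
        const bool more = (i + 1 < parts.size());
        send_frame(parts[i], more);
    }
    return parts.size();
}
```

In a real program the callback would create a frame from the string, as in the snippet above, and send it with ZMQ_SNDMORE when `more` is true and with no flag otherwise.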
Let’s clean things up
The cleanup process has a little twist that you should keep in mind: objects must be released in order. First, you must close all the Sockets that belong to a Context. Only then can you destroy the Context.
Closing a Socket
To close a socket you have to perform the CLOSE operation on the socket (of course!).
// Close the request Socket.
socket_req.close();
It seems fine… But what happens if the Socket has pending messages to send? In that case the messages linger: as far as the libzmq documentation describes it, the CLOSE operation itself returns, but destroying the Context blocks until all pending messages are delivered. That’s fine and it should be that way, but sometimes it leads to long waiting periods that force you to kill the application. If this behavior is not what you want, you can ask ZMQ to drop the pending messages, either after a timeout or immediately, once the CLOSE operation is performed. This is achieved by setting the ZMQ_LINGER option on the Socket before closing it, for example right after performing the BIND or the CONNECT operation.
// Option A: wait up to 5 seconds before dropping pending messages once
// the CLOSE operation is performed.
int linger_wait = 5000; // milliseconds
socket_req.setsockopt( ZMQ_LINGER, &linger_wait, sizeof( linger_wait ) );
// Option B (alternative to A): drop all the pending messages immediately
// once the CLOSE operation is performed.
int linger_drop = 0;
socket_req.setsockopt( ZMQ_LINGER, &linger_drop, sizeof( linger_drop ) );
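The linger option takes a plain int expressed in milliseconds: a positive value is an upper bound on the wait, 0 drops pending messages immediately, and -1 means waiting without a time limit. If your code works with std::chrono durations, a small conversion helper avoids unit mistakes. This is only a sketch: `linger_value` is a hypothetical name, and mapping every negative duration to -1 is a choice made for this example.

```cpp
#include <chrono>
#include <climits>

// Hypothetical helper (not part of ZMQ): convert a wait time into the int
// expected by the linger option.
// Negative durations map to -1 (wait without a time limit); values that do
// not fit in an int are clamped to INT_MAX.
int linger_value(std::chrono::milliseconds wait) {
    if (wait.count() < 0) {
        return -1;
    }
    if (wait.count() > INT_MAX) {
        return INT_MAX;
    }
    return static_cast<int>(wait.count());
}
```

For example, linger_value( std::chrono::seconds( 5 ) ) yields the same 5000 used above, ready to be stored in an int and passed to setsockopt.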
Once all the sockets that belong to a Context are closed, the Context itself can be destroyed.
// Destroy the Context.
context.close();
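It is very important to close all the Sockets before destroying the Context. If you don’t do that, destroying the Context will not complete cleanly: depending on the version, it typically blocks until the remaining Sockets are closed, or fails with an assertion. It’s a rule!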
One last comment about cleaning up: if you are using the C++ binding, all destructors call the proper CLOSE operation on each object.
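Relying on destructors works well with the required order because C++ destroys local objects in the reverse order of their declaration. If the Context is declared first and the Sockets after it, the Sockets are released first. The sketch below demonstrates that language rule with a stand-in type. `Tracker` and `destruction_order` are hypothetical names and do not use ZMQ at all.

```cpp
#include <string>
#include <vector>

// Stand-in type (hypothetical, not a ZMQ class): it only records its name
// in a log when it is destroyed.
struct Tracker {
    std::vector<std::string>& log;
    std::string name;
    Tracker(std::vector<std::string>& l, const std::string& n)
        : log(l), name(n) {}
    ~Tracker() { log.push_back(name); }
};

// Declare the objects in the same order as in the snippets of this post
// and return the order in which they were destroyed.
std::vector<std::string> destruction_order() {
    std::vector<std::string> log;
    {
        Tracker context(log, "context");
        Tracker socket_rep(log, "socket_rep");
        Tracker socket_req(log, "socket_req");
    }  // Destructors run here, last declared first.
    return log;
}
```

Calling destruction_order() returns socket_req, socket_rep, context, which is the order the cleanup rule asks for.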
If you want to do a field test, you can download all the snippets together in a single C++ source file.
Stay tuned for the third part; I hope you will read it too.