Cro::ZeroMQ

This module enables building Cro pipelines using ZeroMQ. The lower-level components are intended to be composed together using Cro.compose. Some higher level classes also exist to form Cro services and clients in a more convenient way for the most common use cases. This document assumes the reader has some existing understanding of ZeroMQ and its various socket types.

Cro::ZeroMQ::Message

This class represents a ZeroMQ message. It does the Cro::Message role. A Cro::ZeroMQ::Message can represent both a message that was received and a message to be sent (in a simple echo server, for example, it would be both).

ZeroMQ messages may consist of multiple parts. The parts method returns a List of the parts that were received (or, if this is a message to be sent, the parts that will be sent). Each part is represented as a Blob. Since a List is returned, it cannot be mutated. The body-blob method returns the last part, assuming that previous parts are some form of envelope. This is only a convention provided by Cro. The body-text method decodes body-blob as UTF-8; pass the :enc parameter to choose another encoding.

The identity method returns a List of Blob, with one Blob per frame up to and including the first zero-length frame. It is intended for use when working with ROUTER sockets. If there is no zero-length frame in the message, it throws an exception of type X::Cro::ZeroMQ::MissingIdentity.
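
The framing convention behind identity and body-blob can be illustrated in plain Raku, without the module. This is only a sketch: split-envelope is a hypothetical helper, not part of Cro::ZeroMQ, and it dies with a plain message rather than X::Cro::ZeroMQ::MissingIdentity.

```raku
# Hypothetical helper illustrating the envelope convention; not module code.
sub split-envelope(@frames) {
    # Position of the first zero-length frame (undefined if there is none).
    my $delimiter = @frames.first(*.elems == 0, :k);
    die "no zero-length delimiter frame" without $delimiter;
    return %(
        # Every frame up to and including the empty delimiter.
        identity => @frames[0 .. $delimiter].List,
        # By the convention described above, the last frame is the body.
        body     => @frames[*-1],
    );
}

# Two-byte identity frame, empty delimiter frame, then the body frame.
my @frames = Blob.new(0x01, 0x02), Blob.new, "hello".encode('utf-8');
my %envelope = split-envelope(@frames);
say %envelope<identity>.elems;          # 2
say %envelope<body>.decode('utf-8');    # hello
```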

The most basic way to construct a message is to pass a list of Blobs to the new method:

Cro::ZeroMQ::Message.new(
    parts => ($blob-key, $blob-message)
)

There are also a number of convenience constructors taking a single positional parameter that is either a string or a blob:

Cro::ZeroMQ::Message.new("hello world")
Cro::ZeroMQ::Message.new("goodbye cruel world".encode("ascii"))

As well as a slurpy candidate for multi-part messages:

Cro::ZeroMQ::Message.new("SomeEventName", $event-json)

Whenever a Str is passed, it will first be encoded using UTF-8 before being stored. (This is true of both the Str constructor and the slurpy positional constructor; the constructor taking the parts named argument expects only Blobs.)
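
As a rough sketch of how the constructors and accessors described above fit together (the use line is an assumption about the module path; the event name and JSON payload are purely illustrative):

```raku
use Cro::ZeroMQ::Message;   # assumed module path for the class

# Slurpy constructor: each Str is encoded as UTF-8 before being stored.
my $event = Cro::ZeroMQ::Message.new("SomeEventName", '{"id": 1}');

say $event.parts.elems;     # number of parts in the message
say $event.body-blob;       # the last part, as a Blob
say $event.body-text;       # the last part, decoded as UTF-8

# Named constructor: the parts must already be Blobs.
my $same = Cro::ZeroMQ::Message.new(
    parts => ("SomeEventName".encode('utf-8'), '{"id": 1}'.encode('utf-8'))
);
```

Both objects should carry the same two parts; the difference is only in who performs the encoding.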

A Cro::ZeroMQ::Message is immutable once constructed.

Note that there is no way to provide or obtain a Supply of parts because ZeroMQ doesn't send parts until it has seen them all, and doesn't pass parts on to the application until it has received them all. Therefore, there's no opportunity for increased performance by sending some parts earlier.

Pipeline Components

Pipeline components exist for each of the ZeroMQ socket types. Each is constructed with exactly one of the connect or bind named arguments, giving the endpoint to connect to or to bind on.

Passing neither of connect or bind, or passing both of connect and bind, will result in an exception of type X::Cro::ZeroMQ::IllegalBind.

The optional high-water-mark named parameter sets the high water mark (how many messages may be outstanding before the socket enters an exceptional state, either starting to block on send or dropping messages, depending on the socket type).
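
A minimal sketch of constructing components with these arguments follows. The use lines are assumptions about the module paths, and the endpoint and high water mark values are illustrative only.

```raku
use Cro::ZeroMQ::Socket::Pull;   # assumed module paths
use Cro::ZeroMQ::Socket::Push;

# The PULL end binds; the PUSH end connects to it.
my $pull = Cro::ZeroMQ::Socket::Pull.new(bind => 'tcp://127.0.0.1:5555');
my $push = Cro::ZeroMQ::Socket::Push.new(
    connect         => 'tcp://127.0.0.1:5555',
    high-water-mark => 1000,     # illustrative value
);
```

Which side binds and which connects is a deployment decision; ZeroMQ itself does not tie it to the socket type.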

All of the ZeroMQ pipeline components do the Cro::ZeroMQ::Component role, which factors out these commonalities. There will be little reason for typical Cro applications to care about this, however.

Cro::ZeroMQ::Socket::Push

This class is a Cro::Sink backed by a ZeroMQ PUSH socket. It is to be placed at the end of a pipeline, consuming Cro::ZeroMQ::Message objects and sending them.

Cro::ZeroMQ::Socket::Pull

This class is a Cro::Source backed by a ZeroMQ PULL socket. It is to be placed at the start of a pipeline, and produces Cro::ZeroMQ::Message objects.

Cro::ZeroMQ::Socket::Pub

This class is a Cro::Sink backed by a ZeroMQ PUB socket. It consumes Cro::ZeroMQ::Message objects and publishes them.

Cro::ZeroMQ::Socket::Sub

This class is a Cro::Source backed by a ZeroMQ SUB socket. It produces Cro::ZeroMQ::Message objects for each message received. Subscriptions are filtered on message prefix (only the first message part being considered as the subscription key).

The subscribe named argument is required (since not subscribing will always result in receiving no messages). It may take:

Optionally, unsubscribe may be passed. This only takes a Supply, and each time a topic is emitted (either as a Blob or a Str) an unsubscription will take place. Note that it only makes sense to unsubscribe from topics that were already subscribed to, and so this likely only makes sense in combination with having passed a Supply to the subscribe named argument.

If the same topic is subscribed to multiple times, then an unsubscription will only remove one of these (put another way, the set of topics subscribed to behaves as if it were reference counted). This behavior is provided by ZeroMQ rather than added by Cro. Note that unsubscriptions must match a subscription that is still active to be meaningful; it is not, therefore, possible to subscribe to everything and then exclude things. (At the end of the day, these are all just convenient ways to have zmq_setsockopt called.)
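
The reference-counting behavior might be exercised as in the sketch below. It assumes that subscribe accepts a Supply (as the text on unsubscribe implies), that the use line matches the module path, and that the component applies the emissions in the order shown once it is running. The topic name is illustrative.

```raku
use Cro::ZeroMQ::Socket::Sub;   # assumed module path

# Supplier::Preserving buffers values emitted before the Supply is first tapped.
my $subscribe   = Supplier::Preserving.new;
my $unsubscribe = Supplier::Preserving.new;

my $sub = Cro::ZeroMQ::Socket::Sub.new(
    connect     => 'tcp://some-publisher:5555',
    subscribe   => $subscribe.Supply,
    unsubscribe => $unsubscribe.Supply,
);

$subscribe.emit('weather.');     # 'weather.' now subscribed once
$subscribe.emit('weather.');     # subscribed twice
$unsubscribe.emit('weather.');   # one subscription left, still delivering
$unsubscribe.emit('weather.');   # none left, delivery for the topic stops
```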

Cro::ZeroMQ::Socket::XPub

This is a Cro::Source backed by a ZeroMQ XPUB socket. It produces Cro::ZeroMQ::Message objects, which represent subscription requests. It is a Cro::Replyable, the replier being a Cro::Sink that consumes Cro::ZeroMQ::Message objects (which correspond to messages to publish). Typically used together with Cro::ZeroMQ::Socket::XSub.

Cro::ZeroMQ::Socket::XSub

This is a Cro::Source backed by a ZeroMQ XSUB socket. It produces Cro::ZeroMQ::Message objects, which represent messages received from publishers. It is a Cro::Replyable, the replier being a Cro::Sink that consumes Cro::ZeroMQ::Message objects (which represent subscription requests to pass on to the publisher). Typically used together with Cro::ZeroMQ::Socket::XPub.

Cro::ZeroMQ::Socket::Rep

This class is a Cro::Source backed by a ZeroMQ REP socket, which produces Cro::ZeroMQ::Message. It is also a Cro::Replyable, and the replier is a Cro::Sink that sends a Cro::ZeroMQ::Message reply. Therefore, an echo server would just be:

my Cro::Service $echo = Cro.compose(
    Cro::ZeroMQ::Socket::Rep.new(bind => "tcp://*:5555")
);

A more useful service would place one or more Cro::Transforms into the pipeline to transform the request into a response. A ZeroMQ REP socket processes one message at a time, so another message will not be emitted until the reply has been sent by the replier. To build services that can work on multiple messages asynchronously, Cro::ZeroMQ::Socket::Router is a better bet.

Cro::ZeroMQ::Socket::Req

This class is a Cro::Connector backed by a ZeroMQ REQ socket. It consumes Cro::ZeroMQ::Message and produces Cro::ZeroMQ::Message. To follow the ZeroMQ REQ constraints, it must never have a message emitted to it while it is still waiting for the response to an earlier message. To build clients that can have multiple outstanding requests, Cro::ZeroMQ::Socket::Dealer is a better bet.

Cro::ZeroMQ::Socket::Router

This class is a Cro::Source backed by a ZeroMQ ROUTER socket. It produces Cro::ZeroMQ::Message objects. It is also a Cro::Replyable, the replier being a sink that sends on the ROUTER socket.

Cro::ZeroMQ::Socket::Dealer

This class is a Cro::Connector backed by a ZeroMQ DEALER socket. It consumes and produces Cro::ZeroMQ::Message objects.

Higher Level APIs

Cro::ZeroMQ::Service

This is a Cro::Service containing a pipeline for processing ZeroMQ messages. Many services will have a single Cro::Service instance that they start up, but there's no reason a service can't have many. A Cro::Service makes most sense for pipelines that feel "server"-like, or that are the "main loop" of the service.

There are various methods for constructing the most common kinds of pipeline. It is allowable (though not always sensible) to either bind or connect the endpoints.

Replier (REP)

A service that receives and replies to messages. The echo service would just be:

my Cro::Service $rep = Cro::ZeroMQ::Service.rep(
    bind => 'tcp://*:5555'
);

A more realistic example would include a transform implementing the service.

class MyReplier is Cro::Transform {
    method consumes() { Cro::ZeroMQ::Message }
    method produces() { Cro::ZeroMQ::Message }
    method transformer(Supply $messages --> Supply) {
        supply {
            whenever $messages {
                my $response = Cro::ZeroMQ::Message.new(.body-text.uc);
                emit $response;
            }
        }
    }
}
my Cro::Service $rep = Cro::ZeroMQ::Service.rep(
    bind => 'tcp://*:5555',
    MyReplier
);

Router (ROUTER)

A service that receives messages prefixed with an identity and processes them. A REP socket can only process one message at a time, which scales poorly when many messages need processing. A ROUTER socket allows many messages to be processed at once. It is up to the message transform to include the identity in the response message, so that it can be routed back to the correct client. (Note that the identity actually consists of one or more frames, followed by an empty frame.)

class MyAsyncReplier is Cro::Transform {
    method consumes() { Cro::ZeroMQ::Message }
    method produces() { Cro::ZeroMQ::Message }
    method transformer(Supply $messages --> Supply) {
        supply {
            whenever $messages -> $msg {
                whenever start { self!process($msg.body-blob) } -> $response {
                    emit Cro::ZeroMQ::Message.new($msg.identity, $response);
                }
            }
        }
    }
    method !process($data) {
        ...
    }
}
my Cro::Service $rep = Cro::ZeroMQ::Service.router(
    bind => 'tcp://*:5555',
    MyAsyncReplier
);

Pull (PULL)

Used for a service that will pull messages and process them. Since it doesn't send anything, the service is a sink.

class MyAggregator is Cro::Sink {
    method consumes() { Cro::ZeroMQ::Message }
    method sinker(Supply $messages --> Supply) {
        supply {
            whenever $messages {
                # Do something with the message.
            }
        }
    }
}
my Cro::Service $pull = Cro::ZeroMQ::Service.pull(
    connect => 'tcp://some-publisher:5555',
    MyAggregator
);

Pull/Push (PULL, PUSH)

This is typically used as a parallel worker. Unlike other service types, it expects:

Subscriber (SUB)

A service that reacts to received messages. Since it doesn't send anything, the service is a sink.

class MyHandler is Cro::Sink {
    method consumes() { Cro::ZeroMQ::Message }
    method sinker(Supply $messages --> Supply) {
        supply {
            whenever $messages {
                # Do something with the message.
            }
        }
    }
}
my Cro::Service $sub = Cro::ZeroMQ::Service.sub(
    connect => 'tcp://some-publisher:5555',
    MyHandler
);

Cro::ZeroMQ::Client for request/response (REQ, DEALER)

This provides a simple interface for sending messages using a REQ or a DEALER socket and receiving a single response to that message. A req client will send a message, and will not be able to send another until a response has been received. Trying to do so will result in an exception.

my $client = Cro::ZeroMQ::Client.req(
    connect => 'tcp://some-replier:5555'
);
my $reply = await $client.send($message);

By contrast, a dealer is an asynchronous client. It will synthesize an ID for each request and insert it, followed by an empty frame, before sending the request. Responses will cause the correct Promises returned by send to be kept.

my $client = Cro::ZeroMQ::Client.dealer(
    connect => 'tcp://some-replier:5555'
);
my $reply = await $client.send($message);

In most cases, dealer will be preferable for scalability reasons.
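
The scalability benefit comes from having several requests in flight at once. A minimal sketch of that pattern, relying only on send returning a Promise (send-all is a hypothetical helper, not part of Cro::ZeroMQ):

```raku
# Hypothetical helper: start every request, then wait for all the replies.
sub send-all($client, @messages) {
    # Start every request up front; each call to send returns a Promise.
    my @pending = @messages.map: { $client.send($_) };
    # Results are returned in request order, whatever order the replies arrive in.
    return await @pending;
}
```

With a dealer client, my @replies = send-all($client, @messages) would send all the messages before waiting on any reply. The same call against a req client would be expected to fail, since a req client cannot send a second message while the first is outstanding.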

Cro::ZeroMQ::Distributor for send-only (PUSH, PUB)

The high level client for PUSH and PUB socket types has a send method, which sends the message asynchronously and returns immediately. For example:

my $client = Cro::ZeroMQ::Distributor.push(
    bind => 'tcp://work-source:5555'
);
$client.send($message);

Cro::ZeroMQ::Collector for receive-only (PULL, SUB)

The high level client for PULL and SUB socket types has a Supply method, which will emit received Cro::ZeroMQ::Message objects. A PULL will probably be more suited to a service rather than a client, but may be useful. A useful example of this with a SUB socket would be to create such a client in a web server, and then send all received messages out over a web socket:

my $client = Cro::ZeroMQ::Collector.sub(
    connect => 'tcp://notifications:5555'
);
my $notifications = $client.Supply.share; # Turn it into a live Supply

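use Cro::HTTP::Router;             # provides route and get
use Cro::HTTP::Router::WebSocket;  # provides web-socket

my $application = route {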
    get -> 'notifications' {
        web-socket {
            supply {
                whenever $notifications {
                    emit .body-text; # forward the message body as text
                }
            }
        }
    }
}