7. Developer Guide¶
Broker is based on CAF, the C++ Actor Framework. Experience with CAF certainly helps, but a basic understanding of the actor model as well as publish/subscribe suffices for understanding (and contributing to) Broker.
In the code base of Broker, we frequently use templates, lambdas, and common C++ idioms such as CRTP and RAII.
7.1. Architecture¶
From a user perspective, the Broker endpoint is the primary component in the API (see Section 2.1.1). Internally, an endpoint is a container for an actor system that hosts the core actor plus any number of subscribers and publishers. The figure below shows a simplified architecture of Broker in terms of actors.
An endpoint always contains exactly one core actor. From the perspective of the implementation, this actor is the primary component. It manages publishers and subscribers, maintains peering relations, forwards messages to remote peers, etc.
Broker uses four types of messages internally, where a message here simply means a copy-on-write (COW) tuple.
A data message consists of a topic and user-defined data (cf. Data Model). Any direct user interaction on publishers or subscribers uses this message type internally.
A command message consists of a topic and a private data type called internal_command. This message type usually remains hidden from Broker users, since it represents internal communication of data stores (between masters and clones).
A packed message represents a data, command, or routing-related message in serialized form. Each packed message consists of a type tag, a TTL field, a topic, and a byte buffer. The type tag stores the type information needed to deserialize the byte buffer. The TTL field is a counter that decrements whenever Broker forwards a message between peers. Once the counter reaches zero, Broker no longer forwards the message.
A node message represents a message between two Broker nodes (endpoints). The routing and dispatching logic in the core actor operates on this message type. Node messages are tuples that consist of two endpoint IDs, one for the sender and one for the receiver, as well as a packed message that represents the actual content.
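The four message types can be pictured as plain tuples. The sketch below uses hypothetical struct names and placeholder field types purely for illustration; the real Broker implementation uses copy-on-write tuples and its own type names. The small may_forward helper (also an illustrative assumption, not a Broker function) shows the TTL semantics described above.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical sketches of Broker's four internal message shapes.
using endpoint_id = std::string; // stands in for a randomly generated UUID

struct data_message {       // topic + user-defined data
  std::string topic;
  std::string data;         // placeholder for broker::data
};

struct command_message {    // topic + internal_command (data store traffic)
  std::string topic;
  std::vector<std::byte> internal_command; // opaque here
};

enum class packed_message_type : uint8_t { data, command, routing_update };

struct packed_message {     // serialized form of the above
  packed_message_type type; // tag for deserializing the buffer
  uint16_t ttl;             // decremented on each peer-to-peer hop
  std::string topic;
  std::vector<std::byte> payload;
};

struct node_message {       // what peers actually exchange
  endpoint_id sender;
  endpoint_id receiver;
  packed_message content;
};

// Hypothetical helper: Broker stops forwarding once the TTL reaches zero.
bool may_forward(packed_message& msg) {
  if (msg.ttl == 0)
    return false;
  --msg.ttl;
  return true;
}
```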
Broker organizes those message types in data flows, as depicted below:
The core actor represents the main hub that connects local and remote flows. In general, publishers generate data for peers and subscribers consume data from peers. In the core actor, there is a central merge point through which all messages flow. Peers tap directly into this dispatching point. Since peers operate on node messages, they do not need to serialize or deserialize any payloads when writing to or reading from the network. The core actor selects all messages with local subscriptions from the central merge point and deserializes node messages into data or command messages only once.
Likewise, incoming messages from publishers get serialized once immediately after receiving them and then they flow as node messages into the central merge point.
7.2. Implementation¶
Endpoints, master stores, and clone stores (see Section 1) all map to actors. Endpoints wrap the actor system and the main component: the core actor (see architecture).
7.2.1. The Core Actor¶
As the name suggests, this actor embodies the central hub for the publish/subscribe communication. Everything flows through this actor: local subscriptions, peering requests, local and remote publications, control messages for the stores, and so on.
The data flow stages shown in the Architecture Section also appear in the source code. However, in the actual implementation we need to distinguish between data and command messages since they use different C++ types. Hence, the core actor primarily revolves around these member variables:
data_inputs
Merges all inputs from local publishers. We also push data directly into this merger when receiving publication messages that bypass the flow abstractions; endpoint::publish generates these messages. The central_merge consumes messages from this merger (after converting each data_message to a node_message).

command_inputs
Merges all inputs from data store actors. Just like data_inputs, we convert every incoming message to a node_message and then feed it into the central_merge.

central_merge
Merges inputs from connected peers, local publishers, and local data store actors. Everything flows through this central point. This makes it easy to tap into the message flow of Broker: each new downstream simply filters messages of interest and then operates on the selected subset.

data_outputs
This stage makes all data messages that match local subscriptions (and that should be visible on this endpoint) available by filtering from the central_merge and deserializing the payloads. Broker initializes this stage lazily: as long as no local subscriber appears, Broker does not deserialize anything.

command_outputs
Similar to data_outputs, this stage makes command messages available to local data store actors. We also construct this member lazily.
New peers are modeled as a pair of flows: one for incoming node messages and one for outgoing node messages. The peers themselves are trivially implemented. We receive a connected socket from the connector after a successful peering handshake. We hand this socket over to a CAF socket manager that translates from the data flows to socket reads and writes. All we need in addition to the flow management is a trait class that informs CAF how to serialize and deserialize the data.
The core actor also emits messages for peering-related events, which users can consume with status subscribers. For these events, the core actor implements the following callbacks, which also make it easy to attach additional logic:
peer_discovered
peer_connected
peer_disconnected
peer_removed
peer_unreachable
cannot_remove_peer
peer_unavailable
7.2.1.1. Handshakes¶
Handshakes are performed by the connector. The core actor implements a listener interface to enable it to receive connected sockets after successful handshakes.
Broker uses a three-way handshake to make sure there is at most one connection between two peers. Each Broker endpoint has a unique ID (a randomly generated UUID). After establishing a TCP connection, Broker peers send a hello message with their own endpoint ID. By convention, the endpoint with the smaller ID becomes the originator. The example below depicts all handshake messages with two nodes, Peer A (establishes the TCP connection) and Peer B (has a smaller ID than A).
+-------------+ +-------------+
| Peer A | | Peer B |
+------+------+ +------+------+
| |
endpoint::peer | |
+-------------->+ |
| |
+---+ |
| | try to connect via TCP |
+<--+ |
| |
| (hello) |
+--------------------------------->+
| |
| (originator_syn) |
+<---------------------------------+
| |
| (responder_syn_ack) |
+--------------------------------->+
| |
| (originator_ack) |
+<---------------------------------+
| |
| |
Peers abort handshakes with drop_conn messages when detecting redundant connections.
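Note that the originator/responder roles depend only on the endpoint IDs, not on which side opened the TCP connection. A minimal sketch of this convention, using strings in place of the actual UUID type:

```cpp
#include <string>

// By convention, the endpoint with the smaller ID becomes the
// originator, regardless of who initiated the TCP connection.
// std::string stands in for the randomly generated UUID type.
bool is_originator(const std::string& own_id, const std::string& peer_id) {
  return own_id < peer_id;
}
```

In the diagram above, Peer B has the smaller ID and thus sends originator_syn, even though Peer A established the TCP connection.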
7.3. Logical Time¶
Broker has two types for modelling logical clocks:
broker::lamport_timestamp
broker::vector_timestamp
The former type is a thin wrapper (AKA strong typedef) for a 64-bit unsigned integer. It provides operator++ as well as the comparison operators.
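A strong typedef along these lines can be sketched as follows. This is an illustration of the pattern, not Broker's actual definition; the real type may differ in details such as the initial value and the full operator set.

```cpp
#include <cstdint>

// Sketch of a Lamport-clock strong typedef: a 64-bit counter with
// increment and comparison, but no implicit conversion to integers.
struct lamport_timestamp {
  uint64_t value = 1; // assumed initial value for this sketch

  lamport_timestamp& operator++() {
    ++value;
    return *this;
  }
};

bool operator==(lamport_timestamp x, lamport_timestamp y) {
  return x.value == y.value;
}

bool operator<(lamport_timestamp x, lamport_timestamp y) {
  return x.value < y.value;
}
```

The strong typedef prevents accidentally mixing logical timestamps with ordinary integers in arithmetic.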
7.4. Channels¶
Channels model logical connections between one producer and any number of consumers on top of an unreliable transport. Changes in the topology of Broker at runtime can cause reordering of messages if a faster path appears or loss of messages if a path disappears.
In places where Broker requires ordered and reliable communication, e.g., communication between clone and master actors, the class broker::internal::channel provides a building block to add ordering and reliability.
A channel is unaware of the underlying transport and leaves the rendezvous process (i.e., how producers learn handles of new consumers) to the user. The class channel defines message types as well as interfaces for producer and consumer implementations (both use CRTP to interface with user code).
7.4.1. Producer¶
The producer requires a Backend template parameter and expects a pointer of type Backend* in its constructor. This backend implements a transport layer for the channel and must provide the following interface (pseudo code):
interface Backend {
  // Sends a unicast message to `hdl`.
  void send(producer*, const Handle& hdl, const T& msg);
  // Sends a multicast message to all consumers.
  void broadcast(producer*, const T& msg);
  // Called to indicate that a consumer got removed by the producer.
  void drop(producer*, const Handle& hdl, ec reason);
  // Called to indicate that the producer received the initial ACK.
  void handshake_completed(producer*, const Handle& hdl);
};
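A toy backend that merely records its calls can illustrate the contract. All type names below (Handle as a string, T as a string, the ec enumerator) are stand-ins chosen for this sketch; the real data store actors implement the backend against CAF messaging with entity_id handles.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-ins for the channel's template parameters.
struct producer;                // opaque; only passed through
using Handle = std::string;     // identifies one consumer (entity_id in Broker)
using T = std::string;          // channel message payload
enum class ec { disconnected }; // error code passed to drop()

// A toy backend that records every call, illustrating the interface
// a real transport must implement.
struct recording_backend {
  std::vector<std::string> log;

  void send(producer*, const Handle& hdl, const T& msg) {
    log.push_back("send(" + hdl + ", " + msg + ")");
  }
  void broadcast(producer*, const T& msg) {
    log.push_back("broadcast(" + msg + ")");
  }
  void drop(producer*, const Handle& hdl, ec) {
    log.push_back("drop(" + hdl + ")");
  }
  void handshake_completed(producer*, const Handle& hdl) {
    log.push_back("handshake_completed(" + hdl + ")");
  }
};
```

A backend like this is also a convenient seam for unit-testing channel logic without a real network.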
The first argument is always the this pointer of the producer. This enables the backend to multiplex more than one producer at a time. The type Handle identifies a single consumer; in the data store actors, this is an entity_id. Finally, T is one of the following message types:
Type | Semantics
---|---
handshake | Transmits the first sequence number to a consumer.
event | Transmits ordered data to consumers.
retransmit_failed | Notifies that an event is no longer available.
heartbeat | Keeps connections to consumers alive.
7.4.2. Consumer¶
Similar to the producer, the consumer also requires a Backend for providing a transport and consuming incoming events (pseudo code):
interface Backend {
  // Processes a single event.
  void consume(consumer*, Payload);
  // Sends a control message to the producer.
  void send(consumer*, T);
  // Processes a lost event. The callback may abort further processing by
  // returning a non-default error. In this case, the consumer immediately
  // calls `close` with the returned error.
  error consume_nil(consumer*);
  // Drops this consumer. After calling this function, no further function
  // calls on the consumer are allowed (except calling the destructor).
  void close(consumer*, error);
};
The first argument is always the this pointer of the consumer. This enables the backend to multiplex more than one consumer at a time. The member function send always implicitly transmits control messages to the single producer. The type Payload is a template parameter of channel and denotes the content of event messages of the producer. Finally, T is one of the following message types:
Type | Semantics
---|---
cumulative_ack | Notifies the producer which events were processed.
nack | Notifies the producer that events got lost.
Consumers send cumulative_ack messages periodically, even if no messages were received. This enables the producer to keep track of which consumers are still alive and reachable.
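Cumulative ACKs also let the producer trim its retransmit buffer: once every known consumer has acknowledged a sequence number, earlier events can never be requested again and may be dropped. The bookkeeping below is a simplified sketch with assumed names, not Broker's actual producer implementation.

```cpp
#include <algorithm>
#include <cstdint>
#include <map>
#include <string>

using sequence_number = uint64_t;

// Simplified producer-side bookkeeping: last cumulative ACK per consumer
// plus a buffer of not-yet-acknowledged events, keyed by sequence number.
struct producer_state {
  std::map<std::string, sequence_number> acked;  // per-consumer cumulative ACK
  std::map<sequence_number, std::string> buffer; // events pending ACK

  // Record an ACK, then drop every event that all consumers have confirmed.
  void handle_cumulative_ack(const std::string& consumer, sequence_number seq) {
    acked[consumer] = std::max(acked[consumer], seq);
    sequence_number min_acked = seq;
    for (const auto& [_, s] : acked)
      min_acked = std::min(min_acked, s);
    buffer.erase(buffer.begin(), buffer.upper_bound(min_acked));
  }
};
```

Because the ACK is cumulative, a single sequence number confirms all earlier events, so lost ACK messages are harmless as long as a later one arrives.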
7.5. Channels in Data Store Actors¶
In general, the master actor broadcasts state updates to its clones. This maps directly to the one-producer-many-consumers model of channel. However, clones can also take the role of a producer when forwarding mutating operations to the master.
In a nutshell, the master actor (see master_actor.hh) always has a producer attached to it and any number of consumers:
using producer_type = channel_type::producer<master_state>;
using consumer_type = channel_type::consumer<master_state>;
producer_type output;
std::unordered_map<entity_id, consumer_type> inputs;
Conversely, the clone actor (see clone_actor.hh) always has a consumer attached to it and may have a producer:
using consumer_type = channel_type::consumer<clone_state>;
using producer_type = channel_type::producer<clone_state, producer_base>;
consumer_type input;
std::unique_ptr<producer_type> output_ptr;
Clones initialize the field output_ptr lazily on the first mutating operation they need to forward to the master.
7.5.1. Mapping Channel to Command Messages¶
The message types defined in channel are never used directly for actor-to-actor communication. Instead, masters and clones exchange command_message objects, which consist of a topic and an internal_command (the Payload type for the channels). Masters and clones convert between Broker message types and channel message types on the fly (using a surjective mapping).
The essential interface for internal_command is defined as follows:
enum class command_tag {
action,
producer_control,
consumer_control,
};
class internal_command {
public:
// ...
using variant_type
= std::variant<put_command, put_unique_command, put_unique_result_command,
erase_command, expire_command, add_command, subtract_command,
clear_command, attach_clone_command, attach_writer_command,
keepalive_command, cumulative_ack_command, nack_command,
ack_clone_command, retransmit_failed_command>;
sequence_number_type seq;
entity_id sender;
variant_type content;
};
command_tag tag_of(const internal_command& cmd);
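The tag_of function can be pictured as a classification of the active variant alternative. The reduced sketch below keeps only one command per category (the real variant has many more alternatives) and is an illustration of the dispatch idea, not Broker's actual implementation.

```cpp
#include <variant>

enum class command_tag { action, producer_control, consumer_control };

// Reduced stand-ins for a few of the real command types.
struct put_command {};            // an action
struct keepalive_command {};      // producer control
struct cumulative_ack_command {}; // consumer control

struct internal_command {
  using variant_type =
    std::variant<put_command, keepalive_command, cumulative_ack_command>;
  variant_type content;
};

// Classify the command by inspecting which alternative is active.
command_tag tag_of(const internal_command& cmd) {
  if (std::holds_alternative<put_command>(cmd.content))
    return command_tag::action;
  if (std::holds_alternative<keepalive_command>(cmd.content))
    return command_tag::producer_control;
  return command_tag::consumer_control;
}
```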
Furthermore, data store actors define channel_type as channel<entity_id, internal_command>. When processing an internal_command, the receiver first looks at the tag. Control messages map directly to channel messages:
Internal Command Type | Channel Message Type
---|---
attach_writer_command | handshake
ack_clone_command | handshake
cumulative_ack_command | cumulative_ack
nack_command | nack
keepalive_command | heartbeat
retransmit_failed_command | retransmit_failed
Note that attach_clone_command does not map to any channel message type. This message is the discovery message clones use to find the master. When receiving it, the master initiates the handshake on the channel by sending ack_clone_command (which contains a snapshot of the state and is thus not broadcast).
When a clone adds a writer, it already knows the master and thus skips the discovery phase by directly sending the attach_writer_command handshake.
All internal commands that contain an action, such as put_command, get forwarded to the channel as payload, either by calling produce on a producer or by calling handle_event on a consumer. The latter then calls consume on the data store actor with the internal_command messages in the order defined by the sequence number.
7.6. Cluster Setup and Testing¶
Peering, path discovery, subscription propagation, and so on take some unspecified amount of time when setting up a cluster. If a single manager is responsible for this setup, the workflow usually relies on some feedback to the manager to signal when the cluster is fully connected and ready to use. The same applies when writing high-level integration tests.
In order to wait for two nodes to add each other to their routing tables and exchange subscriptions, the class endpoint provides the member function await_peer:
/// Blocks execution of the current thread until either `whom` was added to
/// the routing table and its subscription flooding reached this endpoint or a
/// timeout occurs.
/// @param whom ID of another endpoint.
/// @param timeout An optional timeout configuring the maximum time
/// this function may block.
/// @returns `true` if `whom` was added before the timeout, `false` otherwise.
[[nodiscard]] bool
await_peer(endpoint_id whom, timespan timeout = defaults::await_peer_timeout);
/// Asynchronously runs `callback()` when `whom` was added to the routing
/// table and its subscription flooding reached this endpoint.
/// @param whom ID of another endpoint.
/// @param callback A function object wrapping code for asynchronous
/// execution. The argument for the callback is `true` if
/// `whom` was added before the timeout, `false` otherwise.
void await_peer(endpoint_id whom, std::function<void(bool)> callback,
timespan timeout = defaults::await_peer_timeout);
The first overload blocks the caller until a timeout (or error) occurs or the awaited peer has connected. The second overload is an asynchronous version that takes a callback instead. On success, the endpoint calls the callback with true; otherwise, it calls the callback with false.
To retrieve the entity_id from an endpoint object, simply call node_id(). For example, if both endpoints belong to the same process:
endpoint ep0;
endpoint ep1;
// ... call listen and peer ...
ep0.await_peer(ep1.node_id());
ep1.await_peer(ep0.node_id());
Note that ep0.await_peer(...) only confirms that ep0 has a path to the other endpoint and received a list of subscribed topics. To confirm a mutual relation, always call await_peer on both endpoints.
The Python bindings also expose the blocking overload of await_peer. For example, connecting three endpoints with data stores attached to them in a unit test can follow this recipe:
def run_tri_setup(self, f):
with broker.Endpoint() as ep0, \
broker.Endpoint() as ep1, \
broker.Endpoint() as ep2, \
ep0.attach_master("test", broker.Backend.Memory) as m, \
ep1.attach_clone("test") as c1, \
ep2.attach_clone("test") as c2:
# connect the nodes
port = ep0.listen("127.0.0.1", 0)
self.assertTrue(ep1.peer("127.0.0.1", port))
self.assertTrue(ep2.peer("127.0.0.1", port))
# wait until the nodes are fully connected
self.assertTrue(ep0.await_peer(ep1.node_id()))
self.assertTrue(ep0.await_peer(ep2.node_id()))
self.assertTrue(ep1.await_peer(ep0.node_id()))
self.assertTrue(ep2.await_peer(ep0.node_id()))
# wait until the clones have connected to the master
self.assertTrue(c1.await_idle())
self.assertTrue(c2.await_idle())
f(m, c1, c2)
Note
When setting up a cluster, make sure to add subscribers (and data stores) before establishing the peering relations. Otherwise, the subscriptions get flooded only after all connections have been established, and any event broadcast before the subscriptions have propagated gets lost.
7.6.1. Data Stores¶
When working with data stores, the member function store::await_idle allows establishing a predefined order:
/// Blocks execution of the current thread until the frontend actor reached an
/// IDLE state. On a master, this means that all clones have caught up with
/// the master and have ACKed the most recent command. On a clone, this means
/// that the master has ACKed any pending put commands from this store and
/// that the clone is not waiting on any out-of-order messages from the
/// master.
/// @param timeout The maximum amount of time this function may block.
/// @returns `true` if the frontend actor responded before the timeout,
/// `false` otherwise.
[[nodiscard]] bool await_idle(timespan timeout
= defaults::store::await_idle_timeout);
/// Asynchronously runs `callback(true)` when the frontend actor reached an
/// IDLE state or `callback(false)` if the optional timeout triggered first
/// (or in case of an error).
/// @param timeout The maximum amount of time this function may block.
/// @param callback A function object wrapping code for asynchronous
/// execution. The argument for the callback is `true` if the
/// frontend actor responded before the timeout, `false`
/// otherwise.
void await_idle(std::function<void(bool)> callback,
timespan timeout = defaults::store::await_idle_timeout);
What idle means depends on the role:
- For a master, idle means the following:
There are no pending handshakes to clones.
All clones have ACKed the latest command.
All input buffers are empty, i.e., there exists no buffered command from a writer.
- For a clone, idle means the following:
The clone successfully connected to the master.
The input buffer is empty, i.e., there exists no buffered command from the master.
All local writes (if any) have been ACKed by the master.
Just like await_peer, calling await_idle on only one store object usually does not guarantee the desired state. For example, consider a setup with one master (m) and three clones (c0, c1, and c2). When calling put on c0, await_idle would return after m has ACKed that it received the put command. At this point, c1 and c2 might not yet have seen the command. Hence, the process must also call await_idle on the master before it can assume that all data stores are in sync:
c0.put("foo", "bar");
if (!c0.await_idle()) {
// ... handle timeout ...
}
if (!m.await_idle()) {
// ... handle timeout ...
}
Note
In the example above, calling await_idle on c1 and c2 as well is not necessary. The master enters the idle mode only after all clones have ACKed the latest command.