Building with Message Brokers
Today I am going to talk about how and why I used message brokers in my automation projects along with a demonstration of the implementation in my Python library project.
This post follows my previous on message passing, which includes introduction of some of my common library code pylib.
While this project is an evaluation of my own experience with specific message brokers, I would recommend also reading more broadly on the topic to get a better idea of best fit for your ideas.
MQTT
If you are familiar with IoT, the MQTT protocol needs little introduction. There is so much posted online that any introduction here would just be an inferior duplication of effort. In order to coordinate message transmission between publish and subscribe topics, you need to run a broker to which IoT devices connect. There are cloud-based brokers like EMQX (see also MQTTX) but you can host your own with various levels of authentication and authorization. For my projects, I chose Eclipse Mosquitto which is also supports docker container deployments and can be run with almost no configuration. For my application client, I chose Eclipse Paho for Python.
Referring briefly back to my previous post, you can see how I build my MQTT client into my ZeroMQ pipeline:
The MqttSubscriber
MQTT client wrapper can be found here on GitHub. Here is a summarized form:
import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTT_ERR_SUCCESS, MQTT_ERR_NO_CONN
from pylib.app import AppThread
from pylib.zmq import Closable
from pylib.handler import exception_handler
class MqttSubscriber(AppThread, Closable):
def __init__(self):
AppThread.__init__(self, name=self.__class__.__name__)
Closable.__init__(self, connect_url='inproc://mqtt-publisher')
# push socket to forward received MQTT messages to the event loop
self.processor = self.get_socket(zmq.PUSH)
# Paho client
self._mqtt_client = None
def close(self):
Closable.close(self)
try:
self._mqtt_client.disconnect()
except Exception:
log.warning('Ignoring error closing MQTT socket.', exc_info=True)
def on_connect(self, client, userdata, flags, rc):
self._mqtt_client.subscribe(self._mqtt_subscribe_topics)
def on_disconnect(self, client, userdata, rc):
self._disconnected = True
def on_message(self, client, userdata, msg):
msg_data = None
try:
msg_data = json.loads(msg.payload)
except JSONDecodeError:
log.exception(f'Unstructured message: {msg.payload}')
return
# check assumptions on topic structure
topic_base = '/'.join(msg.topic.split('/')[0:2])
# unpack the message
try:
# ...
# process msg_data
# ...
# forward message data to application event loop
self.processor.send_pyobj({
topic_base: {
'data': {
'device_info': {'inputs': device_inputs},
'active_devices': active_devices
},
}
})
except ContextTerminated:
self.close()
# noinspection PyBroadException
def run(self):
# forward messages to the application event loop
# via the device heartbeat nanny
self.processor.connect('inproc://heartbeat-nanny')
self._mqtt_client = mqtt.Client()
self._mqtt_client.on_connect = self.on_connect
self._mqtt_client.on_disconnect = self.on_disconnect
self._mqtt_client.on_message = self.on_message
self._mqtt_client.connect(self._mqtt_server_address)
# Python context manager to support basic connection and exception handling
with exception_handler(closable=self, and_raise=False, shutdown_on_error=True):
while not threads.shutting_down:
# blocking happens here
rc = self._mqtt_client.loop()
if rc == MQTT_ERR_NO_CONN or self._disconnected:
# this terminates the application but without sending exceptions to Sentry.io
raise ResourceWarning(f'No connection to MQTT broker at {self._mqtt_server_address} (disconnected? {self._disconnected})')
# check for messages to publish back to MQTT
try:
# non-blocking ZMQ read
mqtt_pub_topic, message_data = self.socket.recv_pyobj(flags=zmq.NOBLOCK)
self._mqtt_client.publish(topic=mqtt_pub_topic, payload=message_data)
except ZMQError:
# ignore, no data
pass
RabbitMQ
For communication between my automation applications I chose RabbitMQ to explore another type of broker. The diagram below includes here pylib modules are used. This illustrates the use of a ZeroMQ Pipeline Pattern for inter-thread communication.
For any of my applications, some instance of ZMQListener
(found here) is used to receive RabbitMQ messages from the network.
ZMQListener
extends a class called MQConnection
(found here) in order to support RabbitMQ callbacks (i.e. receiving messages). By extending AppThread
and Closable
, MQConnection
functions as an entirely self-sustaining application thread which contains the implementation to set up a RabbitMQ connection, publication of messages to the configured RabbitMQ exchange and topics, and connection teardown at application shutdown.
From the diagram above, applications that host purely input or output functions also make use of RabbitMQRelay
in the pylib.rabbit
module (found here) for outbound communications. These are typically instantiated in the main application thread and require little more thought after this point. Here is an excerpt illustrating this and how they build up the ZeroMQ pipeline for inter-thread communication.
from pika.exceptions import AMQPConnectionError, StreamLostError, ConnectionClosedByBroker
from pylib import app_config
from pylib.rabbit import ZMQListener, RabbitMQRelay
from pylib import threads
from pylib.threads import die, bye
from pylib.zmq import zmq_term
def main():
mq_server_address=app_config.get('rabbitmq', 'server_address')
mq_exchange_name=app_config.get('rabbitmq', 'mq_exchange')
mq_device_topic=app_config.get('rabbitmq', 'device_topic')
# receives RabbitMQ messages
mq_control_listener = ZMQListener(
zmq_url='inproc://app-thread',
mq_server_address=mq_server_address,
mq_exchange_name=f'{mq_exchange_name}_control',
mq_topic_filter=f'event.control.{mq_device_topic}',
mq_exchange_type='direct')
# sends RabbitMQ messages for any ZeroMQ inter-thread messages sent to zmq_url
try:
mq_relay = RabbitMQRelay(
zmq_url='inproc://rabbit-mq-publisher',
mq_server_address=mq_server_address,
mq_exchange_name=mq_exchange_name,
mq_topic_filter=mq_device_topic,
mq_exchange_type='topic')
except AMQPConnectionError as e:
die(exception=e)
bye()
# ...
mq_control_listener.start()
mq_relay.start()
# ...
try:
# ...
# start thread nanny
nanny = threading.Thread(name='nanny', target=thread_nanny, args=(signal_handler,))
nanny.setDaemon(True)
nanny.start()
# start heartbeat loop
publisher_socket.connect('inproc://rabbit-mq-publisher')
while not threads.shutting_down:
heartbeat_payload = {
'device_info': device_info
}
publisher_socket.send_pyobj((f'event.heartbeat.{mq_relay.device_topic}', heartbeat_payload))
threads.interruptable_sleep.wait(HEARTBEAT_INTERVAL_SECONDS)
raise RuntimeWarning()
except(KeyboardInterrupt, RuntimeWarning, ContextTerminated) as e:
die()
mq_control_listener.stop()
try:
mq_relay.close()
except (AMQPConnectionError, ConnectionClosedByBroker, StreamLostError) as e:
log.warning(f'When closing: {e!s}')
finally:
zmq_term()
bye()
if __name__ == "__main__":
main()
My application event loop also makes use of this pattern in order to send RabbitMQ messages to trigger actions. This is illustrated here with this code excerpt:
class EventProcessor(MQConnection, Closable):
def __init__(self, mq_server_address, mq_exchange_name):
MQConnection.__init__(
self,
mq_server_address=mq_server_address,
mq_exchange_name=mq_exchange_name,
# direct routing
mq_exchange_type='direct',
# no control message should live longer than 90s
mq_arguments={'x-message-ttl': 90*1000})
Closable.__init__(self, connect_url=URL_WORKER_APP)
Publication to RabbitMQ happens in this event loop by invoking MQConnection._basic_publish
.
self._basic_publish(
routing_key=f'event.control.{output_type}',
event_payload=event_payload)