"""RabbitMQ helpers""" import pika from flask import Flask, json from pika.adapters.blocking_connection import BlockingChannel class RabbitMQ: host: str connection: pika.BlockingConnection channel: BlockingChannel def __init__(self, app: Flask = None): if app is not None: self.init_app(app) def init_app(self, app: Flask): self.host = app.config.get('RABBITMQ_HOST', 'localhost') def _ensure_connection(self): if self.channel is None: self.connection = pika.BlockingConnection( pika.ConnectionParameters(host=self.host)) self.channel = self.connection.channel() def ensure_queue_exists(self, queue_name: str): """Ensures the queue exists on the default exchange :param queue_name: the name of the queue """ self._ensure_connection() self.channel.queue_declare(queue=queue_name, durable=True) def queue_publish(self, queue_name: str, message): self._ensure_connection() self.ensure_queue_exists(queue_name) body = json.dumps(message) body_bytes = body.encode('utf-8') self.channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes) def ensure_exchange_exists(self, name: str) -> None: """Ensures exchange exists :param name: the name of the exchange """ self._ensure_connection() self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True) def exchange_publish(self, exchange_name: str, topic: str, message): """Publishes a message to a :param exchange_name: the name of the exchange :param topic: the topic to publish to :param message: the message to publish (will be turned into a json) """ self._ensure_connection() self.ensure_exchange_exists(exchange_name) body = json.dumps(message) body_bytes = body.encode('utf-8') self.channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)