62 lines
2.0 KiB
Python
62 lines
2.0 KiB
Python
"""RabbitMQ helpers"""
|
|
import pika
|
|
from flask import Flask, json
|
|
from pika.adapters.blocking_connection import BlockingChannel
|
|
|
|
|
|
class RabbitMQ:
    """Flask extension-style helper for publishing JSON messages to RabbitMQ.

    The broker host is read from the Flask config key ``RABBITMQ_HOST``
    (default ``'localhost'``) by :meth:`init_app`. The connection and channel
    are opened lazily on first use and reused by later calls.

    ``host`` has no class-level default, so :meth:`init_app` must have run
    (directly or through the constructor) before any method that talks to
    the broker is called.
    """

    host: str
    connection: pika.BlockingConnection = None
    channel: BlockingChannel = None

    def __init__(self, app: Flask = None) -> None:
        """Creates the helper, optionally binding it to an app right away

        :param app: the Flask application to configure from; when omitted,
            call :meth:`init_app` later
        """
        if app is not None:
            self.init_app(app)

    def init_app(self, app: Flask) -> None:
        """Reads the broker host from the application config

        :param app: the Flask application whose ``RABBITMQ_HOST`` config
            value is used (falls back to ``'localhost'``)
        """
        self.host = app.config.get('RABBITMQ_HOST', 'localhost')

    def _ensure_connection(self) -> None:
        """Makes sure ``self.channel`` is an open channel

        Opens the connection and/or channel when they are missing or report
        themselves as closed. Checking only for ``None`` would keep a dead
        channel around forever after the first close, making every later
        call fail.

        NOTE(review): a link dropped silently by the peer may still report
        itself as open until the next I/O operation; this only recovers from
        closes the client already knows about.
        """
        # Fast path: an open channel is reused as-is.
        if self.channel is not None and self.channel.is_open:
            return

        if self.connection is None or self.connection.is_closed:
            self.connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=self.host))

        self.channel = self.connection.channel()

    @staticmethod
    def _encode(message) -> bytes:
        """Serializes a message to UTF-8 encoded JSON bytes

        :param message: the object to serialize with ``json.dumps``
        """
        return json.dumps(message).encode('utf-8')

    def ensure_queue_exists(self, queue_name: str) -> None:
        """Ensures the queue exists on the default exchange

        The queue is declared as durable.

        :param queue_name: the name of the queue
        """
        self._ensure_connection()
        self.channel.queue_declare(queue=queue_name, durable=True)

    def queue_publish(self, queue_name: str, message) -> None:
        """Publishes a message to a queue through the default exchange

        The queue is declared first if needed.

        :param queue_name: the name of the queue, used as the routing key
        :param message: the message to publish (will be turned into a json)
        """
        self._ensure_connection()
        self.ensure_queue_exists(queue_name)

        self.channel.basic_publish(
            exchange='', routing_key=queue_name, body=self._encode(message))

    def ensure_exchange_exists(self, name: str) -> None:
        """Ensures exchange exists

        The exchange is declared as a durable ``topic`` exchange.

        :param name: the name of the exchange
        """
        self._ensure_connection()
        self.channel.exchange_declare(
            exchange=name, exchange_type='topic', durable=True)

    def exchange_publish(self, exchange_name: str, topic: str, message) -> None:
        """Publishes a message to a topic on an exchange

        The exchange is declared first if needed.

        :param exchange_name: the name of the exchange
        :param topic: the topic to publish to (used as the routing key)
        :param message: the message to publish (will be turned into a json)
        """
        self._ensure_connection()
        self.ensure_exchange_exists(exchange_name)

        self.channel.basic_publish(
            exchange=exchange_name, routing_key=topic,
            body=self._encode(message))
|