From 71c1e322aa8f6b43abe32466872d48f65097de23 Mon Sep 17 00:00:00 2001 From: Gardient Date: Thu, 30 Sep 2021 22:57:13 +0300 Subject: [PATCH] add proper rabbitMQ extension --- api/__init__.py | 4 ++- api/commands.py | 6 ++--- api/extensions.py | 2 ++ rmq_helper/__init__.py | 55 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 61 insertions(+), 6 deletions(-) diff --git a/api/__init__.py b/api/__init__.py index a6b10f7..b040f1d 100644 --- a/api/__init__.py +++ b/api/__init__.py @@ -2,7 +2,7 @@ from flask import Flask, Blueprint from . import commands, login, target_exchange, target, registration from .exceptions import ApiException -from .extensions import db, migrate, jwt, apispec +from .extensions import db, migrate, jwt, apispec, rabbit from .settings import ProdConfig, Config @@ -31,6 +31,7 @@ def register_extensions(app: Flask): migrate.init_app(app, db) jwt.init_app(app) apispec.init_app(app) + rabbit.init_app(app) def register_blueprints(app: Flask): @@ -82,6 +83,7 @@ def register_shellcontext(app: Flask): """Shell context objects.""" return { 'db': db, + 'rabbit': rabbit, 'TargetExchange': target_exchange.models.TargetExchange, 'Target': target.models.Target, 'Registration': registration.models.Registration, diff --git a/api/commands.py b/api/commands.py index 6cbe64d..3427efb 100644 --- a/api/commands.py +++ b/api/commands.py @@ -5,7 +5,7 @@ from flask import current_app from flask.cli import with_appcontext from werkzeug.exceptions import MethodNotAllowed, NotFound -import rmq_helper +from .extensions import rabbit from .target.models import Target from .target_exchange.models import TargetExchange @@ -100,9 +100,9 @@ def seed(): def setup_rabmq(): """Set up rabbitMQ""" for exchange in TargetExchange.query.filter(TargetExchange.name != "").all(): - rmq_helper.ensure_exchange_exists(exchange.name) + rabbit.ensure_exchange_exists(exchange.name) for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all(): - rmq_helper.ensure_queue_exists(target.routing_key) + rabbit.ensure_queue_exists(target.routing_key) pass diff --git a/api/extensions.py b/api/extensions.py index 9392df1..7a1c54c 100644 --- a/api/extensions.py +++ b/api/extensions.py @@ -2,6 +2,7 @@ from flask_apispec import FlaskApiSpec from flask_jwt_extended import JWTManager from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy, Model +from rmq_helper import RabbitMQ class CRUDMixin(Model): @@ -36,3 +37,4 @@ db = SQLAlchemy(model_class=CRUDMixin) migrate = Migrate() jwt = JWTManager() apispec = FlaskApiSpec() +rabbit = RabbitMQ() diff --git a/rmq_helper/__init__.py b/rmq_helper/__init__.py index 8eaaeb9..3f7f348 100644 --- a/rmq_helper/__init__.py +++ b/rmq_helper/__init__.py @@ -1,3 +1,54 @@ """RabbitMQ helpers""" -from .exchanges import ensure_exchange_exists, exchange_publish -from .queues import ensure_queue_exists, queue_publish +import pika +from flask import Flask, json +from pika.adapters.blocking_connection import BlockingChannel + + +class RabbitMQ: + host: str + connection: pika.BlockingConnection + channel: BlockingChannel + + def __init__(self, app: Flask = None): + if app is not None: + self.init_app(app) + + def init_app(self, app: Flask): + self.host = app.config.get('RABBITMQ_HOST', 'localhost') + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=self.host)) + self.channel = self.connection.channel() + + def ensure_queue_exists(self, queue_name: str): + """Ensures the queue exists on the default exchange + + :param queue_name: the name of the queue + """ + self.channel.queue_declare(queue=queue_name, durable=True) + + def queue_publish(self, queue_name: str, message): + self.ensure_queue_exists(queue_name) + + body = json.dumps(message) + body_bytes = body.encode('utf-8') + self.channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes) + + def ensure_exchange_exists(self, name: str) -> None: + """Ensures exchange exists + + :param name: the name of the exchange + """ + self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True) + + def exchange_publish(self, exchange_name: str, topic: str, message): + """Publishes a message to a + + :param exchange_name: the name of the exchange + :param topic: the topic to publish to + :param message: the message to publish (will be turned into a json) + """ + self.ensure_exchange_exists(exchange_name) + + body = json.dumps(message) + body_bytes = body.encode('utf-8') + self.channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)