add proper rabbitMQ extension

This commit is contained in:
Gardient
2021-09-30 22:57:13 +03:00
parent b50c67ae1e
commit 71c1e322aa
4 changed files with 61 additions and 6 deletions

View File

@@ -2,7 +2,7 @@ from flask import Flask, Blueprint
from . import commands, login, target_exchange, target, registration from . import commands, login, target_exchange, target, registration
from .exceptions import ApiException from .exceptions import ApiException
from .extensions import db, migrate, jwt, apispec from .extensions import db, migrate, jwt, apispec, rabbit
from .settings import ProdConfig, Config from .settings import ProdConfig, Config
@@ -31,6 +31,7 @@ def register_extensions(app: Flask):
migrate.init_app(app, db) migrate.init_app(app, db)
jwt.init_app(app) jwt.init_app(app)
apispec.init_app(app) apispec.init_app(app)
rabbit.init_app(app)
def register_blueprints(app: Flask): def register_blueprints(app: Flask):
@@ -82,6 +83,7 @@ def register_shellcontext(app: Flask):
"""Shell context objects.""" """Shell context objects."""
return { return {
'db': db, 'db': db,
'rabbit': rabbit,
'TargetExchange': target_exchange.models.TargetExchange, 'TargetExchange': target_exchange.models.TargetExchange,
'Target': target.models.Target, 'Target': target.models.Target,
'Registration': registration.models.Registration, 'Registration': registration.models.Registration,

View File

@@ -5,7 +5,7 @@ from flask import current_app
from flask.cli import with_appcontext from flask.cli import with_appcontext
from werkzeug.exceptions import MethodNotAllowed, NotFound from werkzeug.exceptions import MethodNotAllowed, NotFound
import rmq_helper from .extensions import rabbit
from .target.models import Target from .target.models import Target
from .target_exchange.models import TargetExchange from .target_exchange.models import TargetExchange
@@ -100,9 +100,9 @@ def seed():
def setup_rabmq(): def setup_rabmq():
"""Set up rabbitMQ""" """Set up rabbitMQ"""
for exchange in TargetExchange.query.filter(TargetExchange.name != "").all(): for exchange in TargetExchange.query.filter(TargetExchange.name != "").all():
rmq_helper.ensure_exchange_exists(exchange.name) rabbit.ensure_exchange_exists(exchange.name)
for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all(): for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all():
rmq_helper.ensure_queue_exists(target.routing_key) rabbit.ensure_queue_exists(target.routing_key)
pass pass

View File

@@ -2,6 +2,7 @@ from flask_apispec import FlaskApiSpec
from flask_jwt_extended import JWTManager from flask_jwt_extended import JWTManager
from flask_migrate import Migrate from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy, Model from flask_sqlalchemy import SQLAlchemy, Model
from rmq_helper import RabbitMQ
class CRUDMixin(Model): class CRUDMixin(Model):
@@ -36,3 +37,4 @@ db = SQLAlchemy(model_class=CRUDMixin)
migrate = Migrate() migrate = Migrate()
jwt = JWTManager() jwt = JWTManager()
apispec = FlaskApiSpec() apispec = FlaskApiSpec()
rabbit = RabbitMQ()

View File

@@ -1,3 +1,54 @@
"""RabbitMQ helpers""" """RabbitMQ helpers"""
from .exchanges import ensure_exchange_exists, exchange_publish import pika
from .queues import ensure_queue_exists, queue_publish from flask import Flask, json
from pika.adapters.blocking_connection import BlockingChannel
class RabbitMQ:
host: str
connection: pika.BlockingConnection
channel: BlockingChannel
def __init__(self, app: Flask = None):
if app is not None:
self.init_app(app)
def init_app(self, app: Flask):
self.host = app.config.get('RABBITMQ_HOST', 'localhost')
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host))
self.channel = self.connection.channel()
def ensure_queue_exists(self, queue_name: str):
"""Ensures the queue exists on the default exchange
:param queue_name: the name of the queue
"""
self.channel.queue_declare(queue=queue_name, durable=True)
def queue_publish(self, queue_name: str, message):
self.ensure_queue_exists(queue_name)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
self.channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)
def ensure_exchange_exists(self, name: str) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
"""
self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True)
def exchange_publish(self, exchange_name: str, topic: str, message):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
"""
self.ensure_exchange_exists(exchange_name)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
self.channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)