diff --git a/.idea/mahssage-bus-api.iml b/.idea/mahssage-bus-api.iml index b17f0b0..e627057 100644 --- a/.idea/mahssage-bus-api.iml +++ b/.idea/mahssage-bus-api.iml @@ -7,8 +7,4 @@ - - \ No newline at end of file diff --git a/api/__init__.py b/api/__init__.py index fbad6c7..a6b10f7 100644 --- a/api/__init__.py +++ b/api/__init__.py @@ -95,3 +95,4 @@ def register_commands(app: Flask): app.cli.add_command(commands.clean) app.cli.add_command(commands.urls) app.cli.add_command(commands.seed) + app.cli.add_command(commands.setup_rabmq) diff --git a/api/commands.py b/api/commands.py index 67ec59f..6cbe64d 100644 --- a/api/commands.py +++ b/api/commands.py @@ -5,6 +5,8 @@ from flask import current_app from flask.cli import with_appcontext from werkzeug.exceptions import MethodNotAllowed, NotFound +import rmq_helper +from .target.models import Target from .target_exchange.models import TargetExchange @@ -91,3 +93,16 @@ def seed(): TargetExchange.ensure_created("") TargetExchange.ensure_created("webhooks") + + +@click.command() +@with_appcontext +def setup_rabmq(): + """Set up rabbitMQ""" + for exchange in TargetExchange.query.filter(TargetExchange.name != "").all(): + rmq_helper.ensure_exchange_exists(exchange.name) + + for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all(): + rmq_helper.ensure_queue_exists(target.routing_key) + + pass diff --git a/api/settings.py b/api/settings.py index 7b29e45..423278a 100644 --- a/api/settings.py +++ b/api/settings.py @@ -18,6 +18,7 @@ class Config(object): JWT_AUTH_HEADER_PREFIX = 'Token' APISPEC_TITLE = 'MahssageBus API' APISPEC_VERSION = 'v0.1' + RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost") class ProdConfig(Config): diff --git a/requirements.txt b/requirements.txt index ec358b3..ef40ac7 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/rmq_helper/__init__.py b/rmq_helper/__init__.py new file mode 100644 index 0000000..8eaaeb9 --- /dev/null +++ b/rmq_helper/__init__.py @@ -0,0 +1,3 @@ +"""RabbitMQ helpers""" +from .exchanges import ensure_exchange_exists, exchange_publish +from .queues import ensure_queue_exists, queue_publish diff --git a/rmq_helper/exchanges.py b/rmq_helper/exchanges.py new file mode 100644 index 0000000..2669371 --- /dev/null +++ b/rmq_helper/exchanges.py @@ -0,0 +1,31 @@ +import json + +from pika.adapters.blocking_connection import BlockingChannel + +from .extension import with_pika_channel + + +@with_pika_channel +def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None: + """Ensures exchange exists + + :param name: the name of the exchange + :param channel: a pika channel, if none specified a new one will be created + """ + channel.exchange_declare(exchange=name, exchange_type='topic') + + +@with_pika_channel +def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None): + """Publishes a message to a + + :param exchange_name: the name of the exchange + :param topic: the topic to publish to + :param message: the message to publish (will be turned into a json) + :param channel: a pika channel, if none specified a new one will be created + """ + ensure_exchange_exists(exchange_name, channel=channel) + + body = json.dumps(message) + body_bytes = body.encode('utf-8') + channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes) diff --git a/rmq_helper/extension.py b/rmq_helper/extension.py new file mode 100644 index 0000000..be47ecb --- /dev/null +++ b/rmq_helper/extension.py @@ -0,0 +1,25 @@ +from functools import wraps + +import pika + +from .settings import RABBITMQ_HOST + + +def with_pika_channel(f): + @wraps(f) + def wrapper(*args, **kwargs): + # if we are already passing channel, just reuse it + if 'channel' in kwargs: + return f(*args, **kwargs) + + # otherwise set up a new connection and channel to wrap our code with + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=RABBITMQ_HOST)) + channel = connection.channel() + + ret = f(*args, **kwargs, channel=channel) + + channel.close() + return ret + + return wrapper diff --git a/rmq_helper/queues.py b/rmq_helper/queues.py new file mode 100644 index 0000000..daef8f1 --- /dev/null +++ b/rmq_helper/queues.py @@ -0,0 +1,19 @@ +import json + +from pika.adapters.blocking_connection import BlockingChannel + +from .extension import with_pika_channel + + +@with_pika_channel +def ensure_queue_exists(name: str, channel: BlockingChannel = None): + channel.queue_declare(queue=name) + + +@with_pika_channel +def queue_publish(queue_name: str, message, channel: BlockingChannel = None): + ensure_queue_exists(queue_name, channel=channel) + + body = json.dumps(message) + body_bytes = body.encode('utf-8') + channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes) diff --git a/rmq_helper/settings.py b/rmq_helper/settings.py new file mode 100644 index 0000000..eced5b3 --- /dev/null +++ b/rmq_helper/settings.py @@ -0,0 +1,3 @@ +import os + +RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")