From b50c67ae1e890c9a991905e33b3741c3c5810f79 Mon Sep 17 00:00:00 2001 From: Gardient Date: Wed, 22 Sep 2021 22:13:04 +0300 Subject: [PATCH] added rabbitmq helpers and setup command --- .idea/mahssage-bus-api.iml | 4 ---- api/__init__.py | 1 + api/commands.py | 15 +++++++++++++++ api/settings.py | 1 + requirements.txt | Bin 844 -> 428 bytes rmq_helper/__init__.py | 3 +++ rmq_helper/exchanges.py | 31 +++++++++++++++++++++++++++++++ rmq_helper/extension.py | 25 +++++++++++++++++++++++++ rmq_helper/queues.py | 19 +++++++++++++++++++ rmq_helper/settings.py | 3 +++ 10 files changed, 98 insertions(+), 4 deletions(-) create mode 100644 rmq_helper/__init__.py create mode 100644 rmq_helper/exchanges.py create mode 100644 rmq_helper/extension.py create mode 100644 rmq_helper/queues.py create mode 100644 rmq_helper/settings.py diff --git a/.idea/mahssage-bus-api.iml b/.idea/mahssage-bus-api.iml index b17f0b0..e627057 100644 --- a/.idea/mahssage-bus-api.iml +++ b/.idea/mahssage-bus-api.iml @@ -7,8 +7,4 @@ - - \ No newline at end of file diff --git a/api/__init__.py b/api/__init__.py index fbad6c7..a6b10f7 100644 --- a/api/__init__.py +++ b/api/__init__.py @@ -95,3 +95,4 @@ def register_commands(app: Flask): app.cli.add_command(commands.clean) app.cli.add_command(commands.urls) app.cli.add_command(commands.seed) + app.cli.add_command(commands.setup_rabmq) diff --git a/api/commands.py b/api/commands.py index 67ec59f..6cbe64d 100644 --- a/api/commands.py +++ b/api/commands.py @@ -5,6 +5,8 @@ from flask import current_app from flask.cli import with_appcontext from werkzeug.exceptions import MethodNotAllowed, NotFound +import rmq_helper +from .target.models import Target from .target_exchange.models import TargetExchange @@ -91,3 +93,16 @@ def seed(): TargetExchange.ensure_created("") TargetExchange.ensure_created("webhooks") + + +@click.command() +@with_appcontext +def setup_rabmq(): + """Set up rabbitMQ""" + for exchange in TargetExchange.query.filter(TargetExchange.name != "").all(): + rmq_helper.ensure_exchange_exists(exchange.name) + + for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all(): + rmq_helper.ensure_queue_exists(target.routing_key) + + pass diff --git a/api/settings.py b/api/settings.py index 7b29e45..423278a 100644 --- a/api/settings.py +++ b/api/settings.py @@ -18,6 +18,7 @@ class Config(object): JWT_AUTH_HEADER_PREFIX = 'Token' APISPEC_TITLE = 'MahssageBus API' APISPEC_VERSION = 'v0.1' + RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost") class ProdConfig(Config): diff --git a/requirements.txt b/requirements.txt index ec358b3ddbe3ac94aa233468da717122a5a1c244..ef40ac71350bfe1f1e84ae9aa6e56659b21f6d50 100644 GIT binary patch literal 428 zcmYjN%WlIU5WMp*Mt;iafkO}NA!?*nsg(2D-q_S&5a6cn*LT56RB>6DVIB(#btiiv z3f}U9ftB7OT^nBToWUB5MSF;F!&}xX+0Q~8cuw5jDC0nb5IL`T&0ZCF0?Pzm<0X5@ z5kKlBN_M<_W-mVhmjMSMYF<5%lN=oc6rxJ@JN^9e_N@8|^%lh`7#khpqACQ7L9U!EAPRL$jM#dxk6d>|r%HKWR|P+>&(+WaUiF3QmkPXFjYQ zsgoM7nB96apbwi$A}x{LH|iA?V^O<>$t>AmsO5M1^hB@!n-sLOvSPt#mGGRG?1mk< QkxcdlU(n6JxQ_Rje=*a2!T<(^3(CygIY5Ei~4USK!U=%S?VXI$X}!+Pjfc!>j2!!wK;22~BMBBFx0w}y>) z-*4C>cJ6p`>LfhU7kX<)`cK3|>bvfs)xB&mufwNP(KOEKnaRT#GrYmWBR=Uq#vThk z7p5=mo6fEY=UACn^~kI0x)r8)#WNo0Fv6PE2~O6b30#7jPHZthd)CEtQs*yDm;Z*h z1)YVgS+OQ(S|x8(*-m3jcYD0EGBDK5Q1eW9zh&>G!oc<;BqvsKhg1GuQspnZwv$tL zZF}0$Z;E51<6k6}aZ>G7i57H!ve{}zy;*J_FGSoPYV4e(I1bcGIgv-El>d~{hbg+v z-Zf1BWi38!GoU-At@kvc8|gU?Jp;pzr(|l)m^~auc;@$0PP%Xc>50 diff --git a/rmq_helper/__init__.py b/rmq_helper/__init__.py new file mode 100644 index 0000000..8eaaeb9 --- /dev/null +++ b/rmq_helper/__init__.py @@ -0,0 +1,3 @@ +"""RabbitMQ helpers""" +from .exchanges import ensure_exchange_exists, exchange_publish +from .queues import ensure_queue_exists, queue_publish diff --git a/rmq_helper/exchanges.py b/rmq_helper/exchanges.py new file mode 100644 index 0000000..2669371 --- /dev/null +++ b/rmq_helper/exchanges.py @@ -0,0 +1,31 @@ +import json + +from pika.adapters.blocking_connection import BlockingChannel + +from .extension import with_pika_channel + + +@with_pika_channel +def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None: + """Ensures exchange exists + + :param name: the name of the exchange + :param channel: a pika channel, if none specified a new one will be created + """ + channel.exchange_declare(exchange=name, exchange_type='topic') + + +@with_pika_channel +def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None): + """Publishes a message to a + + :param exchange_name: the name of the exchange + :param topic: the topic to publish to + :param message: the message to publish (will be turned into a json) + :param channel: a pika channel, if none specified a new one will be created + """ + ensure_exchange_exists(exchange_name, channel=channel) + + body = json.dumps(message) + body_bytes = body.encode('utf-8') + channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes) diff --git a/rmq_helper/extension.py b/rmq_helper/extension.py new file mode 100644 index 0000000..be47ecb --- /dev/null +++ b/rmq_helper/extension.py @@ -0,0 +1,25 @@ +from functools import wraps + +import pika + +from .settings import RABBITMQ_HOST + + +def with_pika_channel(f): + @wraps(f) + def wrapper(*args, **kwargs): + # if we are already passing channel, just reuse it + if 'channel' in kwargs: + return f(*args, **kwargs) + + # otherwise set up a new connection and channel to wrap our code with + connection = pika.BlockingConnection( + pika.ConnectionParameters(host=RABBITMQ_HOST)) + channel = connection.channel() + + ret = f(*args, **kwargs, channel=channel) + + channel.close() + return ret + + return wrapper diff --git a/rmq_helper/queues.py b/rmq_helper/queues.py new file mode 100644 index 0000000..daef8f1 --- /dev/null +++ b/rmq_helper/queues.py @@ -0,0 +1,19 @@ +import json + +from pika.adapters.blocking_connection import BlockingChannel + +from .extension import with_pika_channel + + +@with_pika_channel +def ensure_queue_exists(name: str, channel: BlockingChannel = None): + channel.queue_declare(queue=name) + + +@with_pika_channel +def queue_publish(queue_name: str, message, channel: BlockingChannel = None): + ensure_queue_exists(queue_name, channel=channel) + + body = json.dumps(message) + body_bytes = body.encode('utf-8') + channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes) diff --git a/rmq_helper/settings.py b/rmq_helper/settings.py new file mode 100644 index 0000000..eced5b3 --- /dev/null +++ b/rmq_helper/settings.py @@ -0,0 +1,3 @@ +import os + +RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")