added rabbitmq helpers and setup command

This commit is contained in:
Gardient
2021-09-22 22:13:04 +03:00
parent 80e4ddf1a8
commit b50c67ae1e
10 changed files with 98 additions and 4 deletions

3
rmq_helper/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""RabbitMQ helpers"""
from .exchanges import ensure_exchange_exists, exchange_publish
from .queues import ensure_queue_exists, queue_publish

31
rmq_helper/exchanges.py Normal file
View File

@@ -0,0 +1,31 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
:param channel: a pika channel, if none specified a new one will be created
"""
channel.exchange_declare(exchange=name, exchange_type='topic')
@with_pika_channel
def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
:param channel: a pika channel, if none specified a new one will be created
"""
ensure_exchange_exists(exchange_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)

25
rmq_helper/extension.py Normal file
View File

@@ -0,0 +1,25 @@
from functools import wraps
import pika
from .settings import RABBITMQ_HOST
def with_pika_channel(f):
@wraps(f)
def wrapper(*args, **kwargs):
# if we are already passing channel, just reuse it
if 'channel' in kwargs:
return f(*args, **kwargs)
# otherwise set up a new connection and channel to wrap our code with
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
ret = f(*args, **kwargs, channel=channel)
channel.close()
return ret
return wrapper

19
rmq_helper/queues.py Normal file
View File

@@ -0,0 +1,19 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_queue_exists(name: str, channel: BlockingChannel = None):
channel.queue_declare(queue=name)
@with_pika_channel
def queue_publish(queue_name: str, message, channel: BlockingChannel = None):
ensure_queue_exists(queue_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)

3
rmq_helper/settings.py Normal file
View File

@@ -0,0 +1,3 @@
import os
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")