import json

from pika.adapters.blocking_connection import BlockingChannel

from .extension import with_pika_channel


@with_pika_channel
def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None:
    """Ensure that a topic exchange with the given name exists.

    Declares the exchange on the given channel with exchange type ``topic``.

    :param name: the name of the exchange
    :param channel: a pika channel, if none specified a new one will be
        created (presumably by the ``with_pika_channel`` decorator, which is
        defined outside this module -- confirm there)
    """
    channel.exchange_declare(exchange=name, exchange_type='topic')


@with_pika_channel
def exchange_publish(exchange_name: str, topic: str, message,
                     channel: BlockingChannel = None):
    """Publish a JSON-encoded message to a topic on an exchange.

    The exchange is declared first, then the message is serialized to JSON,
    encoded as UTF-8 and published with ``topic`` as the routing key.

    :param exchange_name: the name of the exchange
    :param topic: the topic to publish to (used as the routing key)
    :param message: the message to publish (will be turned into a json)
    :param channel: a pika channel, if none specified a new one will be
        created (presumably by the ``with_pika_channel`` decorator, which is
        defined outside this module -- confirm there)
    """
    # Declare the exchange on the same channel that is used for publishing.
    ensure_exchange_exists(exchange_name, channel=channel)

    # Serialize to JSON text, then to the UTF-8 bytes sent as the body.
    payload = json.dumps(message).encode('utf-8')
    channel.basic_publish(
        exchange=exchange_name,
        routing_key=topic,
        body=payload,
    )