Compare commits

...

2 Commits

Author SHA1 Message Date
Gardient
fe424e3fa2 for the docker 2021-10-01 21:30:51 +03:00
Gardient
b5a66b5d2d webhook endpoint queue message in rabbitMQ 2021-10-01 20:17:19 +03:00
12 changed files with 105 additions and 86 deletions

21
.dockerignore Normal file
View File

@@ -0,0 +1,21 @@
Dockerfile
.dockerignore
docker-compose.yml
.git
.gitignore
.eslintrc.json
.editorconfig
.DS_STORE
Thumbs.db
.idea
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
env/
__pycache__/

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# Use the Python3.9.7 image
FROM python:3.9.7-bullseye
# Set the working directory to /app
WORKDIR /app
# Copy the current directory contents into the container at /app
ADD . /app
# Install the dependencies
RUN pip install -r requirements/prod.txt
#make sure we have everything we need
RUN flask db upgrade
RUN flask seed
RUN flask setup-rabmq
# run the command to start uWSGI
CMD ["uwsgi", "app.ini"]

View File

@@ -100,9 +100,11 @@ def seed():
def setup_rabmq(): def setup_rabmq():
"""Set up rabbitMQ""" """Set up rabbitMQ"""
for exchange in TargetExchange.query.filter(TargetExchange.name != "").all(): for exchange in TargetExchange.query.filter(TargetExchange.name != "").all():
print(f"making sure {exchange.name} exchange exists")
rabbit.ensure_exchange_exists(exchange.name) rabbit.ensure_exchange_exists(exchange.name)
for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all(): for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all():
print(f"making sure {target.name} queue exists")
rabbit.ensure_queue_exists(target.routing_key) rabbit.ensure_queue_exists(target.routing_key)
pass pass

View File

@@ -1,22 +1,34 @@
from flask import Blueprint, jsonify from flask import Blueprint, request, jsonify
from flask_apispec import use_kwargs from flask_apispec import use_kwargs
from marshmallow import fields from marshmallow import fields
from api.exceptions import NotFoundException from api.exceptions import NotFoundException
from api.registration.models import Registration from api.registration.models import Registration
from api.utils import docwrap from api.utils import docwrap
from api.extensions import rabbit
blueprint = Blueprint('webhook', __name__) blueprint = Blueprint('webhook', __name__)
@docwrap('webhook', 'api_key') @docwrap('webhook', 'api_key')
@blueprint.route('/webhook', methods=['GET']) @blueprint.route('/webhook', methods=['GET', 'POST'])
@use_kwargs({'apikey': fields.String(required=True)}, location='query') @use_kwargs({'apikey': fields.String(required=True)}, location='query')
def webhook(apikey): def webhook(apikey):
reg = Registration.query.filter_by(token=apikey).first() reg: Registration = Registration.query.filter_by(token=apikey).first()
if reg is None: if reg is None:
raise NotFoundException(Registration.__name__) raise NotFoundException(Registration.__name__)
return jsonify({'response': repr(reg)}) if request.method == 'GET':
pass message = request.args.to_dict()
message.pop('apikey', None)
else:
message = request.get_json()
for target in reg.targets:
if target.exchange.name == '':
rabbit.queue_publish(target.name, message)
else:
rabbit.exchange_publish(target.exchange.name, target.name, message)
return None, 204

10
app.ini Normal file
View File

@@ -0,0 +1,10 @@
[uwsgi]
wsgi-file = run.py
callable = app
socket = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true

26
requirements/prod.txt Normal file
View File

@@ -0,0 +1,26 @@
alembic==1.7.1
apispec==5.1.0
autopep8==1.5.7
click==8.0.1
colorama==0.4.4
Flask==2.0.1
flask-apispec==0.11.0
Flask-JWT-Extended==4.3.0
Flask-Migrate==3.1.0
Flask-SQLAlchemy==2.5.1
greenlet==1.1.1
itsdangerous==2.0.1
Jinja2==3.0.1
Mako==1.1.5
MarkupSafe==2.0.1
marshmallow==3.13.0
pika==1.2.0
pika-stubs==0.1.3
pycodestyle==2.7.0
PyJWT==2.1.0
python-dotenv==0.19.0
SQLAlchemy==1.4.23
toml==0.10.2
webargs==8.0.1
Werkzeug==2.0.1
uwsgi

View File

@@ -15,6 +15,9 @@ class RabbitMQ:
def init_app(self, app: Flask): def init_app(self, app: Flask):
self.host = app.config.get('RABBITMQ_HOST', 'localhost') self.host = app.config.get('RABBITMQ_HOST', 'localhost')
def _ensure_connection(self):
if self.channel is None:
self.connection = pika.BlockingConnection( self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host)) pika.ConnectionParameters(host=self.host))
self.channel = self.connection.channel() self.channel = self.connection.channel()
@@ -24,9 +27,11 @@ class RabbitMQ:
:param queue_name: the name of the queue :param queue_name: the name of the queue
""" """
self._ensure_connection()
self.channel.queue_declare(queue=queue_name, durable=True) self.channel.queue_declare(queue=queue_name, durable=True)
def queue_publish(self, queue_name: str, message): def queue_publish(self, queue_name: str, message):
self._ensure_connection()
self.ensure_queue_exists(queue_name) self.ensure_queue_exists(queue_name)
body = json.dumps(message) body = json.dumps(message)
@@ -38,6 +43,7 @@ class RabbitMQ:
:param name: the name of the exchange :param name: the name of the exchange
""" """
self._ensure_connection()
self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True) self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True)
def exchange_publish(self, exchange_name: str, topic: str, message): def exchange_publish(self, exchange_name: str, topic: str, message):
@@ -47,6 +53,7 @@ class RabbitMQ:
:param topic: the topic to publish to :param topic: the topic to publish to
:param message: the message to publish (will be turned into a json) :param message: the message to publish (will be turned into a json)
""" """
self._ensure_connection()
self.ensure_exchange_exists(exchange_name) self.ensure_exchange_exists(exchange_name)
body = json.dumps(message) body = json.dumps(message)

View File

@@ -1,31 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
:param channel: a pika channel, if none specified a new one will be created
"""
channel.exchange_declare(exchange=name, exchange_type='topic')
@with_pika_channel
def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
:param channel: a pika channel, if none specified a new one will be created
"""
ensure_exchange_exists(exchange_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)

View File

@@ -1,25 +0,0 @@
from functools import wraps
import pika
from .settings import RABBITMQ_HOST
def with_pika_channel(f):
@wraps(f)
def wrapper(*args, **kwargs):
# if we are already passing channel, just reuse it
if 'channel' in kwargs:
return f(*args, **kwargs)
# otherwise set up a new connection and channel to wrap our code with
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
ret = f(*args, **kwargs, channel=channel)
channel.close()
return ret
return wrapper

View File

@@ -1,19 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_queue_exists(name: str, channel: BlockingChannel = None):
channel.queue_declare(queue=name)
@with_pika_channel
def queue_publish(queue_name: str, message, channel: BlockingChannel = None):
ensure_queue_exists(queue_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)

View File

@@ -1,3 +0,0 @@
import os
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")