Compare commits

...

3 Commits

Author SHA1 Message Date
Gardient
c9f07df262 fix things for docker compose 2021-12-19 20:27:13 +02:00
Gardient
fe424e3fa2 for the docker 2021-10-01 21:30:51 +03:00
Gardient
b5a66b5d2d webhook endpoint queue message in rabbitMQ 2021-10-01 20:17:19 +03:00
17 changed files with 165 additions and 93 deletions

21
.dockerignore Normal file
View File

@@ -0,0 +1,21 @@
Dockerfile
.dockerignore
docker-compose.yml
.git
.gitignore
.eslintrc.json
.editorconfig
.DS_STORE
Thumbs.db
.idea
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
env/
__pycache__/

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# Use the Python3.9.7 image
FROM python:3.10.1-bullseye
ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.9.0/wait /wait
RUN chmod +x /wait
# Set the working directory to /app
WORKDIR /app
# Copy the current directory contents into the container at /app
ADD . /app
# Install the dependencies
RUN pip install -r requirements/prod.txt
RUN chmod +x scripts/entrypoint.sh
# run the command to start uWSGI
CMD ["scripts/entrypoint.sh"]

View File

@@ -91,6 +91,7 @@ def register_shellcontext(app: Flask):
'TargetExchange': target_exchange.models.TargetExchange, 'TargetExchange': target_exchange.models.TargetExchange,
'Target': target.models.Target, 'Target': target.models.Target,
'Registration': registration.models.Registration, 'Registration': registration.models.Registration,
'config': app.config,
} }
app.shell_context_processor(shell_context) app.shell_context_processor(shell_context)

View File

@@ -100,9 +100,11 @@ def seed():
def setup_rabmq(): def setup_rabmq():
"""Set up rabbitMQ""" """Set up rabbitMQ"""
for exchange in TargetExchange.query.filter(TargetExchange.name != "").all(): for exchange in TargetExchange.query.filter(TargetExchange.name != "").all():
print(f"making sure {exchange.name} exchange exists")
rabbit.ensure_exchange_exists(exchange.name) rabbit.ensure_exchange_exists(exchange.name)
for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all(): for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all():
print(f"making sure {target.name} queue exists")
rabbit.ensure_queue_exists(target.routing_key) rabbit.ensure_queue_exists(target.routing_key)
pass pass

View File

@@ -35,11 +35,11 @@ def get_list(exchange=None, target=None):
@use_kwargs(registration_schema) @use_kwargs(registration_schema)
@marshal_with(registration_schema) @marshal_with(registration_schema)
def create(name, routing_key, targets): def create(name, routing_key, targets):
target_ids = [t.id for t in targets] target_ids = [t["id"] for t in targets]
db_targets = Target.query.filter(Target.id.in_(target_ids)) db_targets = Target.query.filter(Target.id.in_(target_ids))
if len(db_targets) != len(targets): if len(db_targets) != len(targets):
xchange_ids = [t.id for t in db_targets] xchange_ids = [t["id"] for t in db_targets]
not_found = ','.join([f'{t.name}({t.id})' for t in targets if t.id not in xchange_ids]) not_found = ','.join([f'{t["name"]}({t["id"]})' for t in targets if t["id"] not in xchange_ids])
raise BadRequestException(f"the target {not_found} could not be found") raise BadRequestException(f"the target {not_found} could not be found")
registration = Registration(name=name, token=routing_key, targets=db_targets) registration = Registration(name=name, token=routing_key, targets=db_targets)
registration.save() registration.save()

View File

@@ -32,9 +32,9 @@ def get_list(exchange=None):
@use_kwargs(target_schema) @use_kwargs(target_schema)
@marshal_with(target_schema) @marshal_with(target_schema)
def create(name, routing_key, exchange): def create(name, routing_key, exchange):
xchange = TargetExchange.get_by_id(exchange.id) xchange = TargetExchange.get_by_id(exchange["id"])
if xchange is None: if xchange is None:
raise BadRequestException(f"the exchange {exchange.name}({exchange.id}) could not be found") raise BadRequestException(f"the exchange {exchange['name']}({exchange['id']}) could not be found")
target = Target(name=name, routing_key=routing_key, target_exchange_id=xchange.id) target = Target(name=name, routing_key=routing_key, target_exchange_id=xchange.id)
target.save() target.save()
return target return target

View File

@@ -1,22 +1,34 @@
from flask import Blueprint, jsonify from flask import Blueprint, request, jsonify
from flask_apispec import use_kwargs from flask_apispec import use_kwargs
from marshmallow import fields from marshmallow import fields
from api.exceptions import NotFoundException from api.exceptions import NotFoundException
from api.registration.models import Registration from api.registration.models import Registration
from api.utils import docwrap from api.utils import docwrap
from api.extensions import rabbit
blueprint = Blueprint('webhook', __name__) blueprint = Blueprint('webhook', __name__)
@docwrap('webhook', 'api_key') @docwrap('webhook', 'api_key')
@blueprint.route('/webhook', methods=['GET']) @blueprint.route('/webhook', methods=['GET', 'POST'])
@use_kwargs({'apikey': fields.String(required=True)}, location='query') @use_kwargs({'apikey': fields.String(required=True)}, location='query')
def webhook(apikey): def webhook(apikey):
reg = Registration.query.filter_by(token=apikey).first() reg: Registration = Registration.query.filter_by(token=apikey).first()
if reg is None: if reg is None:
raise NotFoundException(Registration.__name__) raise NotFoundException(Registration.__name__)
return jsonify({'response': repr(reg)}) if request.method == 'GET':
pass message = request.args.to_dict()
message.pop('apikey', None)
else:
message = request.get_json()
for target in reg.targets:
if target.exchange.name == '':
rabbit.queue_publish(target.name, message)
else:
rabbit.exchange_publish(target.exchange.name, target.name, message)
return None, 204

10
app.ini Normal file
View File

@@ -0,0 +1,10 @@
[uwsgi]
wsgi-file = run.py
callable = app
http = 0.0.0.0:8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true

45
docker-compose.yml Normal file
View File

@@ -0,0 +1,45 @@
version: "3.9"
services:
rabbit:
image: rabbitmq:management
container_name: rabbit
volumes:
- rabbit_data:/var/lib/rabbitmq
expose:
- 5672
- 15672
ports:
- 15672:15762
networks:
net:
aliases:
- rabbit
api:
build: .
restart: always
environment:
FLASK_APP: run.py
SECRET_KEY: $API_SECRET
API_PASS: $API_PASS
RABBITMQ_HOST: rabbit
DB_PATH: /app/db/prod.db
WAIT_HOSTS: rabbit:15672
expose:
- 8080
ports:
- 8080:8080
networks:
net:
aliases:
- api
volumes:
- api_db:/app/db
depends_on:
- rabbit
volumes:
rabbit_data:
api_db:
networks:
net:

26
requirements/prod.txt Normal file
View File

@@ -0,0 +1,26 @@
alembic==1.7.1
apispec==5.1.0
autopep8==1.5.7
click==8.0.1
colorama==0.4.4
Flask==2.0.1
flask-apispec==0.11.0
Flask-JWT-Extended==4.3.0
Flask-Migrate==3.1.0
Flask-SQLAlchemy==2.5.1
greenlet==1.1.1
itsdangerous==2.0.1
Jinja2==3.0.1
Mako==1.1.5
MarkupSafe==2.0.1
marshmallow==3.13.0
pika==1.2.0
pika-stubs==0.1.3
pycodestyle==2.7.0
PyJWT==2.1.0
python-dotenv==0.19.0
SQLAlchemy==1.4.23
toml==0.10.2
webargs==8.0.1
Werkzeug==2.0.1
uwsgi

View File

@@ -6,8 +6,8 @@ from pika.adapters.blocking_connection import BlockingChannel
class RabbitMQ: class RabbitMQ:
host: str host: str
connection: pika.BlockingConnection connection: pika.BlockingConnection = None
channel: BlockingChannel channel: BlockingChannel = None
def __init__(self, app: Flask = None): def __init__(self, app: Flask = None):
if app is not None: if app is not None:
@@ -15,6 +15,9 @@ class RabbitMQ:
def init_app(self, app: Flask): def init_app(self, app: Flask):
self.host = app.config.get('RABBITMQ_HOST', 'localhost') self.host = app.config.get('RABBITMQ_HOST', 'localhost')
def _ensure_connection(self):
if self.channel is None:
self.connection = pika.BlockingConnection( self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host)) pika.ConnectionParameters(host=self.host))
self.channel = self.connection.channel() self.channel = self.connection.channel()
@@ -24,9 +27,11 @@ class RabbitMQ:
:param queue_name: the name of the queue :param queue_name: the name of the queue
""" """
self._ensure_connection()
self.channel.queue_declare(queue=queue_name, durable=True) self.channel.queue_declare(queue=queue_name, durable=True)
def queue_publish(self, queue_name: str, message): def queue_publish(self, queue_name: str, message):
self._ensure_connection()
self.ensure_queue_exists(queue_name) self.ensure_queue_exists(queue_name)
body = json.dumps(message) body = json.dumps(message)
@@ -38,6 +43,7 @@ class RabbitMQ:
:param name: the name of the exchange :param name: the name of the exchange
""" """
self._ensure_connection()
self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True) self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True)
def exchange_publish(self, exchange_name: str, topic: str, message): def exchange_publish(self, exchange_name: str, topic: str, message):
@@ -47,6 +53,7 @@ class RabbitMQ:
:param topic: the topic to publish to :param topic: the topic to publish to
:param message: the message to publish (will be turned into a json) :param message: the message to publish (will be turned into a json)
""" """
self._ensure_connection()
self.ensure_exchange_exists(exchange_name) self.ensure_exchange_exists(exchange_name)
body = json.dumps(message) body = json.dumps(message)

View File

@@ -1,31 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
:param channel: a pika channel, if none specified a new one will be created
"""
channel.exchange_declare(exchange=name, exchange_type='topic')
@with_pika_channel
def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
:param channel: a pika channel, if none specified a new one will be created
"""
ensure_exchange_exists(exchange_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)

View File

@@ -1,25 +0,0 @@
from functools import wraps
import pika
from .settings import RABBITMQ_HOST
def with_pika_channel(f):
@wraps(f)
def wrapper(*args, **kwargs):
# if we are already passing channel, just reuse it
if 'channel' in kwargs:
return f(*args, **kwargs)
# otherwise set up a new connection and channel to wrap our code with
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
ret = f(*args, **kwargs, channel=channel)
channel.close()
return ret
return wrapper

View File

@@ -1,19 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_queue_exists(name: str, channel: BlockingChannel = None):
channel.queue_declare(queue=name)
@with_pika_channel
def queue_publish(queue_name: str, message, channel: BlockingChannel = None):
ensure_queue_exists(queue_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)

View File

@@ -1,3 +0,0 @@
import os
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")

7
scripts/entrypoint.sh Normal file
View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
/wait
flask db upgrade
flask seed
flask setup-rabmq
uwsgi app.ini