Compare commits
3 Commits
a0dbdc4ddf
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9f07df262 | ||
|
|
fe424e3fa2 | ||
|
|
b5a66b5d2d |
21
.dockerignore
Normal file
21
.dockerignore
Normal file
@@ -0,0 +1,21 @@
|
||||
Dockerfile
|
||||
.dockerignore
|
||||
docker-compose.yml
|
||||
|
||||
.git
|
||||
.gitignore
|
||||
.eslintrc.json
|
||||
.editorconfig
|
||||
|
||||
.DS_STORE
|
||||
Thumbs.db
|
||||
|
||||
.idea
|
||||
.vscode/*
|
||||
!.vscode/settings.json
|
||||
!.vscode/tasks.json
|
||||
!.vscode/launch.json
|
||||
!.vscode/extensions.json
|
||||
|
||||
env/
|
||||
__pycache__/
|
||||
19
Dockerfile
Normal file
19
Dockerfile
Normal file
@@ -0,0 +1,19 @@
|
||||
# Use the Python3.9.7 image
|
||||
FROM python:3.10.1-bullseye
|
||||
|
||||
ADD https://github.com/ufoscout/docker-compose-wait/releases/download/2.9.0/wait /wait
|
||||
RUN chmod +x /wait
|
||||
|
||||
# Set the working directory to /app
|
||||
WORKDIR /app
|
||||
|
||||
# Copy the current directory contents into the container at /app
|
||||
ADD . /app
|
||||
|
||||
# Install the dependencies
|
||||
RUN pip install -r requirements/prod.txt
|
||||
|
||||
RUN chmod +x scripts/entrypoint.sh
|
||||
|
||||
# run the command to start uWSGI
|
||||
CMD ["scripts/entrypoint.sh"]
|
||||
@@ -91,6 +91,7 @@ def register_shellcontext(app: Flask):
|
||||
'TargetExchange': target_exchange.models.TargetExchange,
|
||||
'Target': target.models.Target,
|
||||
'Registration': registration.models.Registration,
|
||||
'config': app.config,
|
||||
}
|
||||
|
||||
app.shell_context_processor(shell_context)
|
||||
|
||||
@@ -100,9 +100,11 @@ def seed():
|
||||
def setup_rabmq():
|
||||
"""Set up rabbitMQ"""
|
||||
for exchange in TargetExchange.query.filter(TargetExchange.name != "").all():
|
||||
print(f"making sure {exchange.name} exchange exists")
|
||||
rabbit.ensure_exchange_exists(exchange.name)
|
||||
|
||||
for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all():
|
||||
print(f"making sure {target.name} queue exists")
|
||||
rabbit.ensure_queue_exists(target.routing_key)
|
||||
|
||||
pass
|
||||
|
||||
@@ -35,11 +35,11 @@ def get_list(exchange=None, target=None):
|
||||
@use_kwargs(registration_schema)
|
||||
@marshal_with(registration_schema)
|
||||
def create(name, routing_key, targets):
|
||||
target_ids = [t.id for t in targets]
|
||||
target_ids = [t["id"] for t in targets]
|
||||
db_targets = Target.query.filter(Target.id.in_(target_ids))
|
||||
if len(db_targets) != len(targets):
|
||||
xchange_ids = [t.id for t in db_targets]
|
||||
not_found = ','.join([f'{t.name}({t.id})' for t in targets if t.id not in xchange_ids])
|
||||
xchange_ids = [t["id"] for t in db_targets]
|
||||
not_found = ','.join([f'{t["name"]}({t["id"]})' for t in targets if t["id"] not in xchange_ids])
|
||||
raise BadRequestException(f"the target {not_found} could not be found")
|
||||
registration = Registration(name=name, token=routing_key, targets=db_targets)
|
||||
registration.save()
|
||||
|
||||
@@ -32,9 +32,9 @@ def get_list(exchange=None):
|
||||
@use_kwargs(target_schema)
|
||||
@marshal_with(target_schema)
|
||||
def create(name, routing_key, exchange):
|
||||
xchange = TargetExchange.get_by_id(exchange.id)
|
||||
xchange = TargetExchange.get_by_id(exchange["id"])
|
||||
if xchange is None:
|
||||
raise BadRequestException(f"the exchange {exchange.name}({exchange.id}) could not be found")
|
||||
raise BadRequestException(f"the exchange {exchange['name']}({exchange['id']}) could not be found")
|
||||
target = Target(name=name, routing_key=routing_key, target_exchange_id=xchange.id)
|
||||
target.save()
|
||||
return target
|
||||
|
||||
@@ -1,22 +1,34 @@
|
||||
from flask import Blueprint, jsonify
|
||||
from flask import Blueprint, request, jsonify
|
||||
from flask_apispec import use_kwargs
|
||||
from marshmallow import fields
|
||||
|
||||
from api.exceptions import NotFoundException
|
||||
from api.registration.models import Registration
|
||||
from api.utils import docwrap
|
||||
from api.extensions import rabbit
|
||||
|
||||
blueprint = Blueprint('webhook', __name__)
|
||||
|
||||
|
||||
@docwrap('webhook', 'api_key')
|
||||
@blueprint.route('/webhook', methods=['GET'])
|
||||
@blueprint.route('/webhook', methods=['GET', 'POST'])
|
||||
@use_kwargs({'apikey': fields.String(required=True)}, location='query')
|
||||
def webhook(apikey):
|
||||
reg = Registration.query.filter_by(token=apikey).first()
|
||||
reg: Registration = Registration.query.filter_by(token=apikey).first()
|
||||
|
||||
if reg is None:
|
||||
raise NotFoundException(Registration.__name__)
|
||||
|
||||
return jsonify({'response': repr(reg)})
|
||||
pass
|
||||
if request.method == 'GET':
|
||||
message = request.args.to_dict()
|
||||
message.pop('apikey', None)
|
||||
else:
|
||||
message = request.get_json()
|
||||
|
||||
for target in reg.targets:
|
||||
if target.exchange.name == '':
|
||||
rabbit.queue_publish(target.name, message)
|
||||
else:
|
||||
rabbit.exchange_publish(target.exchange.name, target.name, message)
|
||||
|
||||
return None, 204
|
||||
|
||||
10
app.ini
Normal file
10
app.ini
Normal file
@@ -0,0 +1,10 @@
|
||||
[uwsgi]
|
||||
wsgi-file = run.py
|
||||
callable = app
|
||||
http = 0.0.0.0:8080
|
||||
processes = 4
|
||||
threads = 2
|
||||
master = true
|
||||
chmod-socket = 660
|
||||
vacuum = true
|
||||
die-on-term = true
|
||||
45
docker-compose.yml
Normal file
45
docker-compose.yml
Normal file
@@ -0,0 +1,45 @@
|
||||
version: "3.9"
|
||||
services:
|
||||
rabbit:
|
||||
image: rabbitmq:management
|
||||
container_name: rabbit
|
||||
volumes:
|
||||
- rabbit_data:/var/lib/rabbitmq
|
||||
expose:
|
||||
- 5672
|
||||
- 15672
|
||||
ports:
|
||||
- 15672:15762
|
||||
networks:
|
||||
net:
|
||||
aliases:
|
||||
- rabbit
|
||||
api:
|
||||
build: .
|
||||
restart: always
|
||||
environment:
|
||||
FLASK_APP: run.py
|
||||
SECRET_KEY: $API_SECRET
|
||||
API_PASS: $API_PASS
|
||||
RABBITMQ_HOST: rabbit
|
||||
DB_PATH: /app/db/prod.db
|
||||
WAIT_HOSTS: rabbit:15672
|
||||
expose:
|
||||
- 8080
|
||||
ports:
|
||||
- 8080:8080
|
||||
networks:
|
||||
net:
|
||||
aliases:
|
||||
- api
|
||||
volumes:
|
||||
- api_db:/app/db
|
||||
depends_on:
|
||||
- rabbit
|
||||
|
||||
volumes:
|
||||
rabbit_data:
|
||||
api_db:
|
||||
|
||||
networks:
|
||||
net:
|
||||
26
requirements/prod.txt
Normal file
26
requirements/prod.txt
Normal file
@@ -0,0 +1,26 @@
|
||||
alembic==1.7.1
|
||||
apispec==5.1.0
|
||||
autopep8==1.5.7
|
||||
click==8.0.1
|
||||
colorama==0.4.4
|
||||
Flask==2.0.1
|
||||
flask-apispec==0.11.0
|
||||
Flask-JWT-Extended==4.3.0
|
||||
Flask-Migrate==3.1.0
|
||||
Flask-SQLAlchemy==2.5.1
|
||||
greenlet==1.1.1
|
||||
itsdangerous==2.0.1
|
||||
Jinja2==3.0.1
|
||||
Mako==1.1.5
|
||||
MarkupSafe==2.0.1
|
||||
marshmallow==3.13.0
|
||||
pika==1.2.0
|
||||
pika-stubs==0.1.3
|
||||
pycodestyle==2.7.0
|
||||
PyJWT==2.1.0
|
||||
python-dotenv==0.19.0
|
||||
SQLAlchemy==1.4.23
|
||||
toml==0.10.2
|
||||
webargs==8.0.1
|
||||
Werkzeug==2.0.1
|
||||
uwsgi
|
||||
@@ -6,8 +6,8 @@ from pika.adapters.blocking_connection import BlockingChannel
|
||||
|
||||
class RabbitMQ:
|
||||
host: str
|
||||
connection: pika.BlockingConnection
|
||||
channel: BlockingChannel
|
||||
connection: pika.BlockingConnection = None
|
||||
channel: BlockingChannel = None
|
||||
|
||||
def __init__(self, app: Flask = None):
|
||||
if app is not None:
|
||||
@@ -15,18 +15,23 @@ class RabbitMQ:
|
||||
|
||||
def init_app(self, app: Flask):
|
||||
self.host = app.config.get('RABBITMQ_HOST', 'localhost')
|
||||
self.connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=self.host))
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
def _ensure_connection(self):
|
||||
if self.channel is None:
|
||||
self.connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=self.host))
|
||||
self.channel = self.connection.channel()
|
||||
|
||||
def ensure_queue_exists(self, queue_name: str):
|
||||
"""Ensures the queue exists on the default exchange
|
||||
|
||||
:param queue_name: the name of the queue
|
||||
"""
|
||||
self._ensure_connection()
|
||||
self.channel.queue_declare(queue=queue_name, durable=True)
|
||||
|
||||
def queue_publish(self, queue_name: str, message):
|
||||
self._ensure_connection()
|
||||
self.ensure_queue_exists(queue_name)
|
||||
|
||||
body = json.dumps(message)
|
||||
@@ -38,6 +43,7 @@ class RabbitMQ:
|
||||
|
||||
:param name: the name of the exchange
|
||||
"""
|
||||
self._ensure_connection()
|
||||
self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True)
|
||||
|
||||
def exchange_publish(self, exchange_name: str, topic: str, message):
|
||||
@@ -47,6 +53,7 @@ class RabbitMQ:
|
||||
:param topic: the topic to publish to
|
||||
:param message: the message to publish (will be turned into a json)
|
||||
"""
|
||||
self._ensure_connection()
|
||||
self.ensure_exchange_exists(exchange_name)
|
||||
|
||||
body = json.dumps(message)
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
import json
|
||||
|
||||
from pika.adapters.blocking_connection import BlockingChannel
|
||||
|
||||
from .extension import with_pika_channel
|
||||
|
||||
|
||||
@with_pika_channel
|
||||
def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None:
|
||||
"""Ensures exchange exists
|
||||
|
||||
:param name: the name of the exchange
|
||||
:param channel: a pika channel, if none specified a new one will be created
|
||||
"""
|
||||
channel.exchange_declare(exchange=name, exchange_type='topic')
|
||||
|
||||
|
||||
@with_pika_channel
|
||||
def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None):
|
||||
"""Publishes a message to a
|
||||
|
||||
:param exchange_name: the name of the exchange
|
||||
:param topic: the topic to publish to
|
||||
:param message: the message to publish (will be turned into a json)
|
||||
:param channel: a pika channel, if none specified a new one will be created
|
||||
"""
|
||||
ensure_exchange_exists(exchange_name, channel=channel)
|
||||
|
||||
body = json.dumps(message)
|
||||
body_bytes = body.encode('utf-8')
|
||||
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)
|
||||
@@ -1,25 +0,0 @@
|
||||
from functools import wraps
|
||||
|
||||
import pika
|
||||
|
||||
from .settings import RABBITMQ_HOST
|
||||
|
||||
|
||||
def with_pika_channel(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
# if we are already passing channel, just reuse it
|
||||
if 'channel' in kwargs:
|
||||
return f(*args, **kwargs)
|
||||
|
||||
# otherwise set up a new connection and channel to wrap our code with
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host=RABBITMQ_HOST))
|
||||
channel = connection.channel()
|
||||
|
||||
ret = f(*args, **kwargs, channel=channel)
|
||||
|
||||
channel.close()
|
||||
return ret
|
||||
|
||||
return wrapper
|
||||
@@ -1,19 +0,0 @@
|
||||
import json
|
||||
|
||||
from pika.adapters.blocking_connection import BlockingChannel
|
||||
|
||||
from .extension import with_pika_channel
|
||||
|
||||
|
||||
@with_pika_channel
|
||||
def ensure_queue_exists(name: str, channel: BlockingChannel = None):
|
||||
channel.queue_declare(queue=name)
|
||||
|
||||
|
||||
@with_pika_channel
|
||||
def queue_publish(queue_name: str, message, channel: BlockingChannel = None):
|
||||
ensure_queue_exists(queue_name, channel=channel)
|
||||
|
||||
body = json.dumps(message)
|
||||
body_bytes = body.encode('utf-8')
|
||||
channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)
|
||||
@@ -1,3 +0,0 @@
|
||||
import os
|
||||
|
||||
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")
|
||||
7
scripts/entrypoint.sh
Normal file
7
scripts/entrypoint.sh
Normal file
@@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
/wait
|
||||
flask db upgrade
|
||||
flask seed
|
||||
flask setup-rabmq
|
||||
uwsgi app.ini
|
||||
Reference in New Issue
Block a user