Compare commits

..

5 Commits

Author SHA1 Message Date
Gardient
fe424e3fa2 for the docker 2021-10-01 21:30:51 +03:00
Gardient
b5a66b5d2d webhook endpoint queue message in rabbitMQ 2021-10-01 20:17:19 +03:00
Gardient
a0dbdc4ddf webhook endpoint 2021-09-30 23:01:16 +03:00
Gardient
04d70ba424 apidoc updates 2021-09-30 23:00:46 +03:00
Gardient
71c1e322aa add proper rabbitMQ extension 2021-09-30 22:57:13 +03:00
25 changed files with 235 additions and 111 deletions

21
.dockerignore Normal file
View File

@@ -0,0 +1,21 @@
Dockerfile
.dockerignore
docker-compose.yml
.git
.gitignore
.eslintrc.json
.editorconfig
.DS_STORE
Thumbs.db
.idea
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
env/
__pycache__/

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# Use the Python3.9.7 image
FROM python:3.9.7-bullseye
# Set the working directory to /app
WORKDIR /app
# Copy the current directory contents into the container at /app
ADD . /app
# Install the dependencies
RUN pip install -r requirements/prod.txt
#make sure we have everything we need
RUN flask db upgrade
RUN flask seed
RUN flask setup-rabmq
# run the command to start uWSGI
CMD ["uwsgi", "app.ini"]

View File

@@ -1,8 +1,8 @@
from flask import Flask, Blueprint
from . import commands, login, target_exchange, target, registration
from . import commands, login, target_exchange, target, registration, webhook
from .exceptions import ApiException
from .extensions import db, migrate, jwt, apispec
from .extensions import db, migrate, jwt, apispec, rabbit
from .settings import ProdConfig, Config
@@ -31,6 +31,7 @@ def register_extensions(app: Flask):
migrate.init_app(app, db)
jwt.init_app(app)
apispec.init_app(app)
rabbit.init_app(app)
def register_blueprints(app: Flask):
@@ -43,6 +44,7 @@ def register_blueprints(app: Flask):
api_blueprint.register_blueprint(registration.views.blueprint, url_prefix='/registration')
app.register_blueprint(api_blueprint)
app.register_blueprint(webhook.views.blueprint)
def register_apispecs(app: Flask):
@@ -52,6 +54,9 @@ def register_apispecs(app: Flask):
that one doesn't handle the static routes or blueprints in blueprints well
"""
apispec.spec.components.security_scheme('api_key', {"type": "apiKey", "in": "query", "name": "apikey"})
apispec.spec.components.security_scheme('jwt', {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"})
for name, rule in app.view_functions.items():
if name == 'static':
continue
@@ -82,6 +87,7 @@ def register_shellcontext(app: Flask):
"""Shell context objects."""
return {
'db': db,
'rabbit': rabbit,
'TargetExchange': target_exchange.models.TargetExchange,
'Target': target.models.Target,
'Registration': registration.models.Registration,

View File

@@ -5,7 +5,7 @@ from flask import current_app
from flask.cli import with_appcontext
from werkzeug.exceptions import MethodNotAllowed, NotFound
import rmq_helper
from .extensions import rabbit
from .target.models import Target
from .target_exchange.models import TargetExchange
@@ -100,9 +100,11 @@ def seed():
def setup_rabmq():
"""Set up rabbitMQ"""
for exchange in TargetExchange.query.filter(TargetExchange.name != "").all():
rmq_helper.ensure_exchange_exists(exchange.name)
print(f"making sure {exchange.name} exchange exists")
rabbit.ensure_exchange_exists(exchange.name)
for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all():
rmq_helper.ensure_queue_exists(target.routing_key)
print(f"making sure {target.name} queue exists")
rabbit.ensure_queue_exists(target.routing_key)
pass

View File

@@ -2,6 +2,7 @@ from flask_apispec import FlaskApiSpec
from flask_jwt_extended import JWTManager
from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy, Model
from rmq_helper import RabbitMQ
class CRUDMixin(Model):
@@ -36,3 +37,4 @@ db = SQLAlchemy(model_class=CRUDMixin)
migrate = Migrate()
jwt = JWTManager()
apispec = FlaskApiSpec()
rabbit = RabbitMQ()

View File

@@ -1,11 +1,14 @@
from marshmallow import Schema, fields
class LoginSchema(Schema):
username = fields.Str()
password = fields.Str(load_only=True)
username = fields.Str(required=True)
password = fields.Str(required=True, load_only=True)
class TokenResponseSchema(Schema):
token = fields.Str()
login_schema = LoginSchema()
token_response_schema = TokenResponseSchema()

View File

@@ -4,21 +4,19 @@ from flask_apispec import use_kwargs, marshal_with, doc
from marshmallow import fields
from api.exceptions import BadRequestException
from api.utils import docwrap
import api.constants as constants
from .serializers import token_response_schema
from .serializers import token_response_schema, login_schema
blueprint = Blueprint('login', __name__)
@doc(tags=['login'])
@docwrap('Login', None)
@blueprint.route('', methods=['POST'])
@jwt_required(optional=True)
@use_kwargs({
'username': fields.Str(required=True),
'password': fields.Str(required=True)
})
@use_kwargs(login_schema)
@marshal_with(token_response_schema)
def login_user(username, password, **kwargs):
def login_user(username, password):
if username == constants.API_USER and password == current_app.config[constants.API_PASS]:
return {'token': create_access_token(identity=username, fresh=True, expires_delta=False)}
else:

View File

@@ -19,3 +19,6 @@ class Registration(SurrogatePK, Model):
def __init__(self, **kwargs):
super(Registration, self).__init__(**kwargs)
def __repr__(self):
return f'<{Registration.__name__}({self.id}):{self.name!r}->({",".join([repr(x) for x in self.targets])})>'

View File

@@ -5,18 +5,21 @@ from marshmallow import fields
from sqlalchemy import or_
from api.exceptions import NotFoundException, BadRequestException
from api.utils import docwrap
from api.target.models import Target
from api.target_exchange.models import TargetExchange
from .models import Registration
from .serializers import registration_schema, registrations_schema
blueprint = Blueprint('Registration', __name__)
doc = docwrap('Registration')
@doc(tags=['Registration'])
@doc
@blueprint.route('', methods=['GET'])
@jwt_required()
@use_kwargs({'exchange': fields.Str(), 'target': fields.Str()})
@use_kwargs({'exchange': fields.Str(), 'target': fields.Str()}, location='query')
@marshal_with(registrations_schema)
def get_list(exchange=None, target=None):
res = Registration.query
@@ -26,7 +29,7 @@ def get_list(exchange=None, target=None):
return res.all()
@doc(tags=['Registration'])
@doc
@blueprint.route('', methods=['POST'])
@jwt_required()
@use_kwargs(registration_schema)
@@ -43,7 +46,7 @@ def create(name, routing_key, targets):
return registration
@doc(tags=['Registration'])
@doc
@blueprint.route('/<registration_id>', methods=['GET'])
@jwt_required()
@marshal_with(registration_schema)
@@ -55,7 +58,7 @@ def get_by_id(registration_id: int):
return NotFoundException(Registration.__name__)
@doc(tags=['Registration'])
@doc
@blueprint.route('/<registration_id>', methods=['PUT'])
@jwt_required()
@use_kwargs(registration_schema)

View File

@@ -18,6 +18,7 @@ class Config(object):
JWT_AUTH_HEADER_PREFIX = 'Token'
APISPEC_TITLE = 'MahssageBus API'
APISPEC_VERSION = 'v0.1'
APISPEC_OAS_VERSION = '3.0.0'
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")

View File

@@ -16,4 +16,4 @@ class Target(SurrogatePK, Model):
super(Target, self).__init__(**kwargs)
def __repr__(self):
return '<%s(%d):%r->%r>' % (Target.__name__, self.id, self.routing_key, self.exchange.name)
return f'<{Target.__name__}({self.id}):{self.routing_key!r}->{self.exchange.name!r}>'

View File

@@ -1,20 +1,23 @@
from flask import Blueprint
from flask_apispec import use_kwargs, marshal_with, doc
from flask_apispec import use_kwargs, marshal_with
from flask_jwt_extended import jwt_required
from marshmallow import fields
from api.exceptions import NotFoundException, BadRequestException
from api.target_exchange.models import TargetExchange
from api.utils import docwrap
from .models import Target
from .serializers import target_schema, targets_schema
blueprint = Blueprint('target', __name__)
doc = docwrap('Target')
@doc(tags=['Target'])
@doc
@blueprint.route('', methods=['GET'])
@jwt_required()
@use_kwargs({'exchange': fields.Str()})
@use_kwargs({'exchange': fields.Str()}, location='query')
@marshal_with(targets_schema)
def get_list(exchange=None):
res = Target.query
@@ -23,7 +26,7 @@ def get_list(exchange=None):
return res.all()
@doc(tags=['Target'])
@doc
@blueprint.route('', methods=['POST'])
@jwt_required()
@use_kwargs(target_schema)
@@ -37,7 +40,7 @@ def create(name, routing_key, exchange):
return target
@doc(tags=['Target'])
@doc
@blueprint.route('/<target_id>', methods=['GET'])
@jwt_required()
@marshal_with(target_schema)
@@ -49,7 +52,7 @@ def get_by_id(target_id: int):
return NotFoundException(Target.__name__)
@doc(tags=['Target'])
@doc
@blueprint.route('/<target_id>', methods=['PUT'])
@jwt_required()
@use_kwargs(target_schema)

View File

@@ -9,6 +9,9 @@ class TargetExchange(SurrogatePK, Model):
def __init__(self, **kwargs):
super(TargetExchange, self).__init__(**kwargs)
def __repr__(self):
return f'<{TargetExchange.__name__}({self.id}):{self.name!r}>'
@staticmethod
def ensure_created(name):
if TargetExchange.query.filter_by(name=name).first() is None:

View File

@@ -1,16 +1,18 @@
from flask import Blueprint
from flask_jwt_extended import jwt_required
from flask_apispec import use_kwargs, marshal_with, doc
from flask_apispec import use_kwargs, marshal_with
from api.exceptions import NotFoundException
from api.utils import docwrap
from .models import TargetExchange
from .serializers import target_exchange_schema, target_exchanges_schema
blueprint = Blueprint('target_exchange', __name__)
doc = docwrap('TargetExchange')
@doc(tags=['TargetExchange'])
@doc
@blueprint.route('', methods=['GET'])
@jwt_required()
@marshal_with(target_exchanges_schema)
@@ -18,7 +20,7 @@ def get_list():
return TargetExchange.query.all()
@doc(tags=['TargetExchange'])
@doc
@blueprint.route('', methods=['POST'])
@jwt_required()
@use_kwargs(target_exchange_schema)
@@ -29,7 +31,7 @@ def create(name):
return target_exchange
@doc(tags=['TargetExchange'])
@doc
@blueprint.route('/<exchange_id>', methods=['GET'])
@jwt_required()
@marshal_with(target_exchange_schema)
@@ -41,7 +43,7 @@ def get_by_id(exchange_id):
return NotFoundException(__name__)
@doc(tags=['TargetExchange'])
@doc
@blueprint.route('/<exchange_id>', methods=['PUT'])
@jwt_required()
@use_kwargs(target_exchange_schema)

5
api/utils.py Normal file
View File

@@ -0,0 +1,5 @@
from flask_apispec import doc
def docwrap(tag, security='jwt', **kwargs):
return doc(tags=[tag], security=([{security: []}] if security is not None else None), **kwargs)

3
api/webhook/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""the webhook endpoint"""
from . import views

34
api/webhook/views.py Normal file
View File

@@ -0,0 +1,34 @@
from flask import Blueprint, request, jsonify
from flask_apispec import use_kwargs
from marshmallow import fields
from api.exceptions import NotFoundException
from api.registration.models import Registration
from api.utils import docwrap
from api.extensions import rabbit
blueprint = Blueprint('webhook', __name__)
@docwrap('webhook', 'api_key')
@blueprint.route('/webhook', methods=['GET', 'POST'])
@use_kwargs({'apikey': fields.String(required=True)}, location='query')
def webhook(apikey):
reg: Registration = Registration.query.filter_by(token=apikey).first()
if reg is None:
raise NotFoundException(Registration.__name__)
if request.method == 'GET':
message = request.args.to_dict()
message.pop('apikey', None)
else:
message = request.get_json()
for target in reg.targets:
if target.exchange.name == '':
rabbit.queue_publish(target.name, message)
else:
rabbit.exchange_publish(target.exchange.name, target.name, message)
return None, 204

10
app.ini Normal file
View File

@@ -0,0 +1,10 @@
[uwsgi]
wsgi-file = run.py
callable = app
socket = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true

26
requirements/prod.txt Normal file
View File

@@ -0,0 +1,26 @@
alembic==1.7.1
apispec==5.1.0
autopep8==1.5.7
click==8.0.1
colorama==0.4.4
Flask==2.0.1
flask-apispec==0.11.0
Flask-JWT-Extended==4.3.0
Flask-Migrate==3.1.0
Flask-SQLAlchemy==2.5.1
greenlet==1.1.1
itsdangerous==2.0.1
Jinja2==3.0.1
Mako==1.1.5
MarkupSafe==2.0.1
marshmallow==3.13.0
pika==1.2.0
pika-stubs==0.1.3
pycodestyle==2.7.0
PyJWT==2.1.0
python-dotenv==0.19.0
SQLAlchemy==1.4.23
toml==0.10.2
webargs==8.0.1
Werkzeug==2.0.1
uwsgi

View File

@@ -1,3 +1,61 @@
"""RabbitMQ helpers"""
from .exchanges import ensure_exchange_exists, exchange_publish
from .queues import ensure_queue_exists, queue_publish
import pika
from flask import Flask, json
from pika.adapters.blocking_connection import BlockingChannel
class RabbitMQ:
host: str
connection: pika.BlockingConnection
channel: BlockingChannel
def __init__(self, app: Flask = None):
if app is not None:
self.init_app(app)
def init_app(self, app: Flask):
self.host = app.config.get('RABBITMQ_HOST', 'localhost')
def _ensure_connection(self):
if self.channel is None:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host))
self.channel = self.connection.channel()
def ensure_queue_exists(self, queue_name: str):
"""Ensures the queue exists on the default exchange
:param queue_name: the name of the queue
"""
self._ensure_connection()
self.channel.queue_declare(queue=queue_name, durable=True)
def queue_publish(self, queue_name: str, message):
self._ensure_connection()
self.ensure_queue_exists(queue_name)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
self.channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)
def ensure_exchange_exists(self, name: str) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
"""
self._ensure_connection()
self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True)
def exchange_publish(self, exchange_name: str, topic: str, message):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
"""
self._ensure_connection()
self.ensure_exchange_exists(exchange_name)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
self.channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)

View File

@@ -1,31 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
:param channel: a pika channel, if none specified a new one will be created
"""
channel.exchange_declare(exchange=name, exchange_type='topic')
@with_pika_channel
def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
:param channel: a pika channel, if none specified a new one will be created
"""
ensure_exchange_exists(exchange_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)

View File

@@ -1,25 +0,0 @@
from functools import wraps
import pika
from .settings import RABBITMQ_HOST
def with_pika_channel(f):
@wraps(f)
def wrapper(*args, **kwargs):
# if we are already passing channel, just reuse it
if 'channel' in kwargs:
return f(*args, **kwargs)
# otherwise set up a new connection and channel to wrap our code with
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
ret = f(*args, **kwargs, channel=channel)
channel.close()
return ret
return wrapper

View File

@@ -1,19 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_queue_exists(name: str, channel: BlockingChannel = None):
channel.queue_declare(queue=name)
@with_pika_channel
def queue_publish(queue_name: str, message, channel: BlockingChannel = None):
ensure_queue_exists(queue_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)

View File

@@ -1,3 +0,0 @@
import os
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")