Compare commits

...

5 Commits

Author SHA1 Message Date
Gardient
fe424e3fa2 for the docker 2021-10-01 21:30:51 +03:00
Gardient
b5a66b5d2d webhook endpoint queue message in rabbitMQ 2021-10-01 20:17:19 +03:00
Gardient
a0dbdc4ddf webhook endpoint 2021-09-30 23:01:16 +03:00
Gardient
04d70ba424 apidoc updates 2021-09-30 23:00:46 +03:00
Gardient
71c1e322aa add proper rabbitMQ extension 2021-09-30 22:57:13 +03:00
25 changed files with 235 additions and 111 deletions

21
.dockerignore Normal file
View File

@@ -0,0 +1,21 @@
Dockerfile
.dockerignore
docker-compose.yml
.git
.gitignore
.eslintrc.json
.editorconfig
.DS_STORE
Thumbs.db
.idea
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
env/
__pycache__/

19
Dockerfile Normal file
View File

@@ -0,0 +1,19 @@
# Use the Python3.9.7 image
FROM python:3.9.7-bullseye
# Set the working directory to /app
WORKDIR /app
# Copy the current directory contents into the container at /app
ADD . /app
# Install the dependencies
RUN pip install -r requirements/prod.txt
#make sure we have everything we need
RUN flask db upgrade
RUN flask seed
RUN flask setup-rabmq
# run the command to start uWSGI
CMD ["uwsgi", "app.ini"]

View File

@@ -1,8 +1,8 @@
from flask import Flask, Blueprint from flask import Flask, Blueprint
from . import commands, login, target_exchange, target, registration from . import commands, login, target_exchange, target, registration, webhook
from .exceptions import ApiException from .exceptions import ApiException
from .extensions import db, migrate, jwt, apispec from .extensions import db, migrate, jwt, apispec, rabbit
from .settings import ProdConfig, Config from .settings import ProdConfig, Config
@@ -31,6 +31,7 @@ def register_extensions(app: Flask):
migrate.init_app(app, db) migrate.init_app(app, db)
jwt.init_app(app) jwt.init_app(app)
apispec.init_app(app) apispec.init_app(app)
rabbit.init_app(app)
def register_blueprints(app: Flask): def register_blueprints(app: Flask):
@@ -43,6 +44,7 @@ def register_blueprints(app: Flask):
api_blueprint.register_blueprint(registration.views.blueprint, url_prefix='/registration') api_blueprint.register_blueprint(registration.views.blueprint, url_prefix='/registration')
app.register_blueprint(api_blueprint) app.register_blueprint(api_blueprint)
app.register_blueprint(webhook.views.blueprint)
def register_apispecs(app: Flask): def register_apispecs(app: Flask):
@@ -52,6 +54,9 @@ def register_apispecs(app: Flask):
that one doesn't handle the static routes or blueprints in blueprints well that one doesn't handle the static routes or blueprints in blueprints well
""" """
apispec.spec.components.security_scheme('api_key', {"type": "apiKey", "in": "query", "name": "apikey"})
apispec.spec.components.security_scheme('jwt', {"type": "http", "scheme": "bearer", "bearerFormat": "JWT"})
for name, rule in app.view_functions.items(): for name, rule in app.view_functions.items():
if name == 'static': if name == 'static':
continue continue
@@ -82,6 +87,7 @@ def register_shellcontext(app: Flask):
"""Shell context objects.""" """Shell context objects."""
return { return {
'db': db, 'db': db,
'rabbit': rabbit,
'TargetExchange': target_exchange.models.TargetExchange, 'TargetExchange': target_exchange.models.TargetExchange,
'Target': target.models.Target, 'Target': target.models.Target,
'Registration': registration.models.Registration, 'Registration': registration.models.Registration,

View File

@@ -5,7 +5,7 @@ from flask import current_app
from flask.cli import with_appcontext from flask.cli import with_appcontext
from werkzeug.exceptions import MethodNotAllowed, NotFound from werkzeug.exceptions import MethodNotAllowed, NotFound
import rmq_helper from .extensions import rabbit
from .target.models import Target from .target.models import Target
from .target_exchange.models import TargetExchange from .target_exchange.models import TargetExchange
@@ -100,9 +100,11 @@ def seed():
def setup_rabmq(): def setup_rabmq():
"""Set up rabbitMQ""" """Set up rabbitMQ"""
for exchange in TargetExchange.query.filter(TargetExchange.name != "").all(): for exchange in TargetExchange.query.filter(TargetExchange.name != "").all():
rmq_helper.ensure_exchange_exists(exchange.name) print(f"making sure {exchange.name} exchange exists")
rabbit.ensure_exchange_exists(exchange.name)
for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all(): for target in Target.query.join(Target.exchange).filter(TargetExchange.name == "").all():
rmq_helper.ensure_queue_exists(target.routing_key) print(f"making sure {target.name} queue exists")
rabbit.ensure_queue_exists(target.routing_key)
pass pass

View File

@@ -2,6 +2,7 @@ from flask_apispec import FlaskApiSpec
from flask_jwt_extended import JWTManager from flask_jwt_extended import JWTManager
from flask_migrate import Migrate from flask_migrate import Migrate
from flask_sqlalchemy import SQLAlchemy, Model from flask_sqlalchemy import SQLAlchemy, Model
from rmq_helper import RabbitMQ
class CRUDMixin(Model): class CRUDMixin(Model):
@@ -36,3 +37,4 @@ db = SQLAlchemy(model_class=CRUDMixin)
migrate = Migrate() migrate = Migrate()
jwt = JWTManager() jwt = JWTManager()
apispec = FlaskApiSpec() apispec = FlaskApiSpec()
rabbit = RabbitMQ()

View File

@@ -1,11 +1,14 @@
from marshmallow import Schema, fields from marshmallow import Schema, fields
class LoginSchema(Schema): class LoginSchema(Schema):
username = fields.Str() username = fields.Str(required=True)
password = fields.Str(load_only=True) password = fields.Str(required=True, load_only=True)
class TokenResponseSchema(Schema): class TokenResponseSchema(Schema):
token = fields.Str() token = fields.Str()
login_schema = LoginSchema() login_schema = LoginSchema()
token_response_schema = TokenResponseSchema() token_response_schema = TokenResponseSchema()

View File

@@ -4,21 +4,19 @@ from flask_apispec import use_kwargs, marshal_with, doc
from marshmallow import fields from marshmallow import fields
from api.exceptions import BadRequestException from api.exceptions import BadRequestException
from api.utils import docwrap
import api.constants as constants import api.constants as constants
from .serializers import token_response_schema from .serializers import token_response_schema, login_schema
blueprint = Blueprint('login', __name__) blueprint = Blueprint('login', __name__)
@doc(tags=['login']) @docwrap('Login', None)
@blueprint.route('', methods=['POST']) @blueprint.route('', methods=['POST'])
@jwt_required(optional=True) @jwt_required(optional=True)
@use_kwargs({ @use_kwargs(login_schema)
'username': fields.Str(required=True),
'password': fields.Str(required=True)
})
@marshal_with(token_response_schema) @marshal_with(token_response_schema)
def login_user(username, password, **kwargs): def login_user(username, password):
if username == constants.API_USER and password == current_app.config[constants.API_PASS]: if username == constants.API_USER and password == current_app.config[constants.API_PASS]:
return {'token': create_access_token(identity=username, fresh=True, expires_delta=False)} return {'token': create_access_token(identity=username, fresh=True, expires_delta=False)}
else: else:

View File

@@ -19,3 +19,6 @@ class Registration(SurrogatePK, Model):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super(Registration, self).__init__(**kwargs) super(Registration, self).__init__(**kwargs)
def __repr__(self):
return f'<{Registration.__name__}({self.id}):{self.name!r}->({",".join([repr(x) for x in self.targets])})>'

View File

@@ -5,18 +5,21 @@ from marshmallow import fields
from sqlalchemy import or_ from sqlalchemy import or_
from api.exceptions import NotFoundException, BadRequestException from api.exceptions import NotFoundException, BadRequestException
from api.utils import docwrap
from api.target.models import Target from api.target.models import Target
from api.target_exchange.models import TargetExchange from api.target_exchange.models import TargetExchange
from .models import Registration from .models import Registration
from .serializers import registration_schema, registrations_schema from .serializers import registration_schema, registrations_schema
blueprint = Blueprint('Registration', __name__) blueprint = Blueprint('Registration', __name__)
doc = docwrap('Registration')
@doc(tags=['Registration']) @doc
@blueprint.route('', methods=['GET']) @blueprint.route('', methods=['GET'])
@jwt_required() @jwt_required()
@use_kwargs({'exchange': fields.Str(), 'target': fields.Str()}) @use_kwargs({'exchange': fields.Str(), 'target': fields.Str()}, location='query')
@marshal_with(registrations_schema) @marshal_with(registrations_schema)
def get_list(exchange=None, target=None): def get_list(exchange=None, target=None):
res = Registration.query res = Registration.query
@@ -26,7 +29,7 @@ def get_list(exchange=None, target=None):
return res.all() return res.all()
@doc(tags=['Registration']) @doc
@blueprint.route('', methods=['POST']) @blueprint.route('', methods=['POST'])
@jwt_required() @jwt_required()
@use_kwargs(registration_schema) @use_kwargs(registration_schema)
@@ -43,7 +46,7 @@ def create(name, routing_key, targets):
return registration return registration
@doc(tags=['Registration']) @doc
@blueprint.route('/<registration_id>', methods=['GET']) @blueprint.route('/<registration_id>', methods=['GET'])
@jwt_required() @jwt_required()
@marshal_with(registration_schema) @marshal_with(registration_schema)
@@ -55,7 +58,7 @@ def get_by_id(registration_id: int):
return NotFoundException(Registration.__name__) return NotFoundException(Registration.__name__)
@doc(tags=['Registration']) @doc
@blueprint.route('/<registration_id>', methods=['PUT']) @blueprint.route('/<registration_id>', methods=['PUT'])
@jwt_required() @jwt_required()
@use_kwargs(registration_schema) @use_kwargs(registration_schema)

View File

@@ -18,6 +18,7 @@ class Config(object):
JWT_AUTH_HEADER_PREFIX = 'Token' JWT_AUTH_HEADER_PREFIX = 'Token'
APISPEC_TITLE = 'MahssageBus API' APISPEC_TITLE = 'MahssageBus API'
APISPEC_VERSION = 'v0.1' APISPEC_VERSION = 'v0.1'
APISPEC_OAS_VERSION = '3.0.0'
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost") RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")

View File

@@ -16,4 +16,4 @@ class Target(SurrogatePK, Model):
super(Target, self).__init__(**kwargs) super(Target, self).__init__(**kwargs)
def __repr__(self): def __repr__(self):
return '<%s(%d):%r->%r>' % (Target.__name__, self.id, self.routing_key, self.exchange.name) return f'<{Target.__name__}({self.id}):{self.routing_key!r}->{self.exchange.name!r}>'

View File

@@ -1,20 +1,23 @@
from flask import Blueprint from flask import Blueprint
from flask_apispec import use_kwargs, marshal_with, doc from flask_apispec import use_kwargs, marshal_with
from flask_jwt_extended import jwt_required from flask_jwt_extended import jwt_required
from marshmallow import fields from marshmallow import fields
from api.exceptions import NotFoundException, BadRequestException from api.exceptions import NotFoundException, BadRequestException
from api.target_exchange.models import TargetExchange from api.target_exchange.models import TargetExchange
from api.utils import docwrap
from .models import Target from .models import Target
from .serializers import target_schema, targets_schema from .serializers import target_schema, targets_schema
blueprint = Blueprint('target', __name__) blueprint = Blueprint('target', __name__)
doc = docwrap('Target')
@doc(tags=['Target']) @doc
@blueprint.route('', methods=['GET']) @blueprint.route('', methods=['GET'])
@jwt_required() @jwt_required()
@use_kwargs({'exchange': fields.Str()}) @use_kwargs({'exchange': fields.Str()}, location='query')
@marshal_with(targets_schema) @marshal_with(targets_schema)
def get_list(exchange=None): def get_list(exchange=None):
res = Target.query res = Target.query
@@ -23,7 +26,7 @@ def get_list(exchange=None):
return res.all() return res.all()
@doc(tags=['Target']) @doc
@blueprint.route('', methods=['POST']) @blueprint.route('', methods=['POST'])
@jwt_required() @jwt_required()
@use_kwargs(target_schema) @use_kwargs(target_schema)
@@ -37,7 +40,7 @@ def create(name, routing_key, exchange):
return target return target
@doc(tags=['Target']) @doc
@blueprint.route('/<target_id>', methods=['GET']) @blueprint.route('/<target_id>', methods=['GET'])
@jwt_required() @jwt_required()
@marshal_with(target_schema) @marshal_with(target_schema)
@@ -49,7 +52,7 @@ def get_by_id(target_id: int):
return NotFoundException(Target.__name__) return NotFoundException(Target.__name__)
@doc(tags=['Target']) @doc
@blueprint.route('/<target_id>', methods=['PUT']) @blueprint.route('/<target_id>', methods=['PUT'])
@jwt_required() @jwt_required()
@use_kwargs(target_schema) @use_kwargs(target_schema)

View File

@@ -9,6 +9,9 @@ class TargetExchange(SurrogatePK, Model):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super(TargetExchange, self).__init__(**kwargs) super(TargetExchange, self).__init__(**kwargs)
def __repr__(self):
return f'<{TargetExchange.__name__}({self.id}):{self.name!r}>'
@staticmethod @staticmethod
def ensure_created(name): def ensure_created(name):
if TargetExchange.query.filter_by(name=name).first() is None: if TargetExchange.query.filter_by(name=name).first() is None:

View File

@@ -1,16 +1,18 @@
from flask import Blueprint from flask import Blueprint
from flask_jwt_extended import jwt_required from flask_jwt_extended import jwt_required
from flask_apispec import use_kwargs, marshal_with, doc from flask_apispec import use_kwargs, marshal_with
from api.exceptions import NotFoundException from api.exceptions import NotFoundException
from api.utils import docwrap
from .models import TargetExchange from .models import TargetExchange
from .serializers import target_exchange_schema, target_exchanges_schema from .serializers import target_exchange_schema, target_exchanges_schema
blueprint = Blueprint('target_exchange', __name__) blueprint = Blueprint('target_exchange', __name__)
doc = docwrap('TargetExchange')
@doc(tags=['TargetExchange']) @doc
@blueprint.route('', methods=['GET']) @blueprint.route('', methods=['GET'])
@jwt_required() @jwt_required()
@marshal_with(target_exchanges_schema) @marshal_with(target_exchanges_schema)
@@ -18,7 +20,7 @@ def get_list():
return TargetExchange.query.all() return TargetExchange.query.all()
@doc(tags=['TargetExchange']) @doc
@blueprint.route('', methods=['POST']) @blueprint.route('', methods=['POST'])
@jwt_required() @jwt_required()
@use_kwargs(target_exchange_schema) @use_kwargs(target_exchange_schema)
@@ -29,7 +31,7 @@ def create(name):
return target_exchange return target_exchange
@doc(tags=['TargetExchange']) @doc
@blueprint.route('/<exchange_id>', methods=['GET']) @blueprint.route('/<exchange_id>', methods=['GET'])
@jwt_required() @jwt_required()
@marshal_with(target_exchange_schema) @marshal_with(target_exchange_schema)
@@ -41,7 +43,7 @@ def get_by_id(exchange_id):
return NotFoundException(__name__) return NotFoundException(__name__)
@doc(tags=['TargetExchange']) @doc
@blueprint.route('/<exchange_id>', methods=['PUT']) @blueprint.route('/<exchange_id>', methods=['PUT'])
@jwt_required() @jwt_required()
@use_kwargs(target_exchange_schema) @use_kwargs(target_exchange_schema)

5
api/utils.py Normal file
View File

@@ -0,0 +1,5 @@
from flask_apispec import doc
def docwrap(tag, security='jwt', **kwargs):
return doc(tags=[tag], security=([{security: []}] if security is not None else None), **kwargs)

3
api/webhook/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""the webhook endpoint"""
from . import views

34
api/webhook/views.py Normal file
View File

@@ -0,0 +1,34 @@
from flask import Blueprint, request, jsonify
from flask_apispec import use_kwargs
from marshmallow import fields
from api.exceptions import NotFoundException
from api.registration.models import Registration
from api.utils import docwrap
from api.extensions import rabbit
blueprint = Blueprint('webhook', __name__)
@docwrap('webhook', 'api_key')
@blueprint.route('/webhook', methods=['GET', 'POST'])
@use_kwargs({'apikey': fields.String(required=True)}, location='query')
def webhook(apikey):
reg: Registration = Registration.query.filter_by(token=apikey).first()
if reg is None:
raise NotFoundException(Registration.__name__)
if request.method == 'GET':
message = request.args.to_dict()
message.pop('apikey', None)
else:
message = request.get_json()
for target in reg.targets:
if target.exchange.name == '':
rabbit.queue_publish(target.name, message)
else:
rabbit.exchange_publish(target.exchange.name, target.name, message)
return None, 204

10
app.ini Normal file
View File

@@ -0,0 +1,10 @@
[uwsgi]
wsgi-file = run.py
callable = app
socket = :8080
processes = 4
threads = 2
master = true
chmod-socket = 660
vacuum = true
die-on-term = true

26
requirements/prod.txt Normal file
View File

@@ -0,0 +1,26 @@
alembic==1.7.1
apispec==5.1.0
autopep8==1.5.7
click==8.0.1
colorama==0.4.4
Flask==2.0.1
flask-apispec==0.11.0
Flask-JWT-Extended==4.3.0
Flask-Migrate==3.1.0
Flask-SQLAlchemy==2.5.1
greenlet==1.1.1
itsdangerous==2.0.1
Jinja2==3.0.1
Mako==1.1.5
MarkupSafe==2.0.1
marshmallow==3.13.0
pika==1.2.0
pika-stubs==0.1.3
pycodestyle==2.7.0
PyJWT==2.1.0
python-dotenv==0.19.0
SQLAlchemy==1.4.23
toml==0.10.2
webargs==8.0.1
Werkzeug==2.0.1
uwsgi

View File

@@ -1,3 +1,61 @@
"""RabbitMQ helpers""" """RabbitMQ helpers"""
from .exchanges import ensure_exchange_exists, exchange_publish import pika
from .queues import ensure_queue_exists, queue_publish from flask import Flask, json
from pika.adapters.blocking_connection import BlockingChannel
class RabbitMQ:
host: str
connection: pika.BlockingConnection
channel: BlockingChannel
def __init__(self, app: Flask = None):
if app is not None:
self.init_app(app)
def init_app(self, app: Flask):
self.host = app.config.get('RABBITMQ_HOST', 'localhost')
def _ensure_connection(self):
if self.channel is None:
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host))
self.channel = self.connection.channel()
def ensure_queue_exists(self, queue_name: str):
"""Ensures the queue exists on the default exchange
:param queue_name: the name of the queue
"""
self._ensure_connection()
self.channel.queue_declare(queue=queue_name, durable=True)
def queue_publish(self, queue_name: str, message):
self._ensure_connection()
self.ensure_queue_exists(queue_name)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
self.channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)
def ensure_exchange_exists(self, name: str) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
"""
self._ensure_connection()
self.channel.exchange_declare(exchange=name, exchange_type='topic', durable=True)
def exchange_publish(self, exchange_name: str, topic: str, message):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
"""
self._ensure_connection()
self.ensure_exchange_exists(exchange_name)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
self.channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)

View File

@@ -1,31 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_exchange_exists(name: str, channel: BlockingChannel = None) -> None:
"""Ensures exchange exists
:param name: the name of the exchange
:param channel: a pika channel, if none specified a new one will be created
"""
channel.exchange_declare(exchange=name, exchange_type='topic')
@with_pika_channel
def exchange_publish(exchange_name: str, topic: str, message, channel: BlockingChannel = None):
"""Publishes a message to a
:param exchange_name: the name of the exchange
:param topic: the topic to publish to
:param message: the message to publish (will be turned into a json)
:param channel: a pika channel, if none specified a new one will be created
"""
ensure_exchange_exists(exchange_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange=exchange_name, routing_key=topic, body=body_bytes)

View File

@@ -1,25 +0,0 @@
from functools import wraps
import pika
from .settings import RABBITMQ_HOST
def with_pika_channel(f):
@wraps(f)
def wrapper(*args, **kwargs):
# if we are already passing channel, just reuse it
if 'channel' in kwargs:
return f(*args, **kwargs)
# otherwise set up a new connection and channel to wrap our code with
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=RABBITMQ_HOST))
channel = connection.channel()
ret = f(*args, **kwargs, channel=channel)
channel.close()
return ret
return wrapper

View File

@@ -1,19 +0,0 @@
import json
from pika.adapters.blocking_connection import BlockingChannel
from .extension import with_pika_channel
@with_pika_channel
def ensure_queue_exists(name: str, channel: BlockingChannel = None):
channel.queue_declare(queue=name)
@with_pika_channel
def queue_publish(queue_name: str, message, channel: BlockingChannel = None):
ensure_queue_exists(queue_name, channel=channel)
body = json.dumps(message)
body_bytes = body.encode('utf-8')
channel.basic_publish(exchange='', routing_key=queue_name, body=body_bytes)

View File

@@ -1,3 +0,0 @@
import os
RABBITMQ_HOST = os.environ.get("RABBITMQ_HOST", "localhost")