Create: frontend

This commit is contained in:
Yûki VACHOT 2022-01-24 20:04:49 +01:00
parent b6c8cc2f33
commit 0f70917b7b
19 changed files with 3 additions and 1699 deletions

View file

@ -1,30 +0,0 @@
name: Push-Development
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
build:
name: Push Development
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v2
- name: Run tests
run: docker-compose run test
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1
- name: Login to DockerHub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USER }}
password: ${{ secrets.DOCKER_PASS }}
- name: Build and push
id: docker_build
uses: docker/build-push-action@v2
with:
push: true
tags: user/app:latest

View file

@ -1,6 +0,0 @@
FROM python:latest
WORKDIR /data/backend
COPY requirements.txt requirements.txt
RUN pip install --upgrade pip
RUN pip install -r requirements.txt
COPY . .

View file

@ -1,8 +0,0 @@
from application import create_app
import os
app = create_app(os.environ.get('FLASK_ENV', None))
if __name__ == "__main__":
PORT = os.environ.get('PORT', 33507)
app.run(host='0.0.0.0', port=PORT, DEBUG=True)

View file

@ -1,35 +0,0 @@
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_cors import CORS
import sys
db = SQLAlchemy()
def create_app(flask_env='development'):
app = Flask(__name__, instance_relative_config=False)
origin = app.config.get('ALLOW_ORIGIN')
if origin is None:
origin = ['http://127.0.0.1:4200', 'http://localhost:4200']
CORS(app, supports_credentials=True, origins=origin)
if flask_env == 'production':
app.config.from_object("config.ProductionConfig")
elif flask_env == 'testing':
app.config.from_object("config.TestingConfig")
elif flask_env == 'development':
app.config.from_object("config.DevelopmentConfig")
else:
app.config.from_object("config.Config")
if app.config['SQLALCHEMY_DATABASE_URI_1'] is None or app.config['SQLALCHEMY_DATABASE_URI_2'] is None:
print('No ENV Variable for DATABASE_URL_USERS or DATABASE_URL_LOGS')
sys.exit(1)
else:
print('ENV Variables passed : ', app.config['SQLALCHEMY_BINDS'])
db.init_app(app)
with app.app_context():
from . import routes
app.register_blueprint(routes.bp)
db.create_all()
return app

View file

@ -1,364 +0,0 @@
import hashlib
import os
from datetime import datetime
from flask_sqlalchemy import inspect
from sqlalchemy import asc, desc, or_
from .users_model import Users, db
from .logs_model import Logs
def db_create_log(ip, action, message, has_succeeded, status_code, table=None, id_user=None):
log = Logs(
date=datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
ip=ip,
action=action,
message=message,
has_succeeded=has_succeeded,
status_code=status_code,
table=table,
id_user=id_user
)
db.session.add(log)
db.session.commit()
return log.json()
def hash_password(salt, password):
return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
def db_login(ip, email, password):
user = Users.query.filter(
Users.email == email
).scalar()
# Check User and Hash Pass
if user and user.hash_pass == hash_password(user.salt, password):
message = 'User authenticated.'
db_create_log(
ip=ip,
action='login',
message=message,
has_succeeded=True,
status_code=0,
table='users',
id_user=user.id
)
return {'status': 0, 'message': message, 'data': user.json()}
else:
message = f'Email or password invalid'
db_create_log(
ip=ip,
action='login',
message=message,
has_succeeded=False,
status_code=1,
table='users',
id_user=None
)
return {'status': 1, 'message': message} # Email or password invalid
def db_register(ip, email, nickname, password, is_admin=False):
user = Users.query.filter(
Users.email == email
).scalar()
if user:
message = f'{email} already exist.'
db_create_log(
ip=ip,
action='register',
message=message,
has_succeeded=False,
status_code=1,
table='users',
id_user=None
)
return {'status': 1, 'message': message} # User already exist
# Salt Hash Pass with SHA256
salt = os.urandom(32)
hash_pass = hash_password(salt, password)
user = Users(
email=email,
hash_pass=hash_pass,
nickname=nickname,
salt=salt,
is_admin=is_admin
)
user_inspect = inspect(user)
db.session.add(user)
check_inspect = user_inspect.pending
db.session.commit()
# Add to logs
if check_inspect:
id_user = user.json()['id']
message = 'User registered.'
has_succeeded = True
status_code = 0
else:
id_user = None
message = 'Internal Error: User not registered.'
has_succeeded = False
status_code = 1
db_create_log(
ip=ip,
action='register',
message=message,
has_succeeded=has_succeeded,
status_code=status_code,
table='users',
id_user=id_user
)
if status_code == 0:
return {'status': 0, 'message': message, 'data': user.json()}
elif status_code == 1:
return {'status': 1, 'message': message}
def db_user_update(ip, user_id, nickname, password):
user = Users.query.filter(
Users.id == user_id
).scalar()
if user:
has_succeeded = False
status_code = 2
if nickname and password:
# Salt Hash Pass with SHA256
salt = os.urandom(32)
hash_pass = hash_password(salt, password)
Users.query.filter(Users.id == user_id).update({'nickname': nickname, 'hash_pass': hash_pass, 'salt': salt})
db.session.commit()
message = 'User nickname and password updated.'
has_succeeded = True
status_code = 0
elif nickname:
Users.query.filter(Users.id == user_id).update({'nickname': nickname})
db.session.commit()
message = 'User nickname updated.'
has_succeeded = True
status_code = 0
elif password:
# Salt Hash Pass with SHA256
salt = os.urandom(32)
hash_pass = hash_password(salt, password)
Users.query.filter(Users.id == user_id).update({'hash_pass': hash_pass, 'salt': salt})
db.session.commit()
message = 'User password updated.'
has_succeeded = True
status_code = 0
else:
message = 'Only nickname and/or password can be changed.'
db_create_log(
ip=ip,
action='user_update',
message=message,
has_succeeded=has_succeeded,
status_code=status_code,
table='users',
id_user=user_id
)
return {'status': status_code, 'message': message, 'data': user.json()}
else:
message = 'User do not exist.'
db_create_log(
ip=ip,
action='user_update',
message=message,
has_succeeded=False,
status_code=1,
table='users',
id_user=user_id
)
return {'status': 1, 'message': message}
def db_user_delete(ip, user_id):
user_to_delete = Users.query.filter(Users.id == user_id).scalar()
if user_to_delete:
is_admin = bool(user_to_delete.json()['is_admin'])
if is_admin and (Users.query.filter(Users.is_admin == True).count() <= 1 or user_id == 0):
message = 'Can\'t delete last admin'
db_create_log(
ip=ip,
action='user_delete',
message=message,
has_succeeded=False,
status_code=2,
table='users',
id_user=user_id
)
return {'status': 2, 'message': message}
else:
test = Users.query.filter(Users.id == user_id).delete()
if test == 1:
db.session.commit()
message = 'User deleted.'
db_create_log(
ip=ip,
action='user_delete',
message=message,
has_succeeded=True,
status_code=0,
table='users',
id_user=user_id
)
return {'status': 0, 'message': message, 'data': None}
else:
message = 'User do not exist.'
db_create_log(
ip=ip,
action='user_delete',
message=message,
has_succeeded=False,
status_code=1,
table='users',
id_user=user_id
)
return {'status': 1, 'message': message}
else:
message = 'User do not exist.'
db_create_log(
ip=ip,
action='user_delete',
message=message,
has_succeeded=False,
status_code=1,
table='users',
id_user=user_id
)
return {'status': 1, 'message': message}
def db_admin_update_user(ip, user_id, is_admin, password):
user = Users.query.filter(
Users.id == user_id
).scalar()
if user:
has_succeeded = False
status_code = 2
if is_admin is not None and password:
# Salt Hash Pass with SHA256
salt = os.urandom(32)
hash_pass = hash_password(salt, password)
Users.query.filter(Users.id == user_id).update({'is_admin': is_admin, 'hash_pass': hash_pass, 'salt': salt})
db.session.commit()
message = 'User is_admin and password updated.'
has_succeeded = True
status_code = 0
elif is_admin is not None:
Users.query.filter(Users.id == user_id).update({'is_admin': is_admin})
db.session.commit()
message = 'User is_admin updated.'
has_succeeded = True
status_code = 0
elif password:
# Salt Hash Pass with SHA256
salt = os.urandom(32)
hash_pass = hash_password(salt, password)
Users.query.filter(Users.id == user_id).update({'hash_pass': hash_pass, 'salt': salt})
db.session.commit()
message = 'User password updated.'
has_succeeded = True
status_code = 0
else:
message = 'Only is_admin and/or password can be changed.'
db_create_log(
ip=ip,
action='user_update',
message=message,
has_succeeded=has_succeeded,
status_code=status_code,
table='users',
id_user=user_id
)
return {'status': status_code, 'message': message, 'data': user.json()}
else:
message = 'User do not exist.'
db_create_log(
ip=ip,
action='user_update',
message=message,
has_succeeded=False,
status_code=1,
table='users',
id_user=user_id
)
return {'status': 1, 'message': message}
def db_users(ip, user_id, query, by='email,nickname', id=None, is_admin=None, order_by='email,asc'):
# q= or id =
# if q= then by= (default: email,nickname) or email or nickname
# is_admin =
# order_by = (default: email,asc), (nickname,asc), (id,desc), (is_admin,desc)
users = Users.query
if query is id and query is not None:
message = 'Query or id field'
db_create_log(
ip=ip,
action='users',
message=message,
has_succeeded=False,
status_code=1,
table='users',
id_user=user_id
)
return {'status': 1, 'message': message}
if query and by:
if by == 'email':
users = users.filter(Users.email.like('%' + query + '%'))
elif by == 'nickname':
users = users.filter(Users.nickname.like('%' + query + '%'))
else:
users = users.filter(or_(Users.nickname.like('%' + query + '%'), Users.email.like('%' + query + '%')))
elif query and not by:
users = users.filter(or_(Users.nickname.like('%' + query + '%'), Users.email.like('%' + query + '%')))
elif id:
users = users.filter(Users.id == id)
else:
pass
if is_admin is not None:
users = users.filter(Users.is_admin == is_admin)
order_by = order_by.split(',')
if order_by[0] == 'nickname':
order = Users.nickname
elif order_by[0] == 'id':
order = Users.id
elif order_by[0] == 'is_admin':
order = Users.is_admin
else:
order = Users.email
if len(order_by) > 1:
if order_by[1] == 'asc':
users = users.order_by(asc(order))
elif order_by[1] == 'desc':
users = users.order_by(desc(order))
else:
users = users.order_by(asc(order))
else:
users = users.order_by(asc(order))
users = users.all()
message = f'query({query}), by({by}), id({id}), is_admin({is_admin}) and order_by({order_by}): {len(users)} result(s)'
db_create_log(
ip=ip,
action='users',
message=message,
has_succeeded=True,
status_code=0,
table='users',
id_user=user_id
)
return {'status': 0, 'message': message, 'data': [user.json() for user in users]}

View file

@ -1,41 +0,0 @@
from . import db
class Logs(db.Model):
__bind_key__ = 'db-logs'
id = db.Column(db.Integer, primary_key=True)
date = db.Column(db.TIMESTAMP(), nullable=False)
id_user = db.Column(db.Integer, nullable=True)
ip = db.Column(db.String(), nullable=False)
table = db.Column(db.String(), nullable=False)
action = db.Column(db.String(), nullable=False)
message = db.Column(db.String(), nullable=False)
has_succeeded = db.Column(db.Boolean, nullable=False)
status_code = db.Column(db.Integer, nullable=False)
def __init__(self, date, id_user, ip, table, action, message, has_succeeded, status_code):
self.date = date
self.id_user = id_user
self.ip = ip
self.table = table
self.action = action
self.message = message
self.has_succeeded = has_succeeded
self.status_code = status_code
def __repr__(self):
return {
'id': self.id,
'date': self.date,
'id_user': self.id_user,
'ip': self.ip,
'table': self.table,
'action': self.action,
'message': self.message,
'has_succeeded': self.has_succeeded,
'status_code': self.status_code
}
def json(self):
return self.__repr__()

View file

@ -1,35 +0,0 @@
from flask import current_app as app
import json
def send_error(status_code, message, token=None):
data_json = {
'status': 'error',
'message': message
}
res = app.response_class(
response=json.dumps(data_json),
status=status_code,
mimetype='application/json'
)
if token is not None:
res.set_cookie('SESSIONID', token)
return res
def send_message(message, data, token=None, token_delete=False):
data_json = {
'status': 'success',
'message': message,
'data': data
}
res = app.response_class(
response=json.dumps(data_json),
status=200,
mimetype='application/json'
)
if token is not None:
res.set_cookie('SESSIONID', token)
if token_delete:
res.delete_cookie('SESSIONID')
return res

View file

@ -1,345 +0,0 @@
from flask import request, Blueprint
from werkzeug.exceptions import HTTPException
from .responses import send_message, send_error
from .api_functions import db_login, db_register, db_user_update, db_create_log, db_user_delete, db_admin_update_user, db_users
from .sessionJWT import create_auth_token, check_auth_token
bp = Blueprint('myapp', __name__)
@bp.app_errorhandler(HTTPException)
def handle_exception(e):
return send_error(e.code, e.name)
# Login
@bp.route('/api/login', methods=['POST'])
def login():
post_json = request.json
try:
post_email = str(post_json['email'])
post_password = str(post_json['password'])
if post_email != '' and post_password != '':
ip = request.remote_addr
res = db_login(ip, post_email, post_password)
if res['status'] == 0:
user = res['data']
token = create_auth_token(user)
return send_message(res['message'], user, token)
elif res['status'] == 1:
user = None
token = create_auth_token(user)
return send_error(400, res['message'], token)
else:
return send_error(400, 'Empty email and/or password fields.')
except KeyError as e:
return send_error(400, 'Need email, password fields.')
# Register
@bp.route('/api/register', methods=['POST'])
def register():
post_json = request.json
try:
post_email = str(post_json['email'])
post_nickname = str(post_json['nickname'])
post_password = str(post_json['password'])
if post_email != '' and post_password != '' and post_nickname != '':
ip = request.remote_addr
res = db_register(ip, post_email, post_nickname, post_password)
if res['status'] == 1:
return send_error(500, res['message'])
elif res['status'] == 0:
return send_message(res['message'], res['data'])
else:
return send_error(400, 'Empty email and/or password and/or nickname fields.')
except KeyError as e:
return send_error(400, 'Need ' + str(e) + 'field.')
# Logout
@bp.route('/api/logout', methods=['DELETE'])
def logout():
token = check_auth_token(request)
if token['success']:
ip = request.remote_addr
message = 'User disconnected.'
db_create_log(
ip=ip,
action='logout',
message=message,
has_succeeded=True,
status_code=0,
table='users',
id_user=token['payload']['id']
)
return send_message(message, None, token_delete=True)
else:
return send_error(500, token['message'])
# Update User (Nickname, Password)
@bp.route('/api/user/update', methods=['PUT'])
def user_update():
token = check_auth_token(request)
if token['success']:
post_json = request.json
post_nickname = None
post_password = None
fields = ''
if 'nickname' in post_json:
post_nickname = str(post_json['nickname'])
else:
fields += 'nickname '
if 'password' in post_json:
post_password = str(post_json['password'])
else:
fields += 'password '
if post_nickname is not None or post_password is not None:
if post_nickname != '' and post_password != '':
ip = request.remote_addr
user_id = token['payload']['id']
res = db_user_update(ip, user_id, post_nickname, post_password)
if res['status'] == 1:
return send_error(500, res['message'])
elif res['status'] == 0:
return send_message(res['message'], res['data'])
else:
return send_error(400, 'Empty nickname and/or password fields.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else:
return send_error(500, token['message'])
# Delete User
@bp.route('/api/user/delete', methods=['DELETE'])
def user_delete():
token = check_auth_token(request)
if token['success']:
ip = request.remote_addr
user_id = token['payload']['id']
res = db_user_delete(ip, user_id)
if res['status'] != 0:
return send_error(500, res['message'])
else:
db_create_log(
ip=ip,
action='delete',
message='User deleted.',
has_succeeded=True,
status_code=0,
table='users',
id_user=token['payload']['id']
)
return send_message(res['message'], None, token_delete=True)
else:
return send_error(500, token['message'])
# Admin : Create User
@bp.route('/api/admin/create/user', methods=['POST'])
def admin_create_user():
token = check_auth_token(request)
if token['success']:
ip = request.remote_addr
user_id = token['payload']['id']
is_admin = token['payload']['is_admin']
if is_admin:
post_json = request.json
post_email = None
post_nickname = None
post_password = None
post_is_admin = None
fields = ''
if 'email' in post_json:
post_email = str(post_json['email'])
else:
fields += 'email '
if 'nickname' in post_json:
post_nickname = str(post_json['nickname'])
else:
fields += 'nickname '
if 'password' in post_json:
post_password = str(post_json['password'])
else:
fields += 'password '
if 'is_admin' in post_json:
post_is_admin = bool(post_json['is_admin'])
else:
fields += 'is_admin '
if post_email is not None or post_nickname is not None or post_password is not None or post_is_admin is not None:
if post_email != '' and post_nickname != '' and post_password != '' and str(post_is_admin) != '':
res = db_register(ip, post_email, post_nickname, post_password, is_admin=post_is_admin)
if res['status'] == 1:
db_create_log(
ip=ip,
action='admin/create/user',
message=res['message'],
has_succeeded=False,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_error(500, res['message'])
elif res['status'] == 0:
db_create_log(
ip=ip,
action='admin/create/user',
message=res['message'],
has_succeeded=True,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_message(res['message'], res['data'])
else:
return send_error(400, 'Empty email and/or nickname and/or password and/or is_admin fields.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else:
return send_error(500, 'User does not have permission.')
else:
return send_error(500, token['message'])
# Admin : Change User password and/or role
@bp.route('/api/admin/update/user', methods=['PUT'])
def admin_update_user():
token = check_auth_token(request)
if token['success']:
user_id = token['payload']['id']
is_admin = token['payload']['is_admin']
if is_admin:
post_json = request.json
post_is_admin = None
post_password = None
post_user_id_delete = None
fields = ''
if 'id' in post_json:
post_user_id_delete = int(post_json['id'])
else:
fields += 'id '
if 'is_admin' in post_json:
post_is_admin = bool(post_json['is_admin'])
else:
fields += 'is_admin '
if 'password' in post_json:
post_password = str(post_json['password'])
else:
fields += 'password '
if post_user_id_delete is not None and (post_is_admin is not None or post_password is not None):
if str(post_is_admin) != '' and post_password != '' and str(post_user_id_delete) != '':
ip = request.remote_addr
res = db_admin_update_user(ip, post_user_id_delete, post_is_admin, post_password)
if res['status'] == 1:
db_create_log(
ip=ip,
action='admin/update/user',
message=res['message'],
has_succeeded=False,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_error(500, res['message'])
elif res['status'] == 0:
db_create_log(
ip=ip,
action='admin/update/user',
message=res['message'],
has_succeeded=True,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_message(res['message'], res['data'])
else:
return send_error(400, 'Empty is_admin and/or password fields.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else:
return send_error(500, 'User does not have permission.')
else:
return send_error(500, token['message'])
# Admin : Delete User
@bp.route('/api/admin/delete/user/<id>', methods=['DELETE'])
def admin_delete_user(id):
token = check_auth_token(request)
if token['success']:
ip = request.remote_addr
user_id = token['payload']['id']
is_admin = token['payload']['is_admin']
if is_admin:
post_json = {'id': id}
post_user_id_delete = None
fields = ''
if 'id' in post_json:
post_user_id_delete = int(post_json['id'])
else:
fields += 'id'
if post_user_id_delete is not None:
if str(post_user_id_delete) != '':
res = db_user_delete(ip, int(post_user_id_delete))
if res['status'] == 1:
db_create_log(
ip=ip,
action='admin/delete/user',
message=res['message'],
has_succeeded=False,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_error(500, res['message'])
else:
db_create_log(
ip=ip,
action='admin/delete/user',
message=res['message'],
has_succeeded=True,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_message(res['message'], None)
else:
return send_error(400, 'Empty id field.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else:
return send_error(500, 'User does not have permission.')
else:
return send_error(500, token['message'])
# List of User (must be authenticated) & Search
@bp.route('/api/users', methods=['GET'])
def users():
token = check_auth_token(request)
if token['success']:
ip = request.remote_addr
user_id = token['payload']['id']
get_query = request.args.get('q')
get_by = request.args.get('by')
get_id = request.args.get('id')
get_is_admin = request.args.get('is_admin')
get_order_by = request.args.get('order_by')
res = db_users(ip, user_id, get_query, get_by, get_id, get_is_admin, get_order_by)
if res['status'] == 1:
return send_error(500, res['message'])
else:
return send_message(res['message'], res['data'])
else:
return send_error(500, token['message'])

View file

@ -1,39 +0,0 @@
from datetime import datetime, timedelta
from flask import current_app as app
import jwt
def create_auth_token(user, time_second=1800):
try:
time = datetime.now()
payload = {
'exp': time + timedelta(days=0, seconds=time_second),
'iat': time,
'user': user
}
return jwt.encode(
payload,
app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
def decode_auth_token(auth_token):
try:
payload = jwt.decode(
auth_token,
app.config.get('SECRET_KEY'),
algorithms='HS256'
)
return {'success': True, 'payload': payload['user']}
except jwt.ExpiredSignatureError:
return {'success': False, 'message': 'Signature expired . Please log in again.'}
except jwt.InvalidTokenError as e:
return {'success': False, 'message': 'User not authenticated.'}
def check_auth_token(request):
token = request.cookies.get('SESSIONID')
return decode_auth_token(token)

View file

@ -1,37 +0,0 @@
from . import db
class Users(db.Model):
__bind_key__ = 'db-users'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
email = db.Column(db.String(), nullable=False, unique=True)
nickname = db.Column(db.String(), nullable=False)
hash_pass = db.Column(db.LargeBinary(), nullable=False)
salt = db.Column(db.LargeBinary(), nullable=False)
is_admin = db.Column(db.Boolean, default=False, nullable=False)
def __init__(self, email, nickname, hash_pass, salt, is_admin):
self.email = email
self.hash_pass = hash_pass
self.nickname = nickname
self.salt = salt
self.is_admin = is_admin
def __repr__(self):
return {
'id': self.id,
'email': self.email,
'nickname': self.nickname,
'hash_pass': self.hash_pass,
'salt': self.salt,
'is_admin': self.is_admin
}
def json(self):
return {
'id': self.id,
'email': self.email,
'nickname': self.nickname,
'is_admin': self.is_admin
}

View file

@ -1,43 +0,0 @@
import os
basedir = os.path.abspath(os.path.dirname(__file__))
class Config(object):
DEBUG = False
TESTING = False
CSRF_ENABLED = True
FLASK_APP = os.environ.get('FLASK_APP', None)
FLASK_ENV = os.environ.get('FLASK_ENV', None)
SQLALCHEMY_ECHO = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI_1 = os.environ.get('DATABASE_URL_USERS', None)
SQLALCHEMY_DATABASE_URI_2 = os.environ.get('DATABASE_URL_LOGS', None)
SQLALCHEMY_BINDS = {
'db-users': SQLALCHEMY_DATABASE_URI_1,
'db-logs': SQLALCHEMY_DATABASE_URI_2
}
SECRET_KEY = os.environ.get('SECRET_KEY', 'default_secret_key')
ALLOW_ORIGIN = os.environ.get('ALLOW_ORIGIN', None)
class ProductionConfig(Config):
DEBUG = False
SQLALCHEMY_ECHO = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
class TestingConfig(Config):
TESTING = True
SQLALCHEMY_ECHO = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
class DevelopmentConfig(Config):
DEVELOPMENT = True
DEBUG = True
SQLALCHEMY_ECHO = True
SQLALCHEMY_TRACK_MODIFICATIONS = True

View file

@ -1,6 +0,0 @@
export FLASK_APP=app.py
export FLASK_ENV=development
export FLASK_DEBUG=1
export PYTHONUNBUFFERED=1
export DATABASE_URL_USERS=postgresql://flaskaled1:aled1@localhost:5433/flaskaledDb1
export DATABASE_URL_LOGS=postgresql://flaskaled2:aled2@localhost:5434/flaskaledDb2

View file

@ -1,47 +0,0 @@
import os
from application.users_model import Users
from application.api_functions import hash_password
TAB_USER_WITH_PASSWORD = [
{
"id": 1,
"email": "riri@gmail.com",
"nickname": "Riri",
"password": "ririPass",
"is_admin": False
},
{
"id": 2,
"email": "fifi@gmail.com",
"nickname": "Fifi",
"password": "fifiPass",
"is_admin": False
},
{
"id": 3,
"email": "donald@gmail.com",
"nickname": "Donald",
"password": "donaldPass",
"is_admin": False
},
{
"id": 4,
"email": "daisy@gmail.com",
"nickname": "Daisy",
"password": "daisyPass",
"is_admin": True
},
]
# Convert user with passord (uwp) to user
def uwp_to_user(uwp):
salt0 = os.urandom(32)
hash_pass0 = hash_password(salt0, uwp["password"])
return Users(
email=uwp["email"],
nickname=uwp["nickname"],
hash_pass=hash_pass0,
salt=salt0,
is_admin=uwp["is_admin"]
)

View file

@ -1,13 +0,0 @@
-- Table: users
CREATE TABLE IF NOT EXISTS users
(
id serial PRIMARY KEY,
email character varying(320) NOT NULL,
nickname character varying(50) NOT NULL,
hash_pass bytea NOT NULL,
salt bytea NOT NULL,
is_admin boolean NOT NULL DEFAULT FALSE
);
INSERT INTO users VALUES(0,'admin@admin.admin','Admin',decode('e5ed79b503704ed20a1d250770db68182118de7fe0236db9bbfb0dd9684087d6', 'hex'),decode('7012f69f1ac7c23c5dca498c30fa94527b507cc9e40fab9bae284d1465a37724', 'hex'),TRUE);

View file

@ -1,14 +0,0 @@
-- Table: logs
CREATE TABLE IF NOT EXISTS logs
(
id serial PRIMARY KEY,
date timestamp NOT NULL,
id_user integer,
ip character varying(15) NOT NULL,
"table" character varying(25) NOT NULL,
action character varying(50) NOT NULL,
message character varying(512) NOT NULL,
has_succeeded boolean NOT NULL,
status_code smallint NOT NULL
);

View file

@ -1,13 +0,0 @@
alembic==1.7.5
Flask==2.0.2
Flask-Migrate==3.1.0
Flask-Script==2.0.6
Flask-Testing==0.8.1
Flask-SQLAlchemy==2.5.1
Flask-WTF==0.15.1
pipreqs==0.4.10
PyJWT==2.3.0
pytest==6.2.5
SQLAlchemy==1.4.27
psycopg2==2.9.2
Flask-Cors==3.0.10

View file

@ -1,542 +0,0 @@
import unittest
from flask_testing import TestCase
from fictive_users import TAB_USER_WITH_PASSWORD, uwp_to_user
from application import db, create_app
class BaseTestCase(TestCase):
def create_app(self):
app = create_app('testing')
return app
def setUp(self):
db.drop_all()
db.create_all()
for uwp in TAB_USER_WITH_PASSWORD:
db.session.add(uwp_to_user(uwp))
db.session.commit()
def tearDown(self):
db.session.remove()
db.drop_all()
class FlaskTestCase(BaseTestCase):
# -- UTILS ---
def login(self, email, password):
data0 = {
"email": email,
"password": password
}
response = self.client.post('/api/login', json=data0)
return response
# --- LOGIN ---
def test_login_NoFields_statusCode(self):
response = self.client.post('/api/login', json={})
self.assertEqual(response.status_code, 400)
def test_login_NoFields_message(self):
response = self.client.post('/api/login', json={})
self.assertEqual(response.json['message'], 'Need email, password fields.')
def test_login_emptyFields_statusCode(self):
data0 = {
"email": "",
"password": "blabla"
}
response = self.client.post('/api/login', json=data0)
self.assertEqual(response.status_code, 400)
def test_login_emptyFields_message(self):
data0 = {
"email": "",
"password": "blabla"
}
response = self.client.post('/api/login', json=data0)
self.assertEqual(response.json['message'], 'Empty email and/or password fields.')
def test_login_wrongFields_statusCode(self):
data0 = {
"email": "nimp@gmail.com",
"password": "nimp"
}
response = self.client.post('/api/login', json=data0)
self.assertEqual(response.status_code, 400)
def test_login_wrongFields_message(self):
data0 = {
"email": "nimp@gmail.com",
"password": "nimp"
}
response = self.client.post('/api/login', json=data0)
self.assertEqual(response.json['message'], 'Email or password invalid')
def test_login_success_statusCode(self):
data0 = {
"email": "riri@gmail.com",
"password": "ririPass"
}
response = self.client.post('/api/login', json=data0)
self.assertEqual(response.status_code, 200)
def test_login_success_message(self):
data0 = {
"email": "riri@gmail.com",
"password": "ririPass"
}
response = self.client.post('/api/login', json=data0)
self.assertEqual(response.json['message'], 'User authenticated.')
# --- REGISTER ---
def test_register_noFields_statusCode(self):
response = self.client.post('/api/register', json={})
self.assertEqual(response.status_code, 400)
def test_register_noFields_message(self):
response = self.client.post('/api/register', json={})
self.assertIn('Need', response.json['message'])
def test_register_emptyFields_statusCode(self):
data0 = {
"email": "",
"password": "blabla",
"nickname": "blabla"
}
response = self.client.post('/api/register', json=data0)
self.assertEqual(response.status_code, 400)
def test_register_emptyFields_message(self):
data0 = {
"email": "",
"password": "blabla",
"nickname": "blabla"
}
response = self.client.post('/api/register', json=data0)
self.assertEqual(response.json['message'], 'Empty email and/or password and/or nickname fields.')
def test_register_alreadyExist_statusCode(self):
data0 = {
"email": "riri@gmail.com",
"password": "blabla",
"nickname": "blabla"
}
response = self.client.post('/api/register', json=data0)
self.assertEqual(response.status_code, 500)
def test_register_alreadyExist_statusCode(self):
data0 = {
"email": "riri@gmail.com",
"password": "blabla",
"nickname": "blabla"
}
response = self.client.post('/api/register', json=data0)
self.assertIn('already exist', response.json['message'])
def test_register_success_statusCode(self):
data0 = {
"email": "loulou@gmail.com",
"password": "loulouPass",
"nickname": "Loulou"
}
response = self.client.post('/api/register', json=data0)
self.assertEqual(response.status_code, 200)
def test_register_success_message(self):
data0 = {
"email": "loulou@gmail.com",
"password": "loulouPass",
"nickname": "Loulou"
}
response = self.client.post('/api/register', json=data0)
self.assertEqual(response.json['message'], 'User registered.')
# --- LOGOUT ---
def test_logout_fail_(self):
response = self.client.delete('/api/logout')
self.assertEqual(response.status_code, 500)
def test_logout_success(self):
response = self.login("riri@gmail.com", "ririPass")
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/logout')
self.assertEqual(response.status_code, 200)
# --- USER/UPDATE ---
def test_userUpdate_notConnected_statusCode(self):
response = self.client.put('/api/user/update', json={})
self.assertEqual(response.status_code, 500)
def test_userUpdate_notConnected_message(self):
response = self.client.put('/api/user/update', json={})
self.assertEqual(response.json['message'], 'User not authenticated.')
def test_userUpdate_noFields_statusCode(self):
response = self.login("riri@gmail.com", "ririPass")
self.assertEqual(response.status_code, 200)
response = self.client.put('/api/user/update', json={})
self.assertEqual(response.status_code, 400)
def test_userUpdate_noFields_message(self):
response = self.login("riri@gmail.com", "ririPass")
self.assertEqual(response.status_code, 200)
response = self.client.put('/api/user/update', json={})
self.assertIn('Need', response.json['message'])
def test_userUpdate_emptyFields_statusCode(self):
response = self.login("riri@gmail.com", "ririPass")
self.assertEqual(response.status_code, 200)
data0 = {
"nickname": "",
"password": "blabla"
}
response = self.client.put('/api/user/update', json=data0)
self.assertEqual(response.status_code, 400)
def test_userUpdate_emptyFields_message(self):
response = self.login("riri@gmail.com", "ririPass")
self.assertEqual(response.status_code, 200)
data0 = {
"nickname": "",
"password": "blabla"
}
response = self.client.put('/api/user/update', json=data0)
self.assertEqual(response.json['message'], 'Empty nickname and/or password fields.')
def test_self_update_success_statusCode(self):
response = self.login("riri@gmail.com", "ririPass")
self.assertEqual(response.status_code, 200)
data0 = {
"nickname": "Ririri",
"password": "ririPass"
}
response = self.client.put('/api/user/update', json=data0)
self.assertEqual(response.status_code, 200)
# --- USER/DELETE ---
def test_userDelete_notConnected_statusCode(self):
response = self.client.delete('/api/user/delete')
self.assertEqual(response.status_code, 500)
def test_userDelete_notConnected_message(self):
response = self.client.delete('/api/user/delete')
self.assertEqual(response.json['message'], 'User not authenticated.')
def test_userDelete_success_statusCode(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/user/delete')
self.assertEqual(response.status_code, 200)
def test_userDelete_success_message(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/user/delete')
self.assertEqual(response.json['message'], 'User deleted.')
def test_userDelete_lastAdmin_statusCode(self):
response = self.login('donald@gmail.com', 'donaldPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/user/delete')
self.assertEqual(response.status_code, 200)
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/user/delete')
self.assertEqual(response.status_code, 500)
def test_userDelete_lastAdmin_message(self):
response = self.login('donald@gmail.com', 'donaldPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/user/delete')
self.assertEqual(response.status_code, 200)
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/user/delete')
self.assertEqual(response.json['message'], 'Can\'t delete last admin')
# --- ADMIN/CREATE/USER ---
def test_adminCreate_notConnected_statusCode(self):
response = self.client.post('/api/admin/create/user', json={})
self.assertEqual(response.status_code, 500)
def test_adminCreate_notConnected_message(self):
response = self.client.post('/api/admin/create/user', json={})
self.assertEqual(response.json['message'], 'User not authenticated.')
def test_adminCreate_noPermission_statusCode(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.post('/api/admin/create/user', json={})
self.assertEqual(response.status_code, 500)
def test_adminCreate_noPermission_message(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.post('/api/admin/create/user', json={})
self.assertEqual(response.json['message'], 'User does not have permission.')
def test_adminCreate_noFields_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.post('/api/admin/create/user', json={})
self.assertEqual(response.status_code, 400)
def test_adminCreate_noFields_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.post('/api/admin/create/user', json={})
self.assertIn('Need', response.json['message'])
def test_adminCreate_emptyFields_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"email": "",
"nickname": "Mickey",
"password": "mickeyPass",
"is_admin": True,
}
response = self.client.post('/api/admin/create/user', json=data0)
self.assertEqual(response.status_code, 400)
def test_adminCreate_emptyFields_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"email": "",
"nickname": "Mickey",
"password": "mickeyPass",
"is_admin": True,
}
response = self.client.post('/api/admin/create/user', json=data0)
self.assertEqual(response.json['message'],
'Empty email and/or nickname and/or password and/or is_admin fields.')
def test_adminCreate_alreadyExist_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"email": "riri@gmail.com",
"passord": "blabla",
"nickname": "blabla",
}
response = self.client.post('/api/admin/create/user', json=data0)
self.assertEqual(response.status_code, 500)
def test_adminCreate_alreadyExist_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"email": "riri@gmail.com",
"passord": "blabla",
"nickname": "blabla",
}
response = self.client.post('/api/admin/create/user', json=data0)
self.assertIn('already exist', response.json['message'])
def test_adminCreate_success_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"email": "mickey@gmail.com",
"nickname": "Mickey",
"password": "mickeyPass",
"is_admin": True,
}
response = self.client.post('/api/admin/create/user', json=data0)
self.assertEqual(response.status_code, 200)
def test_adminCreate_success_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"email": "mickey@gmail.com",
"nickname": "Mickey",
"password": "mickeyPass",
"is_admin": True,
}
response = self.client.post('/api/admin/create/user', json=data0)
self.assertEqual(response.json['message'], 'User registered.')
# --- ADMIN/UPDATE/USER ---
def test_adminUpdate_notConnected_statusCode(self):
response = self.client.put('/api/admin/update/user', json={})
self.assertEqual(response.status_code, 500)
def test_adminUpdate_notConnected_message(self):
response = self.client.put('/api/admin/update/user', json={})
self.assertEqual(response.json['message'], 'User not authenticated.')
def test_adminUpdate_noPermission_statusCode(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.put('/api/admin/update/user', json={})
self.assertEqual(response.status_code, 500)
def test_adminUpdate_noPermission_message(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.put('/api/admin/update/user', json={})
self.assertEqual(response.json['message'], 'User does not have permission.')
def test_adminUpdate_noFields_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.put('/api/admin/update/user', json={})
self.assertEqual(response.status_code, 400)
def test_adminUpdate_noFields_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.put('/api/admin/update/user', json={})
self.assertIn('Need', response.json['message'])
def test_adminUpdate_emptyFields_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"id": 1,
"password": "",
"is_admin": False,
}
response = self.client.put('/api/admin/update/user', json=data0)
self.assertEqual(response.status_code, 400)
def test_adminUpdate_emptyFields_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"id": 1,
"password": "",
"is_admin": False,
}
response = self.client.put('/api/admin/update/user', json=data0)
self.assertEqual(response.json['message'], 'Empty is_admin and/or password fields.')
def test_adminUpdate_notExists_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"id": 99,
"password": "blabla",
"is_admin": False
}
response = self.client.put('/api/admin/update/user', json=data0)
self.assertEqual(response.status_code, 500)
def test_adminUpdate_notExists_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"id": 99,
"password": "blabla",
"is_admin": False
}
response = self.client.put('/api/admin/update/user', json=data0)
self.assertEqual(response.json['message'], 'User do not exist.')
def test_adminUpdate_success_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"id": 1,
"password": "roroPass",
"is_admin": False,
}
response = self.client.put('/api/admin/update/user', json=data0)
self.assertEqual(response.status_code, 200)
def test_adminUpdate_success_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
data0 = {
"id": 1,
"password": "roroPass",
"is_admin": False,
}
response = self.client.put('/api/admin/update/user', json=data0)
self.assertIn("updated", response.json['message'])
# --- ADMIN/DELETE/USER ---
def test_adminDelete_notConnected_statusCode(self):
response = self.client.delete('/api/admin/delete/user/1')
self.assertEqual(response.status_code, 500)
def test_adminDelete_notConnected_message(self):
response = self.client.delete('/api/admin/delete/user/1')
self.assertEqual(response.json['message'], 'User not authenticated.')
def test_adminDelete_noPermission_statusCode(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user/1')
self.assertEqual(response.status_code, 500)
def test_adminDelete_noPermission_message(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user/1')
self.assertEqual(response.json['message'], 'User does not have permission.')
def test_adminDelete_noFields_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user/')
self.assertEqual(response.status_code, 404)
def test_adminDelete_no_fields(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user')
self.assertEqual('Not Found', response.json['message'])
def test_adminDelete_notExists_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user/99')
self.assertEqual(response.status_code, 500)
def test_adminDelete_notExists_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user/99')
self.assertEqual(response.json['message'], 'User do not exist.')
def test_adminDelete_success_statusCode(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user/2')
self.assertEqual(response.status_code, 200)
def test_adminDelete_success_message(self):
response = self.login('daisy@gmail.com', 'daisyPass')
self.assertEqual(response.status_code, 200)
response = self.client.delete('/api/admin/delete/user/2')
self.assertEqual(response.json['message'], 'User deleted.')
# --- LIST OF USER ---
def test_listOfUsers_fail(self):
response = self.client.get('/api/users')
self.assertEqual(response.status_code, 500)
def test_listOfUsers_success(self):
response = self.login('riri@gmail.com', 'ririPass')
self.assertEqual(response.status_code, 200)
response = self.client.get('/api/users?order_by=nickname')
self.assertEqual(response.status_code, 200)
if __name__ == '__main__':
unittest.main()

View file

@ -1,60 +1,6 @@
version: '3.8'
services:
flaskaled-srv1:
image: postgres:latest
container_name: flaskaled-srv1
ports:
- "5433:5432"
volumes:
- ./backend/init-db1.sql:/docker-entrypoint-initdb.d/init-db1.sql
environment:
- POSTGRES_HOST=flaskaled-srv1
- POSTGRES_PORT=5432
- POSTGRES_DB=flaskaledDb1
- POSTGRES_USER=flaskaled1
- POSTGRES_PASSWORD=aled1
restart: unless-stopped
flaskaled-srv2:
image: postgres:latest
container_name: flaskaled-srv2
ports:
- "5434:5432"
volumes:
- ./backend/init-db2.sql:/docker-entrypoint-initdb.d/init-db2.sql
environment:
- POSTGRES_HOST=flaskaled-srv2
- POSTGRES_PORT=5432
- POSTGRES_DB=flaskaledDb2
- POSTGRES_USER=flaskaled2
- POSTGRES_PASSWORD=aled2
restart: unless-stopped
backend:
container_name: backend
build: ./backend
command: python -m flask run --host=0.0.0.0
ports:
- "5000:5000"
volumes:
- ./backend:/data/backend
depends_on:
- flaskaled-srv1
- flaskaled-srv2
links:
- flaskaled-srv1
- flaskaled-srv2
environment:
- FLASK_APP=app.py
- FLASK_ENV=development
- FLASK_DEBUG=1
- PYTHONUNBUFFERED=1
- DATABASE_URL_USERS=postgresql://flaskaled1:aled1@flaskaled-srv1/flaskaledDb1
- DATABASE_URL_LOGS=postgresql://flaskaled2:aled2@flaskaled-srv2/flaskaledDb2
- ALLOW_ORIGIN=frontend
- SECRET_KEY=default_secret_key
frontend:
container_name: frontend
build: ./frontend
@ -64,30 +10,5 @@ services:
volumes:
- ./frontend:/data/frontend
- ./frontend/node_modules:/data/frontend/node_modules
depends_on:
- backend
links:
- backend
environment:
- NODE_ENV=development
test:
container_name: test
build: ./backend
command: python test.py
volumes:
- ./backend:/data/backend
depends_on:
- flaskaled-srv1
- flaskaled-srv2
links:
- flaskaled-srv1
- flaskaled-srv2
environment:
- FLASK_APP=app.py
- FLASK_ENV=test
- FLASK_DEBUG=0
- PYTHONUNBUFFERED=1
- DATABASE_URL_USERS=postgresql://flaskaled1:aled1@flaskaled-srv1/flaskaledDb1
- DATABASE_URL_LOGS=postgresql://flaskaled2:aled2@flaskaled-srv2/flaskaledDb2
- SECRET_KEY=default_secret_key
- NODE_ENV=development

View file

@ -1,3 +1,4 @@
export const environment = {
production: true
production: true,
debutUrl: 'http://10.1.2.10:5000/api/'
};