Compare commits
5 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3644c47843 | |||
| b60149d3ec | |||
| 567ddbd41c | |||
| 122d0e7af6 | |||
|
|
06fae502df |
33 changed files with 16 additions and 1645 deletions
16
README.md
Normal file
16
README.md
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
# Projet Sécurité INFO5A 2022
|
||||
|
||||
**Auteurs**:
|
||||
- Baptiste Peupier
|
||||
- Wilfried Vallée
|
||||
- Mihary Ranaivoson
|
||||
- Yûki Vachot
|
||||
|
||||
**Branches**:
|
||||
- db1: Base de données Utilisateur
|
||||
- db2: Base de données Log
|
||||
- backend: Flask Backend
|
||||
- frontend: Serveur Angular avec un autre serveur Flask Backend communiquant avec la "branche" backend
|
||||
- development: non mis à jour ne pas cloner
|
||||
|
||||
docker compose up pour chaque branche
|
||||
|
|
@ -1,6 +0,0 @@
|
|||
FROM python:latest
|
||||
WORKDIR /data/backend
|
||||
COPY requirements.txt requirements.txt
|
||||
RUN pip install --upgrade pip
|
||||
RUN pip install -r requirements.txt
|
||||
COPY . .
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -1,8 +0,0 @@
|
|||
from application import create_app
|
||||
import os
|
||||
|
||||
app = create_app(os.environ.get('FLASK_ENV', None))
|
||||
|
||||
if __name__ == "__main__":
|
||||
PORT = os.environ.get('PORT', 33507)
|
||||
app.run(host='0.0.0.0', port=PORT, DEBUG=True)
|
||||
|
|
@ -1,35 +0,0 @@
|
|||
from flask import Flask
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from flask_cors import CORS
|
||||
import sys
|
||||
|
||||
db = SQLAlchemy()
|
||||
|
||||
|
||||
def create_app(flask_env='development'):
|
||||
app = Flask(__name__, instance_relative_config=False)
|
||||
origin = app.config.get('ALLOW_ORIGIN')
|
||||
if origin is None:
|
||||
origin = ['http://10.1.1.10:4200', 'https://10.1.1.10:4200', 'http://127.0.0.1:4200', 'http://localhost:4200']
|
||||
CORS(app, supports_credentials=True, origins=origin)
|
||||
if flask_env == 'production':
|
||||
app.config.from_object("config.ProductionConfig")
|
||||
elif flask_env == 'testing':
|
||||
app.config.from_object("config.TestingConfig")
|
||||
elif flask_env == 'development':
|
||||
app.config.from_object("config.DevelopmentConfig")
|
||||
else:
|
||||
app.config.from_object("config.Config")
|
||||
|
||||
if app.config['SQLALCHEMY_DATABASE_URI_1'] is None or app.config['SQLALCHEMY_DATABASE_URI_2'] is None:
|
||||
print('No ENV Variable for DATABASE_URL_USERS or DATABASE_URL_LOGS')
|
||||
sys.exit(1)
|
||||
else:
|
||||
print('ENV Variables passed : ', app.config['SQLALCHEMY_BINDS'])
|
||||
|
||||
db.init_app(app)
|
||||
with app.app_context():
|
||||
from . import routes
|
||||
app.register_blueprint(routes.bp)
|
||||
db.create_all()
|
||||
return app
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -1,366 +0,0 @@
|
|||
import hashlib
|
||||
import os
|
||||
from datetime import datetime
|
||||
from flask_sqlalchemy import inspect
|
||||
from sqlalchemy import asc, desc, or_
|
||||
from .users_model import Users, db
|
||||
from .logs_model import Logs
|
||||
|
||||
|
||||
def db_create_log(ip, action, message, has_succeeded, status_code, table=None, id_user=None):
|
||||
log = Logs(
|
||||
date=datetime.now().strftime('%Y-%m-%dT%H:%M:%S'),
|
||||
ip=ip,
|
||||
action=action,
|
||||
message=message,
|
||||
has_succeeded=has_succeeded,
|
||||
status_code=status_code,
|
||||
table=table,
|
||||
id_user=id_user
|
||||
)
|
||||
db.session.add(log)
|
||||
db.session.commit()
|
||||
return log.json()
|
||||
|
||||
|
||||
def hash_password(salt, password):
|
||||
return hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
||||
|
||||
|
||||
def db_login(ip, email, password):
|
||||
user = Users.query.filter(
|
||||
Users.email == email
|
||||
).scalar()
|
||||
|
||||
# Check User and Hash Pass
|
||||
if user and user.hash_pass == hash_password(user.salt, password):
|
||||
message = 'User authenticated.'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='login',
|
||||
message=message,
|
||||
has_succeeded=True,
|
||||
status_code=0,
|
||||
table='users',
|
||||
id_user=user.id
|
||||
)
|
||||
return {'status': 0, 'message': message, 'data': user.json()}
|
||||
else:
|
||||
message = f'Email or password invalid'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='login',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=1,
|
||||
table='users',
|
||||
id_user=None
|
||||
)
|
||||
return {'status': 1, 'message': message} # Email or password invalid
|
||||
|
||||
|
||||
def db_register(ip, email, nickname, password, is_admin=False):
|
||||
user = Users.query.filter(
|
||||
Users.email == email
|
||||
).scalar()
|
||||
if user:
|
||||
message = f'{email} already exist.'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='register',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=1,
|
||||
table='users',
|
||||
id_user=None
|
||||
)
|
||||
return {'status': 1, 'message': message} # User already exist
|
||||
|
||||
# Salt Hash Pass with SHA256
|
||||
salt = os.urandom(32)
|
||||
hash_pass = hash_password(salt, password)
|
||||
|
||||
user = Users(
|
||||
email=email,
|
||||
hash_pass=hash_pass,
|
||||
nickname=nickname,
|
||||
salt=salt,
|
||||
is_admin=is_admin
|
||||
)
|
||||
user_inspect = inspect(user)
|
||||
db.session.add(user)
|
||||
check_inspect = user_inspect.pending
|
||||
db.session.commit()
|
||||
|
||||
# Add to logs
|
||||
if check_inspect:
|
||||
id_user = user.json()['id']
|
||||
message = 'User registered.'
|
||||
has_succeeded = True
|
||||
status_code = 0
|
||||
else:
|
||||
id_user = None
|
||||
message = 'Internal Error: User not registered.'
|
||||
has_succeeded = False
|
||||
status_code = 1
|
||||
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='register',
|
||||
message=message,
|
||||
has_succeeded=has_succeeded,
|
||||
status_code=status_code,
|
||||
table='users',
|
||||
id_user=id_user
|
||||
)
|
||||
if status_code == 0:
|
||||
return {'status': 0, 'message': message, 'data': user.json()}
|
||||
elif status_code == 1:
|
||||
return {'status': 1, 'message': message}
|
||||
|
||||
|
||||
def db_user_update(ip, user_id, nickname, password):
|
||||
user = Users.query.filter(
|
||||
Users.id == user_id
|
||||
).scalar()
|
||||
if user:
|
||||
has_succeeded = False
|
||||
status_code = 2
|
||||
if nickname and password:
|
||||
# Salt Hash Pass with SHA256
|
||||
salt = os.urandom(32)
|
||||
hash_pass = hash_password(salt, password)
|
||||
Users.query.filter(Users.id == user_id).update({'nickname': nickname, 'hash_pass': hash_pass, 'salt': salt})
|
||||
db.session.commit()
|
||||
message = 'User nickname and password updated.'
|
||||
has_succeeded = True
|
||||
status_code = 0
|
||||
elif nickname:
|
||||
Users.query.filter(Users.id == user_id).update({'nickname': nickname})
|
||||
db.session.commit()
|
||||
message = 'User nickname updated.'
|
||||
has_succeeded = True
|
||||
status_code = 0
|
||||
elif password:
|
||||
# Salt Hash Pass with SHA256
|
||||
salt = os.urandom(32)
|
||||
hash_pass = hash_password(salt, password)
|
||||
Users.query.filter(Users.id == user_id).update({'hash_pass': hash_pass, 'salt': salt})
|
||||
db.session.commit()
|
||||
message = 'User password updated.'
|
||||
has_succeeded = True
|
||||
status_code = 0
|
||||
else:
|
||||
message = 'Only nickname and/or password can be changed.'
|
||||
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_update',
|
||||
message=message,
|
||||
has_succeeded=has_succeeded,
|
||||
status_code=status_code,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': status_code, 'message': message, 'data': user.json()}
|
||||
else:
|
||||
message = 'User do not exist.'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_update',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=1,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 1, 'message': message}
|
||||
|
||||
|
||||
def db_user_delete(ip, user_id):
|
||||
user_to_delete = Users.query.filter(Users.id == user_id).scalar()
|
||||
if user_to_delete:
|
||||
is_admin = bool(user_to_delete.json()['is_admin'])
|
||||
if is_admin and (Users.query.filter(Users.is_admin == True).count() <= 1 or user_id == 0):
|
||||
message = 'Can\'t delete last admin'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_delete',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=2,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 2, 'message': message}
|
||||
else:
|
||||
test = Users.query.filter(Users.id == user_id).delete()
|
||||
if test == 1:
|
||||
db.session.commit()
|
||||
message = 'User deleted.'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_delete',
|
||||
message=message,
|
||||
has_succeeded=True,
|
||||
status_code=0,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 0, 'message': message, 'data': None}
|
||||
else:
|
||||
message = 'User do not exist.'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_delete',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=1,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 1, 'message': message}
|
||||
else:
|
||||
message = 'User do not exist.'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_delete',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=1,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 1, 'message': message}
|
||||
|
||||
|
||||
def db_admin_update_user(ip, user_id, is_admin, password):
|
||||
user = Users.query.filter(
|
||||
Users.id == user_id
|
||||
).scalar()
|
||||
if user:
|
||||
has_succeeded = False
|
||||
status_code = 2
|
||||
if is_admin is not None and password:
|
||||
# Salt Hash Pass with SHA256
|
||||
salt = os.urandom(32)
|
||||
hash_pass = hash_password(salt, password)
|
||||
Users.query.filter(Users.id == user_id).update({'is_admin': is_admin, 'hash_pass': hash_pass, 'salt': salt})
|
||||
db.session.commit()
|
||||
message = 'User is_admin and password updated.'
|
||||
has_succeeded = True
|
||||
status_code = 0
|
||||
elif is_admin is not None:
|
||||
Users.query.filter(Users.id == user_id).update({'is_admin': is_admin})
|
||||
db.session.commit()
|
||||
message = 'User is_admin updated.'
|
||||
has_succeeded = True
|
||||
status_code = 0
|
||||
elif password:
|
||||
# Salt Hash Pass with SHA256
|
||||
salt = os.urandom(32)
|
||||
hash_pass = hash_password(salt, password)
|
||||
Users.query.filter(Users.id == user_id).update({'hash_pass': hash_pass, 'salt': salt})
|
||||
db.session.commit()
|
||||
message = 'User password updated.'
|
||||
has_succeeded = True
|
||||
status_code = 0
|
||||
else:
|
||||
message = 'Only is_admin and/or password can be changed.'
|
||||
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_update',
|
||||
message=message,
|
||||
has_succeeded=has_succeeded,
|
||||
status_code=status_code,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': status_code, 'message': message, 'data': user.json()}
|
||||
else:
|
||||
message = 'User do not exist.'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='user_update',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=1,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 1, 'message': message}
|
||||
|
||||
|
||||
def db_users(ip, user_id, query, by='email,nickname', id=None, is_admin=None, order_by='email,asc'):
|
||||
# q= or id =
|
||||
# if q= then by= (default: email,nickname) or email or nickname
|
||||
# is_admin =
|
||||
# order_by = (default: email,asc), (nickname,asc), (id,desc), (is_admin,desc)
|
||||
|
||||
users = Users.query
|
||||
if query is id and query is not None:
|
||||
message = 'Query or id field'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='users',
|
||||
message=message,
|
||||
has_succeeded=False,
|
||||
status_code=1,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 1, 'message': message}
|
||||
|
||||
if query and by:
|
||||
if by == 'email':
|
||||
users = users.filter(Users.email.like('%' + query + '%'))
|
||||
elif by == 'nickname':
|
||||
users = users.filter(Users.nickname.like('%' + query + '%'))
|
||||
else:
|
||||
users = users.filter(or_(Users.nickname.like('%' + query + '%'), Users.email.like('%' + query + '%')))
|
||||
elif query and not by:
|
||||
users = users.filter(or_(Users.nickname.like('%' + query + '%'), Users.email.like('%' + query + '%')))
|
||||
elif id:
|
||||
users = users.filter(Users.id == id)
|
||||
else:
|
||||
pass
|
||||
|
||||
if is_admin is not None:
|
||||
users = users.filter(Users.is_admin == is_admin)
|
||||
|
||||
if order_by is not None:
|
||||
order_by = order_by.split(',')
|
||||
if order_by[0] == 'nickname':
|
||||
order = Users.nickname
|
||||
elif order_by[0] == 'id':
|
||||
order = Users.id
|
||||
elif order_by[0] == 'is_admin':
|
||||
order = Users.is_admin
|
||||
else:
|
||||
order = Users.email
|
||||
if len(order_by) > 1:
|
||||
if order_by[1] == 'asc':
|
||||
users = users.order_by(asc(order))
|
||||
elif order_by[1] == 'desc':
|
||||
users = users.order_by(desc(order))
|
||||
else:
|
||||
users = users.order_by(asc(order))
|
||||
else:
|
||||
users = users.order_by(asc(order))
|
||||
else:
|
||||
users = users.order_by(asc(Users.email))
|
||||
|
||||
users = users.all()
|
||||
|
||||
message = f'query({query}), by({by}), id({id}), is_admin({is_admin}) and order_by({order_by}): {len(users)} result(s)'
|
||||
db_create_log(
|
||||
ip=ip,
|
||||
action='users',
|
||||
message=message,
|
||||
has_succeeded=True,
|
||||
status_code=0,
|
||||
table='users',
|
||||
id_user=user_id
|
||||
)
|
||||
return {'status': 0, 'message': message, 'data': [user.json() for user in users]}
|
||||
|
|
@ -1,41 +0,0 @@
|
|||
from . import db
|
||||
|
||||
|
||||
class Logs(db.Model):
|
||||
__bind_key__ = 'db-logs'
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
date = db.Column(db.TIMESTAMP(), nullable=False)
|
||||
id_user = db.Column(db.Integer, nullable=True)
|
||||
ip = db.Column(db.String(), nullable=False)
|
||||
table = db.Column(db.String(), nullable=False)
|
||||
action = db.Column(db.String(), nullable=False)
|
||||
message = db.Column(db.String(), nullable=False)
|
||||
has_succeeded = db.Column(db.Boolean, nullable=False)
|
||||
status_code = db.Column(db.Integer, nullable=False)
|
||||
|
||||
def __init__(self, date, id_user, ip, table, action, message, has_succeeded, status_code):
|
||||
self.date = date
|
||||
self.id_user = id_user
|
||||
self.ip = ip
|
||||
self.table = table
|
||||
self.action = action
|
||||
self.message = message
|
||||
self.has_succeeded = has_succeeded
|
||||
self.status_code = status_code
|
||||
|
||||
def __repr__(self):
|
||||
return {
|
||||
'id': self.id,
|
||||
'date': self.date,
|
||||
'id_user': self.id_user,
|
||||
'ip': self.ip,
|
||||
'table': self.table,
|
||||
'action': self.action,
|
||||
'message': self.message,
|
||||
'has_succeeded': self.has_succeeded,
|
||||
'status_code': self.status_code
|
||||
}
|
||||
|
||||
def json(self):
|
||||
return self.__repr__()
|
||||
|
|
@ -1,35 +0,0 @@
|
|||
from flask import current_app as app
|
||||
import json
|
||||
|
||||
|
||||
def send_error(status_code, message, token=None):
|
||||
data_json = {
|
||||
'status': 'error',
|
||||
'message': message
|
||||
}
|
||||
res = app.response_class(
|
||||
response=json.dumps(data_json),
|
||||
status=status_code,
|
||||
mimetype='application/json'
|
||||
)
|
||||
if token is not None:
|
||||
res.set_cookie('SESSIONID', token)
|
||||
return res
|
||||
|
||||
|
||||
def send_message(message, data, token=None, token_delete=False):
|
||||
data_json = {
|
||||
'status': 'success',
|
||||
'message': message,
|
||||
'data': data
|
||||
}
|
||||
res = app.response_class(
|
||||
response=json.dumps(data_json),
|
||||
status=200,
|
||||
mimetype='application/json'
|
||||
)
|
||||
if token is not None:
|
||||
res.set_cookie('SESSIONID', token)
|
||||
if token_delete:
|
||||
res.delete_cookie('SESSIONID')
|
||||
return res
|
||||
|
|
@ -1,337 +0,0 @@
|
|||
from flask import request, Blueprint
|
||||
from werkzeug.exceptions import HTTPException
|
||||
from .responses import send_message, send_error
|
||||
from .api_functions import db_login, db_register, db_user_update, db_create_log, db_user_delete, db_admin_update_user, db_users
|
||||
|
||||
bp = Blueprint('myapp', __name__)
|
||||
|
||||
|
||||
@bp.app_errorhandler(HTTPException)
|
||||
def handle_exception(e):
|
||||
return send_error(e.code, e.name)
|
||||
|
||||
|
||||
# Login
|
||||
@bp.route('/api/login', methods=['POST'])
|
||||
def login():
|
||||
post_json = request.json
|
||||
try:
|
||||
post_ip = str(post_json['ip'])
|
||||
post_email = str(post_json['email'])
|
||||
post_password = str(post_json['password'])
|
||||
if post_email != '' and post_password != '':
|
||||
res = db_login(post_ip, post_email, post_password)
|
||||
if res['status'] == 0:
|
||||
user = res['data']
|
||||
return send_message(res['message'], user)
|
||||
elif res['status'] == 1:
|
||||
return send_error(400, res['message'])
|
||||
else:
|
||||
return send_error(400, 'Empty email and/or password fields.')
|
||||
except KeyError as e:
|
||||
return send_error(400, 'Need email, password fields.')
|
||||
|
||||
|
||||
# Register
|
||||
@bp.route('/api/register', methods=['POST'])
|
||||
def register():
|
||||
post_json = request.json
|
||||
try:
|
||||
post_ip = str(post_json['ip'])
|
||||
post_email = str(post_json['email'])
|
||||
post_nickname = str(post_json['nickname'])
|
||||
post_password = str(post_json['password'])
|
||||
if post_email != '' and post_password != '' and post_nickname != '':
|
||||
res = db_register(post_ip, post_email, post_nickname, post_password)
|
||||
if res['status'] == 1:
|
||||
return send_error(500, res['message'])
|
||||
elif res['status'] == 0:
|
||||
return send_message(res['message'], res['data'])
|
||||
else:
|
||||
return send_error(400, 'Empty email and/or password and/or nickname fields.')
|
||||
except KeyError as e:
|
||||
return send_error(400, 'Need ' + str(e) + 'field.')
|
||||
|
||||
|
||||
# Logout
|
||||
@bp.route('/api/logout', methods=['DELETE'])
|
||||
def logout():
|
||||
post_json = request.json
|
||||
try:
|
||||
post_ip = str(post_json['ip'])
|
||||
post_user_id = str(post_json['user_id'])
|
||||
message = 'User disconnected.'
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='logout',
|
||||
message=message,
|
||||
has_succeeded=True,
|
||||
status_code=0,
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_message(message, None, token_delete=True)
|
||||
except KeyError as e:
|
||||
return send_error(400, 'Need ' + str(e) + 'field.')
|
||||
|
||||
|
||||
# Update User (Nickname, Password)
|
||||
@bp.route('/api/user/update', methods=['PUT'])
|
||||
def user_update():
|
||||
post_json = request.json
|
||||
post_nickname = None
|
||||
post_password = None
|
||||
fields = ''
|
||||
if 'nickname' in post_json:
|
||||
post_nickname = str(post_json['nickname'])
|
||||
else:
|
||||
fields += 'nickname '
|
||||
|
||||
if 'password' in post_json:
|
||||
post_password = str(post_json['password'])
|
||||
else:
|
||||
fields += 'password '
|
||||
|
||||
if post_nickname is not None or post_password is not None:
|
||||
if post_nickname != '' and post_password != '':
|
||||
post_ip = str(post_json['ip'])
|
||||
post_user_id = str(post_json['user_id'])
|
||||
res = db_user_update(post_ip, post_user_id, post_nickname, post_password)
|
||||
if res['status'] == 1:
|
||||
return send_error(500, res['message'])
|
||||
elif res['status'] == 0:
|
||||
return send_message(res['message'], res['data'])
|
||||
else:
|
||||
return send_error(400, 'Empty nickname and/or password fields.')
|
||||
else:
|
||||
return send_error(400, 'Need ' + fields + 'field.')
|
||||
|
||||
|
||||
# Delete User
|
||||
@bp.route('/api/user/delete', methods=['DELETE'])
|
||||
def user_delete():
|
||||
post_json = request.json
|
||||
try:
|
||||
post_ip = str(post_json['ip'])
|
||||
post_user_id = str(post_json['user_id'])
|
||||
res = db_user_delete(post_ip, post_user_id)
|
||||
if res['status'] != 0:
|
||||
return send_error(500, res['message'])
|
||||
else:
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='delete',
|
||||
message='User deleted.',
|
||||
has_succeeded=True,
|
||||
status_code=0,
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_message(res['message'], None, token_delete=True)
|
||||
except KeyError as e:
|
||||
return send_error(400, 'Need ' + str(e) + 'field.')
|
||||
|
||||
|
||||
# Admin : Create User
|
||||
@bp.route('/api/admin/create/user', methods=['POST'])
|
||||
def admin_create_user():
|
||||
post_json = request.json
|
||||
try:
|
||||
post_ip = str(post_json['ip'])
|
||||
post_user_id = str(post_json['user_id'])
|
||||
token_is_admin = bool(post_json['token_is_admin'])
|
||||
if token_is_admin:
|
||||
post_email = None
|
||||
post_nickname = None
|
||||
post_password = None
|
||||
post_is_admin = None
|
||||
fields = ''
|
||||
if 'email' in post_json:
|
||||
post_email = str(post_json['email'])
|
||||
else:
|
||||
fields += 'email '
|
||||
|
||||
if 'nickname' in post_json:
|
||||
post_nickname = str(post_json['nickname'])
|
||||
else:
|
||||
fields += 'nickname '
|
||||
|
||||
if 'password' in post_json:
|
||||
post_password = str(post_json['password'])
|
||||
else:
|
||||
fields += 'password '
|
||||
|
||||
if 'is_admin' in post_json:
|
||||
post_is_admin = bool(post_json['is_admin'])
|
||||
else:
|
||||
fields += 'is_admin '
|
||||
|
||||
if post_email is not None or post_nickname is not None or post_password is not None or post_is_admin is not None:
|
||||
if post_email != '' and post_nickname != '' and post_password != '' and str(post_is_admin) != '':
|
||||
res = db_register(post_ip, post_email, post_nickname, post_password, is_admin=post_is_admin)
|
||||
if res['status'] == 1:
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='admin/create/user',
|
||||
message=res['message'],
|
||||
has_succeeded=False,
|
||||
status_code=res['status'],
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_error(500, res['message'])
|
||||
elif res['status'] == 0:
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='admin/create/user',
|
||||
message=res['message'],
|
||||
has_succeeded=True,
|
||||
status_code=res['status'],
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_message(res['message'], res['data'])
|
||||
else:
|
||||
return send_error(400, 'Empty email and/or nickname and/or password and/or is_admin fields.')
|
||||
else:
|
||||
return send_error(400, 'Need ' + fields + 'field.')
|
||||
else:
|
||||
return send_error(500, 'User does not have permission.')
|
||||
except KeyError as e:
|
||||
return send_error(400, 'Need ' + str(e) + 'field.')
|
||||
|
||||
|
||||
# Admin : Change User password and/or role
|
||||
@bp.route('/api/admin/update/user', methods=['PUT'])
|
||||
def admin_update_user():
|
||||
post_json = request.json
|
||||
try:
|
||||
post_ip = str(post_json['ip'])
|
||||
post_user_id = str(post_json['user_id'])
|
||||
token_is_admin = bool(post_json['token_is_admin'])
|
||||
if token_is_admin:
|
||||
post_is_admin = None
|
||||
post_password = None
|
||||
post_user_id_delete = None
|
||||
fields = ''
|
||||
if 'id' in post_json:
|
||||
post_user_id_delete = int(post_json['id'])
|
||||
else:
|
||||
fields += 'id '
|
||||
|
||||
if 'is_admin' in post_json:
|
||||
post_is_admin = bool(post_json['is_admin'])
|
||||
else:
|
||||
fields += 'is_admin '
|
||||
|
||||
if 'password' in post_json:
|
||||
post_password = str(post_json['password'])
|
||||
else:
|
||||
fields += 'password '
|
||||
|
||||
if post_user_id_delete is not None and (post_is_admin is not None or post_password is not None):
|
||||
if str(post_is_admin) != '' and post_password != '' and str(post_user_id_delete) != '':
|
||||
res = db_admin_update_user(post_ip, post_user_id_delete, post_is_admin, post_password)
|
||||
if res['status'] == 1:
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='admin/update/user',
|
||||
message=res['message'],
|
||||
has_succeeded=False,
|
||||
status_code=res['status'],
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_error(500, res['message'])
|
||||
elif res['status'] == 0:
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='admin/update/user',
|
||||
message=res['message'],
|
||||
has_succeeded=True,
|
||||
status_code=res['status'],
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_message(res['message'], res['data'])
|
||||
else:
|
||||
return send_error(400, 'Empty is_admin and/or password fields.')
|
||||
else:
|
||||
return send_error(400, 'Need ' + fields + 'field.')
|
||||
else:
|
||||
return send_error(500, 'User does not have permission.')
|
||||
except KeyError as e:
|
||||
return send_error(400, 'Need ' + str(e) + 'field.')
|
||||
|
||||
|
||||
# Admin : Delete User
|
||||
@bp.route('/api/admin/delete/user', methods=['DELETE'])
|
||||
def admin_delete_user():
|
||||
post_json = request.json
|
||||
try:
|
||||
post_ip = str(post_json['ip'])
|
||||
post_user_id = str(post_json['user_id'])
|
||||
token_is_admin = bool(post_json['token_is_admin'])
|
||||
if token_is_admin:
|
||||
post_user_id_delete = None
|
||||
fields = ''
|
||||
if 'id' in post_json:
|
||||
post_user_id_delete = int(post_json['id'])
|
||||
else:
|
||||
fields += 'id '
|
||||
if post_user_id_delete is not None:
|
||||
if str(post_user_id_delete) != '':
|
||||
res = db_user_delete(post_ip, int(post_user_id_delete))
|
||||
if res['status'] == 1:
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='admin/delete/user',
|
||||
message=res['message'],
|
||||
has_succeeded=False,
|
||||
status_code=res['status'],
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_error(500, res['message'])
|
||||
else:
|
||||
db_create_log(
|
||||
ip=post_ip,
|
||||
action='admin/delete/user',
|
||||
message=res['message'],
|
||||
has_succeeded=True,
|
||||
status_code=res['status'],
|
||||
table='users',
|
||||
id_user=post_user_id
|
||||
)
|
||||
return send_message(res['message'], None)
|
||||
else:
|
||||
return send_error(400, 'Empty id field.')
|
||||
else:
|
||||
return send_error(400, 'Need ' + fields + 'field.')
|
||||
else:
|
||||
return send_error(500, 'User does not have permission.')
|
||||
except KeyError as e:
|
||||
return send_error(400, 'Need ' + str(e) + 'field.')
|
||||
|
||||
|
||||
# List of User (must be authenticated) & Search
|
||||
@bp.route('/api/users', methods=['GET'])
|
||||
def users():
|
||||
get_ip = request.args.get('ip')
|
||||
get_user_id = request.args.get('user_id')
|
||||
get_query = request.args.get('q')
|
||||
get_by = request.args.get('by')
|
||||
get_id = request.args.get('id')
|
||||
get_is_admin = request.args.get('is_admin')
|
||||
get_order_by = request.args.get('order_by')
|
||||
|
||||
if get_ip is None:
|
||||
get_ip = request.remote_addr
|
||||
if get_user_id is None:
|
||||
get_user_id = 0
|
||||
print(get_ip, get_user_id, get_query, get_by, get_id, get_is_admin, get_order_by)
|
||||
res = db_users(get_ip, get_user_id, get_query, get_by, get_id, get_is_admin, get_order_by)
|
||||
if res['status'] == 1:
|
||||
return send_error(500, res['message'])
|
||||
else:
|
||||
return send_message(res['message'], res['data'])
|
||||
|
|
@ -1,37 +0,0 @@
|
|||
from . import db
|
||||
|
||||
|
||||
class Users(db.Model):
|
||||
__bind_key__ = 'db-users'
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
||||
email = db.Column(db.String(), nullable=False, unique=True)
|
||||
nickname = db.Column(db.String(), nullable=False)
|
||||
hash_pass = db.Column(db.LargeBinary(), nullable=False)
|
||||
salt = db.Column(db.LargeBinary(), nullable=False)
|
||||
is_admin = db.Column(db.Boolean, default=False, nullable=False)
|
||||
|
||||
def __init__(self, email, nickname, hash_pass, salt, is_admin):
|
||||
self.email = email
|
||||
self.hash_pass = hash_pass
|
||||
self.nickname = nickname
|
||||
self.salt = salt
|
||||
self.is_admin = is_admin
|
||||
|
||||
def __repr__(self):
|
||||
return {
|
||||
'id': self.id,
|
||||
'email': self.email,
|
||||
'nickname': self.nickname,
|
||||
'hash_pass': self.hash_pass,
|
||||
'salt': self.salt,
|
||||
'is_admin': self.is_admin
|
||||
}
|
||||
|
||||
def json(self):
|
||||
return {
|
||||
'id': self.id,
|
||||
'email': self.email,
|
||||
'nickname': self.nickname,
|
||||
'is_admin': self.is_admin
|
||||
}
|
||||
|
|
@ -1,43 +0,0 @@
|
|||
import os
|
||||
|
||||
basedir = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
class Config(object):
|
||||
DEBUG = False
|
||||
TESTING = False
|
||||
CSRF_ENABLED = True
|
||||
|
||||
FLASK_APP = os.environ.get('FLASK_APP', None)
|
||||
FLASK_ENV = os.environ.get('FLASK_ENV', None)
|
||||
|
||||
SQLALCHEMY_ECHO = False
|
||||
SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
SQLALCHEMY_DATABASE_URI_1 = os.environ.get('DATABASE_URL_USERS', None)
|
||||
SQLALCHEMY_DATABASE_URI_2 = os.environ.get('DATABASE_URL_LOGS', None)
|
||||
SQLALCHEMY_BINDS = {
|
||||
'db-users': SQLALCHEMY_DATABASE_URI_1,
|
||||
'db-logs': SQLALCHEMY_DATABASE_URI_2
|
||||
}
|
||||
|
||||
SECRET_KEY = os.environ.get('SECRET_KEY', 'default_secret_key')
|
||||
ALLOW_ORIGIN = os.environ.get('ALLOW_ORIGIN', None)
|
||||
|
||||
|
||||
class ProductionConfig(Config):
|
||||
DEBUG = False
|
||||
SQLALCHEMY_ECHO = False
|
||||
SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
|
||||
|
||||
class TestingConfig(Config):
|
||||
TESTING = True
|
||||
SQLALCHEMY_ECHO = False
|
||||
SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
|
||||
|
||||
class DevelopmentConfig(Config):
|
||||
DEVELOPMENT = True
|
||||
DEBUG = True
|
||||
SQLALCHEMY_ECHO = True
|
||||
SQLALCHEMY_TRACK_MODIFICATIONS = True
|
||||
|
|
@ -1,47 +0,0 @@
|
|||
import os
|
||||
from application.users_model import Users
|
||||
from application.api_functions import hash_password
|
||||
|
||||
TAB_USER_WITH_PASSWORD = [
|
||||
{
|
||||
"id": 1,
|
||||
"email": "riri@gmail.com",
|
||||
"nickname": "Riri",
|
||||
"password": "ririPass",
|
||||
"is_admin": False
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"email": "fifi@gmail.com",
|
||||
"nickname": "Fifi",
|
||||
"password": "fifiPass",
|
||||
"is_admin": False
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"email": "donald@gmail.com",
|
||||
"nickname": "Donald",
|
||||
"password": "donaldPass",
|
||||
"is_admin": False
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"email": "daisy@gmail.com",
|
||||
"nickname": "Daisy",
|
||||
"password": "daisyPass",
|
||||
"is_admin": True
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
# Convert user with passord (uwp) to user
|
||||
def uwp_to_user(uwp):
|
||||
salt0 = os.urandom(32)
|
||||
hash_pass0 = hash_password(salt0, uwp["password"])
|
||||
return Users(
|
||||
email=uwp["email"],
|
||||
nickname=uwp["nickname"],
|
||||
hash_pass=hash_pass0,
|
||||
salt=salt0,
|
||||
is_admin=uwp["is_admin"]
|
||||
)
|
||||
|
|
@ -1,13 +0,0 @@
|
|||
alembic==1.7.5
|
||||
Flask==2.0.2
|
||||
Flask-Migrate==3.1.0
|
||||
Flask-Script==2.0.6
|
||||
Flask-Testing==0.8.1
|
||||
Flask-SQLAlchemy==2.5.1
|
||||
Flask-WTF==0.15.1
|
||||
pipreqs==0.4.10
|
||||
PyJWT==2.3.0
|
||||
pytest==6.2.5
|
||||
SQLAlchemy==1.4.27
|
||||
psycopg2==2.9.2
|
||||
Flask-Cors==3.0.10
|
||||
643
backend/test.py
643
backend/test.py
|
|
@ -1,643 +0,0 @@
|
|||
import unittest
|
||||
from flask_testing import TestCase
|
||||
from fictive_users import TAB_USER_WITH_PASSWORD, uwp_to_user
|
||||
from application import db, create_app
|
||||
|
||||
|
||||
class BaseTestCase(TestCase):
|
||||
|
||||
def create_app(self):
|
||||
app = create_app('testing')
|
||||
return app
|
||||
|
||||
def setUp(self):
|
||||
db.drop_all()
|
||||
db.create_all()
|
||||
for uwp in TAB_USER_WITH_PASSWORD:
|
||||
db.session.add(uwp_to_user(uwp))
|
||||
db.session.commit()
|
||||
|
||||
def tearDown(self):
|
||||
db.session.remove()
|
||||
db.drop_all()
|
||||
db.create_all()
|
||||
|
||||
|
||||
class FlaskTestCase(BaseTestCase):
|
||||
|
||||
# -- UTILS ---
|
||||
|
||||
def login(self, email, password):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": email,
|
||||
"password": password
|
||||
}
|
||||
response = self.client.post('/api/login', json=data0)
|
||||
return response
|
||||
|
||||
# --- LOGIN ---
|
||||
|
||||
def test_login_NoFields_statusCode(self):
|
||||
response = self.client.post('/api/login', json={})
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_login_NoFields_message(self):
|
||||
response = self.client.post('/api/login', json={})
|
||||
self.assertEqual(response.json['message'], 'Need email, password fields.')
|
||||
|
||||
def test_login_emptyFields_statusCode(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "",
|
||||
"password": "blabla"
|
||||
}
|
||||
response = self.client.post('/api/login', json=data0)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_login_emptyFields_message(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "",
|
||||
"password": "blabla"
|
||||
}
|
||||
response = self.client.post('/api/login', json=data0)
|
||||
self.assertEqual(response.json['message'], 'Empty email and/or password fields.')
|
||||
|
||||
def test_login_wrongFields_statusCode(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "nimp@gmail.com",
|
||||
"password": "nimp"
|
||||
}
|
||||
response = self.client.post('/api/login', json=data0)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_login_wrongFields_message(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "nimp@gmail.com",
|
||||
"password": "nimp"
|
||||
}
|
||||
response = self.client.post('/api/login', json=data0)
|
||||
self.assertEqual(response.json['message'], 'Email or password invalid')
|
||||
|
||||
def test_login_success_statusCode(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "riri@gmail.com",
|
||||
"password": "ririPass"
|
||||
}
|
||||
response = self.client.post('/api/login', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_login_success_message(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "riri@gmail.com",
|
||||
"password": "ririPass"
|
||||
}
|
||||
response = self.client.post('/api/login', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User authenticated.')
|
||||
|
||||
# --- REGISTER ---
|
||||
|
||||
def test_register_noFields_statusCode(self):
|
||||
response = self.client.post('/api/register', json={})
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_register_noFields_message(self):
|
||||
response = self.client.post('/api/register', json={})
|
||||
self.assertIn('Need', response.json['message'])
|
||||
|
||||
def test_register_emptyFields_statusCode(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "",
|
||||
"password": "blabla",
|
||||
"nickname": "blabla"
|
||||
}
|
||||
response = self.client.post('/api/register', json=data0)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_register_emptyFields_message(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "",
|
||||
"password": "blabla",
|
||||
"nickname": "blabla"
|
||||
}
|
||||
response = self.client.post('/api/register', json=data0)
|
||||
self.assertEqual(response.json['message'], 'Empty email and/or password and/or nickname fields.')
|
||||
|
||||
def test_register_alreadyExist_statusCode(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "riri@gmail.com",
|
||||
"password": "blabla",
|
||||
"nickname": "blabla"
|
||||
}
|
||||
response = self.client.post('/api/register', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_register_alreadyExist_message(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "riri@gmail.com",
|
||||
"password": "blabla",
|
||||
"nickname": "blabla"
|
||||
}
|
||||
response = self.client.post('/api/register', json=data0)
|
||||
self.assertIn('already exist', response.json['message'])
|
||||
|
||||
def test_register_success_statusCode(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "loulou@gmail.com",
|
||||
"password": "loulouPass",
|
||||
"nickname": "Loulou"
|
||||
}
|
||||
response = self.client.post('/api/register', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_register_success_message(self):
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"email": "loulou@gmail.com",
|
||||
"password": "loulouPass",
|
||||
"nickname": "Loulou"
|
||||
}
|
||||
response = self.client.post('/api/register', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User registered.')
|
||||
|
||||
# --- LOGOUT ---
|
||||
|
||||
def test_logout_fail_(self):
|
||||
response = self.client.delete('/api/logout', json={})
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_logout_success(self):
|
||||
response = self.login("riri@gmail.com", "ririPass")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 0
|
||||
}
|
||||
response = self.client.delete('/api/logout', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# --- USER/UPDATE ---
|
||||
|
||||
def test_userUpdate_noFields_statusCode(self):
|
||||
response = self.login("riri@gmail.com", "ririPass")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.put('/api/user/update', json={})
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_userUpdate_noFields_message(self):
|
||||
response = self.login("riri@gmail.com", "ririPass")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.put('/api/user/update', json={})
|
||||
self.assertIn('Need', response.json['message'])
|
||||
|
||||
def test_userUpdate_emptyFields_statusCode(self):
|
||||
response = self.login("riri@gmail.com", "ririPass")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 0,
|
||||
"nickname": "",
|
||||
"password": "blabla"
|
||||
}
|
||||
response = self.client.put('/api/user/update', json=data0)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_userUpdate_emptyFields_message(self):
|
||||
response = self.login("riri@gmail.com", "ririPass")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 0,
|
||||
"nickname": "",
|
||||
"password": "blabla"
|
||||
}
|
||||
response = self.client.put('/api/user/update', json=data0)
|
||||
self.assertEqual(response.json['message'], 'Empty nickname and/or password fields.')
|
||||
|
||||
def test_self_update_success_statusCode(self):
|
||||
response = self.login("riri@gmail.com", "ririPass")
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1,
|
||||
"nickname": "Ririri",
|
||||
"password": "ririPass"
|
||||
}
|
||||
response = self.client.put('/api/user/update', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
# --- USER/DELETE ---
|
||||
|
||||
def test_userDelete_success_statusCode(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1
|
||||
}
|
||||
response = self.client.delete('/api/user/delete', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_userDelete_success_message(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1
|
||||
}
|
||||
response = self.client.delete('/api/user/delete', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User deleted.')
|
||||
|
||||
def test_userDelete_lastAdmin_statusCode(self):
|
||||
response = self.login('donald@gmail.com', 'donaldPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 3
|
||||
}
|
||||
response = self.client.delete('/api/user/delete', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4
|
||||
}
|
||||
response = self.client.delete('/api/user/delete', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_userDelete_lastAdmin_message(self):
|
||||
response = self.login('donald@gmail.com', 'donaldPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 3
|
||||
}
|
||||
response = self.client.delete('/api/user/delete', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4
|
||||
}
|
||||
response = self.client.delete('/api/user/delete', json=data0)
|
||||
self.assertEqual(response.json['message'], 'Can\'t delete last admin')
|
||||
|
||||
# --- ADMIN/CREATE/USER ---
|
||||
|
||||
def test_adminCreate_noPermission_statusCode(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1,
|
||||
"token_is_admin": False,
|
||||
"email": "azd",
|
||||
"nickname": "Mickey",
|
||||
"password": "mickeyPass",
|
||||
"is_admin": True
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_adminCreate_noPermission_message(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1,
|
||||
"token_is_admin": False,
|
||||
"email": "azd",
|
||||
"nickname": "Mickey",
|
||||
"password": "mickeyPass",
|
||||
"is_admin": True
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User does not have permission.')
|
||||
|
||||
def test_adminCreate_noFields_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.post('/api/admin/create/user', json={})
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_adminCreate_noFields_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.post('/api/admin/create/user', json={})
|
||||
self.assertIn('Need', response.json['message'])
|
||||
|
||||
def test_adminCreate_emptyFields_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"email": "",
|
||||
"nickname": "Mickey",
|
||||
"password": "mickeyPass",
|
||||
"is_admin": True,
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_adminCreate_emptyFields_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"email": "",
|
||||
"nickname": "Mickey",
|
||||
"password": "mickeyPass",
|
||||
"is_admin": True,
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertEqual(response.json['message'],
|
||||
'Empty email and/or nickname and/or password and/or is_admin fields.')
|
||||
|
||||
def test_adminCreate_alreadyExist_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"email": "riri@gmail.com",
|
||||
"passord": "blabla",
|
||||
"nickname": "blabla",
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_adminCreate_alreadyExist_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"email": "riri@gmail.com",
|
||||
"passord": "blabla",
|
||||
"nickname": "blabla",
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertIn('already exist', response.json['message'])
|
||||
|
||||
def test_adminCreate_success_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"email": "mickey@gmail.com",
|
||||
"nickname": "Mickey",
|
||||
"password": "mickeyPass",
|
||||
"is_admin": True,
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_adminCreate_success_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"email": "mickey@gmail.com",
|
||||
"nickname": "Mickey",
|
||||
"password": "mickeyPass",
|
||||
"is_admin": True,
|
||||
}
|
||||
response = self.client.post('/api/admin/create/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User registered.')
|
||||
|
||||
# --- ADMIN/UPDATE/USER ---
|
||||
|
||||
def test_adminUpdate_noPermission_statusCode(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1,
|
||||
"token_is_admin": False
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_adminUpdate_noPermission_message(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1,
|
||||
"token_is_admin": False
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User does not have permission.')
|
||||
|
||||
def test_adminUpdate_noFields_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.put('/api/admin/update/user', json={})
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_adminUpdate_noFields_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.put('/api/admin/update/user', json={})
|
||||
self.assertIn('Need', response.json['message'])
|
||||
|
||||
def test_adminUpdate_emptyFields_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 1,
|
||||
"password": "",
|
||||
"is_admin": False,
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertEqual(response.status_code, 400)
|
||||
|
||||
def test_adminUpdate_emptyFields_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 1,
|
||||
"password": "",
|
||||
"is_admin": False,
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'Empty is_admin and/or password fields.')
|
||||
|
||||
def test_adminUpdate_notExists_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 99,
|
||||
"password": "blabla",
|
||||
"is_admin": False
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_adminUpdate_notExists_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 99,
|
||||
"password": "blabla",
|
||||
"is_admin": False
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User do not exist.')
|
||||
|
||||
def test_adminUpdate_success_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 1,
|
||||
"password": "roroPass",
|
||||
"is_admin": False,
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_adminUpdate_success_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 1,
|
||||
"password": "roroPass",
|
||||
"is_admin": False,
|
||||
}
|
||||
response = self.client.put('/api/admin/update/user', json=data0)
|
||||
self.assertIn("updated", response.json['message'])
|
||||
# --- ADMIN/DELETE/USER ---
|
||||
|
||||
def test_adminDelete_noPermission_statusCode(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1,
|
||||
"token_is_admin": False
|
||||
}
|
||||
response = self.client.delete('/api/admin/delete/user', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_adminDelete_noPermission_message(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 1,
|
||||
"token_is_admin": False
|
||||
}
|
||||
response = self.client.delete('/api/admin/delete/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User does not have permission.')
|
||||
|
||||
def test_adminDelete_noFields_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.delete('/api/admin/delete/user/')
|
||||
self.assertEqual(response.status_code, 404)
|
||||
|
||||
def test_adminDelete_no_fields(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.delete('/api/admin/delete/user/')
|
||||
self.assertEqual('Not Found', response.json['message'])
|
||||
|
||||
def test_adminDelete_notExists_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 99
|
||||
}
|
||||
response = self.client.delete('/api/admin/delete/user', json=data0)
|
||||
self.assertEqual(response.status_code, 500)
|
||||
|
||||
def test_adminDelete_notExists_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 99
|
||||
}
|
||||
response = self.client.delete('/api/admin/delete/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User do not exist.')
|
||||
|
||||
def test_adminDelete_success_statusCode(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 2
|
||||
}
|
||||
response = self.client.delete('/api/admin/delete/user', json=data0)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_adminDelete_success_message(self):
|
||||
response = self.login('daisy@gmail.com', 'daisyPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
data0 = {
|
||||
"ip": "127.0.0.1",
|
||||
"user_id": 4,
|
||||
"token_is_admin": True,
|
||||
"id": 2
|
||||
}
|
||||
response = self.client.delete('/api/admin/delete/user', json=data0)
|
||||
self.assertEqual(response.json['message'], 'User deleted.')
|
||||
|
||||
# --- LIST OF USER ---
|
||||
|
||||
def test_listOfUsers_success(self):
|
||||
response = self.login('riri@gmail.com', 'ririPass')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
response = self.client.get('/api/users?order_by=nickname')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -1,34 +0,0 @@
|
|||
version: '3.8'
|
||||
|
||||
services:
|
||||
backend:
|
||||
container_name: backend
|
||||
build: ./backend
|
||||
command: python -m flask run --host=0.0.0.0
|
||||
ports:
|
||||
- "5000:5000"
|
||||
volumes:
|
||||
- ./backend:/data/backend
|
||||
environment:
|
||||
- FLASK_APP=app.py
|
||||
- FLASK_ENV=development
|
||||
- FLASK_DEBUG=1
|
||||
- PYTHONUNBUFFERED=1
|
||||
- DATABASE_URL_USERS=postgresql://flaskaled1:aled1@10.1.3.10:5433/flaskaledDb1
|
||||
- DATABASE_URL_LOGS=postgresql://flaskaled2:aled2@10.1.4.10:5434/flaskaledDb2
|
||||
- SECRET_KEY=default_secret_key
|
||||
|
||||
test:
|
||||
container_name: test
|
||||
build: ./backend
|
||||
command: python test.py
|
||||
volumes:
|
||||
- ./backend:/data/backend
|
||||
environment:
|
||||
- FLASK_APP=app.py
|
||||
- FLASK_ENV=test
|
||||
- FLASK_DEBUG=0
|
||||
- PYTHONUNBUFFERED=1
|
||||
- DATABASE_URL_USERS=postgresql://flaskaled1:aled1@10.1.3.10:5433/flaskaledDb1
|
||||
- DATABASE_URL_LOGS=postgresql://flaskaled2:aled2@10.1.4.10:5434/flaskaledDb2
|
||||
- SECRET_KEY=default_secret_key
|
||||
Loading…
Add table
Add a link
Reference in a new issue