Update: sessionJWT removed + Routes adapted

This commit is contained in:
Yûki VACHOT 2022-01-31 10:38:07 +01:00
parent ea17540910
commit f6a6d73dc4
2 changed files with 99 additions and 151 deletions

View file

@ -2,7 +2,6 @@ from flask import request, Blueprint
from werkzeug.exceptions import HTTPException from werkzeug.exceptions import HTTPException
from .responses import send_message, send_error from .responses import send_message, send_error
from .api_functions import db_login, db_register, db_user_update, db_create_log, db_user_delete, db_admin_update_user, db_users from .api_functions import db_login, db_register, db_user_update, db_create_log, db_user_delete, db_admin_update_user, db_users
from .sessionJWT import create_auth_token, check_auth_token
bp = Blueprint('myapp', __name__) bp = Blueprint('myapp', __name__)
@ -17,19 +16,16 @@ def handle_exception(e):
def login(): def login():
post_json = request.json post_json = request.json
try: try:
post_ip = str(post_json['ip'])
post_email = str(post_json['email']) post_email = str(post_json['email'])
post_password = str(post_json['password']) post_password = str(post_json['password'])
if post_email != '' and post_password != '': if post_email != '' and post_password != '':
ip = request.remote_addr res = db_login(post_ip, post_email, post_password)
res = db_login(ip, post_email, post_password)
if res['status'] == 0: if res['status'] == 0:
user = res['data'] user = res['data']
token = create_auth_token(user) return send_message(res['message'], user)
return send_message(res['message'], user, token)
elif res['status'] == 1: elif res['status'] == 1:
user = None return send_error(400, res['message'])
token = create_auth_token(user)
return send_error(400, res['message'], token)
else: else:
return send_error(400, 'Empty email and/or password fields.') return send_error(400, 'Empty email and/or password fields.')
except KeyError as e: except KeyError as e:
@ -41,12 +37,12 @@ def login():
def register(): def register():
post_json = request.json post_json = request.json
try: try:
post_ip = str(post_json['ip'])
post_email = str(post_json['email']) post_email = str(post_json['email'])
post_nickname = str(post_json['nickname']) post_nickname = str(post_json['nickname'])
post_password = str(post_json['password']) post_password = str(post_json['password'])
if post_email != '' and post_password != '' and post_nickname != '': if post_email != '' and post_password != '' and post_nickname != '':
ip = request.remote_addr res = db_register(post_ip, post_email, post_nickname, post_password)
res = db_register(ip, post_email, post_nickname, post_password)
if res['status'] == 1: if res['status'] == 1:
return send_error(500, res['message']) return send_error(500, res['message'])
elif res['status'] == 0: elif res['status'] == 0:
@ -60,29 +56,28 @@ def register():
# Logout # Logout
@bp.route('/api/logout', methods=['DELETE']) @bp.route('/api/logout', methods=['DELETE'])
def logout(): def logout():
token = check_auth_token(request) post_json = request.json
if token['success']: try:
ip = request.remote_addr post_ip = str(post_json['ip'])
post_user_id = str(post_json['user_id'])
message = 'User disconnected.' message = 'User disconnected.'
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='logout', action='logout',
message=message, message=message,
has_succeeded=True, has_succeeded=True,
status_code=0, status_code=0,
table='users', table='users',
id_user=token['payload']['id'] id_user=post_user_id
) )
return send_message(message, None, token_delete=True) return send_message(message, None, token_delete=True)
else: except KeyError as e:
return send_error(500, token['message']) return send_error(400, 'Need ' + str(e) + 'field.')
# Update User (Nickname, Password) # Update User (Nickname, Password)
@bp.route('/api/user/update', methods=['PUT']) @bp.route('/api/user/update', methods=['PUT'])
def user_update(): def user_update():
token = check_auth_token(request)
if token['success']:
post_json = request.json post_json = request.json
post_nickname = None post_nickname = None
post_password = None post_password = None
@ -99,9 +94,9 @@ def user_update():
if post_nickname is not None or post_password is not None: if post_nickname is not None or post_password is not None:
if post_nickname != '' and post_password != '': if post_nickname != '' and post_password != '':
ip = request.remote_addr post_ip = str(post_json['ip'])
user_id = token['payload']['id'] post_user_id = str(post_json['user_id'])
res = db_user_update(ip, user_id, post_nickname, post_password) res = db_user_update(post_ip, post_user_id, post_nickname, post_password)
if res['status'] == 1: if res['status'] == 1:
return send_error(500, res['message']) return send_error(500, res['message'])
elif res['status'] == 0: elif res['status'] == 0:
@ -110,45 +105,42 @@ def user_update():
return send_error(400, 'Empty nickname and/or password fields.') return send_error(400, 'Empty nickname and/or password fields.')
else: else:
return send_error(400, 'Need ' + fields + 'field.') return send_error(400, 'Need ' + fields + 'field.')
else:
return send_error(500, token['message'])
# Delete User # Delete User
@bp.route('/api/user/delete', methods=['DELETE']) @bp.route('/api/user/delete', methods=['DELETE'])
def user_delete(): def user_delete():
token = check_auth_token(request) post_json = request.json
if token['success']: try:
ip = request.remote_addr post_ip = str(post_json['ip'])
user_id = token['payload']['id'] post_user_id = str(post_json['user_id'])
res = db_user_delete(ip, user_id) res = db_user_delete(post_ip, post_user_id)
if res['status'] != 0: if res['status'] != 0:
return send_error(500, res['message']) return send_error(500, res['message'])
else: else:
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='delete', action='delete',
message='User deleted.', message='User deleted.',
has_succeeded=True, has_succeeded=True,
status_code=0, status_code=0,
table='users', table='users',
id_user=token['payload']['id'] id_user=post_user_id
) )
return send_message(res['message'], None, token_delete=True) return send_message(res['message'], None, token_delete=True)
else: except KeyError as e:
return send_error(500, token['message']) return send_error(400, 'Need ' + str(e) + 'field.')
# Admin : Create User # Admin : Create User
@bp.route('/api/admin/create/user', methods=['POST']) @bp.route('/api/admin/create/user', methods=['POST'])
def admin_create_user(): def admin_create_user():
token = check_auth_token(request)
if token['success']:
ip = request.remote_addr
user_id = token['payload']['id']
is_admin = token['payload']['is_admin']
if is_admin:
post_json = request.json post_json = request.json
try:
post_ip = str(post_json['ip'])
post_user_id = str(post_json['user_id'])
token_is_admin = str(post_json['token_is_admin'])
if token_is_admin:
post_email = None post_email = None
post_nickname = None post_nickname = None
post_password = None post_password = None
@ -176,27 +168,27 @@ def admin_create_user():
if post_email is not None or post_nickname is not None or post_password is not None or post_is_admin is not None: if post_email is not None or post_nickname is not None or post_password is not None or post_is_admin is not None:
if post_email != '' and post_nickname != '' and post_password != '' and str(post_is_admin) != '': if post_email != '' and post_nickname != '' and post_password != '' and str(post_is_admin) != '':
res = db_register(ip, post_email, post_nickname, post_password, is_admin=post_is_admin) res = db_register(post_ip, post_email, post_nickname, post_password, is_admin=post_is_admin)
if res['status'] == 1: if res['status'] == 1:
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='admin/create/user', action='admin/create/user',
message=res['message'], message=res['message'],
has_succeeded=False, has_succeeded=False,
status_code=res['status'], status_code=res['status'],
table='users', table='users',
id_user=user_id id_user=post_user_id
) )
return send_error(500, res['message']) return send_error(500, res['message'])
elif res['status'] == 0: elif res['status'] == 0:
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='admin/create/user', action='admin/create/user',
message=res['message'], message=res['message'],
has_succeeded=True, has_succeeded=True,
status_code=res['status'], status_code=res['status'],
table='users', table='users',
id_user=user_id id_user=post_user_id
) )
return send_message(res['message'], res['data']) return send_message(res['message'], res['data'])
else: else:
@ -205,19 +197,19 @@ def admin_create_user():
return send_error(400, 'Need ' + fields + 'field.') return send_error(400, 'Need ' + fields + 'field.')
else: else:
return send_error(500, 'User does not have permission.') return send_error(500, 'User does not have permission.')
else: except KeyError as e:
return send_error(500, token['message']) return send_error(400, 'Need ' + str(e) + 'field.')
# Admin : Change User password and/or role # Admin : Change User password and/or role
@bp.route('/api/admin/update/user', methods=['PUT']) @bp.route('/api/admin/update/user', methods=['PUT'])
def admin_update_user(): def admin_update_user():
token = check_auth_token(request)
if token['success']:
user_id = token['payload']['id']
is_admin = token['payload']['is_admin']
if is_admin:
post_json = request.json post_json = request.json
try:
post_ip = str(post_json['ip'])
post_user_id = str(post_json['user_id'])
token_is_admin = str(post_json['token_is_admin'])
if token_is_admin:
post_is_admin = None post_is_admin = None
post_password = None post_password = None
post_user_id_delete = None post_user_id_delete = None
@ -239,28 +231,27 @@ def admin_update_user():
if post_user_id_delete is not None and (post_is_admin is not None or post_password is not None): if post_user_id_delete is not None and (post_is_admin is not None or post_password is not None):
if str(post_is_admin) != '' and post_password != '' and str(post_user_id_delete) != '': if str(post_is_admin) != '' and post_password != '' and str(post_user_id_delete) != '':
ip = request.remote_addr res = db_admin_update_user(post_ip, post_user_id_delete, post_is_admin, post_password)
res = db_admin_update_user(ip, post_user_id_delete, post_is_admin, post_password)
if res['status'] == 1: if res['status'] == 1:
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='admin/update/user', action='admin/update/user',
message=res['message'], message=res['message'],
has_succeeded=False, has_succeeded=False,
status_code=res['status'], status_code=res['status'],
table='users', table='users',
id_user=user_id id_user=post_user_id
) )
return send_error(500, res['message']) return send_error(500, res['message'])
elif res['status'] == 0: elif res['status'] == 0:
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='admin/update/user', action='admin/update/user',
message=res['message'], message=res['message'],
has_succeeded=True, has_succeeded=True,
status_code=res['status'], status_code=res['status'],
table='users', table='users',
id_user=user_id id_user=post_user_id
) )
return send_message(res['message'], res['data']) return send_message(res['message'], res['data'])
else: else:
@ -269,19 +260,19 @@ def admin_update_user():
return send_error(400, 'Need ' + fields + 'field.') return send_error(400, 'Need ' + fields + 'field.')
else: else:
return send_error(500, 'User does not have permission.') return send_error(500, 'User does not have permission.')
else: except KeyError as e:
return send_error(500, token['message']) return send_error(400, 'Need ' + str(e) + 'field.')
# Admin : Delete User # Admin : Delete User
@bp.route('/api/admin/delete/user/<id>', methods=['DELETE']) @bp.route('/api/admin/delete/user/<id>', methods=['DELETE'])
def admin_delete_user(id): def admin_delete_user(id):
token = check_auth_token(request) post_json = request.json
if token['success']: try:
ip = request.remote_addr post_ip = str(post_json['ip'])
user_id = token['payload']['id'] post_user_id = str(post_json['user_id'])
is_admin = token['payload']['is_admin'] token_is_admin = str(post_json['token_is_admin'])
if is_admin: if token_is_admin:
post_json = {'id': id} post_json = {'id': id}
post_user_id_delete = None post_user_id_delete = None
fields = '' fields = ''
@ -291,27 +282,27 @@ def admin_delete_user(id):
fields += 'id' fields += 'id'
if post_user_id_delete is not None: if post_user_id_delete is not None:
if str(post_user_id_delete) != '': if str(post_user_id_delete) != '':
res = db_user_delete(ip, int(post_user_id_delete)) res = db_user_delete(post_ip, int(post_user_id_delete))
if res['status'] == 1: if res['status'] == 1:
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='admin/delete/user', action='admin/delete/user',
message=res['message'], message=res['message'],
has_succeeded=False, has_succeeded=False,
status_code=res['status'], status_code=res['status'],
table='users', table='users',
id_user=user_id id_user=post_user_id
) )
return send_error(500, res['message']) return send_error(500, res['message'])
else: else:
db_create_log( db_create_log(
ip=ip, ip=post_ip,
action='admin/delete/user', action='admin/delete/user',
message=res['message'], message=res['message'],
has_succeeded=True, has_succeeded=True,
status_code=res['status'], status_code=res['status'],
table='users', table='users',
id_user=user_id id_user=post_user_id
) )
return send_message(res['message'], None) return send_message(res['message'], None)
else: else:
@ -320,26 +311,22 @@ def admin_delete_user(id):
return send_error(400, 'Need ' + fields + 'field.') return send_error(400, 'Need ' + fields + 'field.')
else: else:
return send_error(500, 'User does not have permission.') return send_error(500, 'User does not have permission.')
else: except KeyError as e:
return send_error(500, token['message']) return send_error(400, 'Need ' + str(e) + 'field.')
# List of User (must be authenticated) & Search # List of User (must be authenticated) & Search
@bp.route('/api/users', methods=['GET']) @bp.route('/api/users', methods=['GET'])
def users(): def users():
token = check_auth_token(request) get_ip = request.args.get('ip')
if token['success']: get_user_id = request.args.get('user_id')
ip = request.remote_addr
user_id = token['payload']['id']
get_query = request.args.get('q') get_query = request.args.get('q')
get_by = request.args.get('by') get_by = request.args.get('by')
get_id = request.args.get('id') get_id = request.args.get('id')
get_is_admin = request.args.get('is_admin') get_is_admin = request.args.get('is_admin')
get_order_by = request.args.get('order_by') get_order_by = request.args.get('order_by')
res = db_users(ip, user_id, get_query, get_by, get_id, get_is_admin, get_order_by) res = db_users(get_ip, get_user_id, get_query, get_by, get_id, get_is_admin, get_order_by)
if res['status'] == 1: if res['status'] == 1:
return send_error(500, res['message']) return send_error(500, res['message'])
else: else:
return send_message(res['message'], res['data']) return send_message(res['message'], res['data'])
else:
return send_error(500, token['message'])

View file

@ -1,39 +0,0 @@
from datetime import datetime, timedelta
from flask import current_app as app
import jwt
def create_auth_token(user, time_second=1800):
try:
time = datetime.now()
payload = {
'exp': time + timedelta(days=0, seconds=time_second),
'iat': time,
'user': user
}
return jwt.encode(
payload,
app.config.get('SECRET_KEY'),
algorithm='HS256'
)
except Exception as e:
return e
def decode_auth_token(auth_token):
try:
payload = jwt.decode(
auth_token,
app.config.get('SECRET_KEY'),
algorithms='HS256'
)
return {'success': True, 'payload': payload['user']}
except jwt.ExpiredSignatureError:
return {'success': False, 'message': 'Signature expired . Please log in again.'}
except jwt.InvalidTokenError as e:
return {'success': False, 'message': 'User not authenticated.'}
def check_auth_token(request):
token = request.cookies.get('SESSIONID')
return decode_auth_token(token)