Update: Finished Implementing (Not tested)

This commit is contained in:
Yûki VACHOT 2022-01-31 10:07:12 +01:00
parent 566c476456
commit 3d65dd45de

View file

@ -7,22 +7,22 @@ from .sessionJWT import create_auth_token, check_auth_token
# Request Post # Request Post
def request_post(url, data_json): def request_post(url, data_json):
return requests.post(app.config['SQLALCHEMY_BINDS'] + url, json=data_json) return requests.post(app.config['SQLALCHEMY_BINDS'] + url, json=data_json)
# Request Put # Request Put
def request_put(url, data_json): def request_put(url, data_json):
return requests.put(app.config['SQLALCHEMY_BINDS'] + url, json=data_json) return requests.put(app.config['SQLALCHEMY_BINDS'] + url, json=data_json)
# Request Get # Request Get
def request_get(url): def request_get(url):
return requests.get(app.config['SQLALCHEMY_BINDS'] + url) return requests.get(app.config['SQLALCHEMY_BINDS'] + url)
# Request Delete # Request Delete
def request_delete(url, data_json): def request_delete(url, data_json):
return requests.delete(app.config['SQLALCHEMY_BINDS'] + url, json=data_json) return requests.delete(app.config['SQLALCHEMY_BINDS'] + url, json=data_json)
bp = Blueprint('myapp', __name__) bp = Blueprint('myapp', __name__)
@ -30,346 +30,302 @@ bp = Blueprint('myapp', __name__)
@bp.app_errorhandler(HTTPException) @bp.app_errorhandler(HTTPException)
def handle_exception(e): def handle_exception(e):
return send_error(e.code, e.name) return send_error(e.code, e.name)
@bp.route('/', methods=['GET']) @bp.route('/', methods=['GET'])
def root(): def root():
return render_template('index.html') return render_template('index.html')
# Login # Login
@bp.route('/api/login', methods=['POST']) @bp.route('/api/login', methods=['POST'])
def login(): def login():
post_json = request.json post_json = request.json
try: try:
post_email = str(post_json['email']) post_email = str(post_json['email'])
post_password = str(post_json['password']) post_password = str(post_json['password'])
if post_email != '' and post_password != '': if post_email != '' and post_password != '':
ip = request.remote_addr ip = request.remote_addr
# res = db_login(ip, post_email, post_password) # res = db_login(ip, post_email, post_password)
res = request_post('login', {'ip': ip, 'email': post_email, 'password': post_password}).json() res = request_post('login', {'ip': ip, 'email': post_email, 'password': post_password}).json()
if res['status'] == 0: if res['status'] == 0:
user = res['data'] user = res['data']
token = create_auth_token(user) token = create_auth_token(user)
return send_message(res['message'], user, token) return send_message(res['message'], user, token)
elif res['status'] == 1: elif res['status'] == 1:
user = None user = None
token = create_auth_token(user) token = create_auth_token(user)
return send_error(400, res['message'], token) return send_error(400, res['message'], token)
else: else:
return send_error(400, 'Empty email and/or password fields.') return send_error(400, 'Empty email and/or password fields.')
except KeyError as e: except KeyError as e:
return send_error(400, 'Need email, password fields.') return send_error(400, 'Need email, password fields.')
# Register # Register
@bp.route('/api/register', methods=['POST']) @bp.route('/api/register', methods=['POST'])
def register(): def register():
post_json = request.json post_json = request.json
try: try:
post_email = str(post_json['email']) post_email = str(post_json['email'])
post_nickname = str(post_json['nickname']) post_nickname = str(post_json['nickname'])
post_password = str(post_json['password']) post_password = str(post_json['password'])
if post_email != '' and post_password != '' and post_nickname != '': if post_email != '' and post_password != '' and post_nickname != '':
ip = request.remote_addr ip = request.remote_addr
# res = db_register(ip, post_email, post_nickname, post_password) # res = db_register(ip, post_email, post_nickname, post_password)
res = request_post('register', {'ip': ip, 'email': post_email, 'nickname': post_nickname, 'password': post_password}).json() res = request_post('register', {'ip': ip, 'email': post_email, 'nickname': post_nickname,
if res['status'] == 1: 'password': post_password}).json()
return send_error(500, res['message']) if res['status'] == 1:
elif res['status'] == 0: return send_error(500, res['message'])
return send_message(res['message'], res['data']) elif res['status'] == 0:
else: return send_message(res['message'], res['data'])
return send_error(400, 'Empty email and/or password and/or nickname fields.') else:
except KeyError as e: return send_error(400, 'Empty email and/or password and/or nickname fields.')
return send_error(400, 'Need ' + str(e) + 'field.') except KeyError as e:
return send_error(400, 'Need ' + str(e) + 'field.')
# Logout # Logout
@bp.route('/api/logout', methods=['DELETE']) @bp.route('/api/logout', methods=['DELETE'])
def logout(): def logout():
token = check_auth_token(request) token = check_auth_token(request)
if token['success']: if token['success']:
ip = request.remote_addr ip = request.remote_addr
message = 'User disconnected.' request_post('logout', {'ip': ip}).json()
request_post('log', { message = 'User disconnected.'
'ip': ip, return send_message(message, None, token_delete=True)
'action': 'logout', else:
'message': message, return send_error(500, token['message'])
'has_succeeded': True,
'status_code': 0,
'table': 'users',
'id_user': token['payload']['id']
})
return send_message(message, None, token_delete=True)
else:
return send_error(500, token['message'])
# Update User (Nickname, Password) # Update User (Nickname, Password)
@bp.route('/api/user/update', methods=['PUT']) @bp.route('/api/user/update', methods=['PUT'])
def user_update(): def user_update():
token = check_auth_token(request) token = check_auth_token(request)
if token['success']: if token['success']:
post_json = request.json post_json = request.json
post_nickname = None post_nickname = None
post_password = None post_password = None
fields = '' fields = ''
if 'nickname' in post_json: if 'nickname' in post_json:
post_nickname = str(post_json['nickname']) post_nickname = str(post_json['nickname'])
else: else:
fields += 'nickname ' fields += 'nickname '
if 'password' in post_json: if 'password' in post_json:
post_password = str(post_json['password']) post_password = str(post_json['password'])
else: else:
fields += 'password ' fields += 'password '
if post_nickname is not None or post_password is not None: if post_nickname is not None or post_password is not None:
if post_nickname != '' and post_password != '': if post_nickname != '' and post_password != '':
ip = request.remote_addr ip = request.remote_addr
user_id = token['payload']['id'] user_id = token['payload']['id']
# res = db_user_update(ip, user_id, post_nickname, post_password) # res = db_user_update(ip, user_id, post_nickname, post_password)
res = request_post('user/update', {'ip': ip, 'user_id': user_id, 'nickname': post_nickname, 'password': post_password}).json() res = request_post('user/update', {'ip': ip, 'user_id': user_id, 'nickname': post_nickname,
if res['status'] == 1: 'password': post_password}).json()
return send_error(500, res['message']) if res['status'] == 1:
elif res['status'] == 0: return send_error(500, res['message'])
return send_message(res['message'], res['data']) elif res['status'] == 0:
else: return send_message(res['message'], res['data'])
return send_error(400, 'Empty nickname and/or password fields.') else:
return send_error(400, 'Empty nickname and/or password fields.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else: else:
return send_error(400, 'Need ' + fields + 'field.') return send_error(500, token['message'])
else:
return send_error(500, token['message'])
# Delete User # Delete User
@bp.route('/api/user/delete', methods=['DELETE']) @bp.route('/api/user/delete', methods=['DELETE'])
def user_delete(): def user_delete():
token = check_auth_token(request) token = check_auth_token(request)
if token['success']: if token['success']:
ip = request.remote_addr ip = request.remote_addr
user_id = token['payload']['id'] user_id = token['payload']['id']
# res = db_user_delete(ip, user_id) # res = db_user_delete(ip, user_id)
res = request_post('user/update', {'ip': ip, 'user_id': user_id}).json() res = request_post('user/delete', {'ip': ip, 'user_id': user_id}).json()
if res['status'] != 0: if res['status'] != 0:
return send_error(500, res['message']) return send_error(500, res['message'])
else:
return send_message(res['message'], None, token_delete=True)
else: else:
request_post('log', { return send_error(500, token['message'])
'ip': ip,
'action': 'user/delete',
'message': 'User deleted.',
'has_succeeded': True,
'status_code': 0,
'table': 'users',
'id_user': token['payload']['id']
})
return send_message(res['message'], None, token_delete=True)
else:
return send_error(500, token['message'])
# Admin : Create User # Admin : Create User
@bp.route('/api/admin/create/user', methods=['POST']) @bp.route('/api/admin/create/user', methods=['POST'])
def admin_create_user(): def admin_create_user():
token = check_auth_token(request) token = check_auth_token(request)
if token['success']: if token['success']:
ip = request.remote_addr ip = request.remote_addr
user_id = token['payload']['id'] user_id = token['payload']['id']
is_admin = token['payload']['is_admin'] is_admin = token['payload']['is_admin']
if is_admin: if is_admin:
post_json = request.json post_json = request.json
post_email = None post_email = None
post_nickname = None post_nickname = None
post_password = None post_password = None
post_is_admin = None post_is_admin = None
fields = '' fields = ''
if 'email' in post_json: if 'email' in post_json:
post_email = str(post_json['email']) post_email = str(post_json['email'])
else: else:
fields += 'email ' fields += 'email '
if 'nickname' in post_json: if 'nickname' in post_json:
post_nickname = str(post_json['nickname']) post_nickname = str(post_json['nickname'])
else: else:
fields += 'nickname ' fields += 'nickname '
if 'password' in post_json: if 'password' in post_json:
post_password = str(post_json['password']) post_password = str(post_json['password'])
else: else:
fields += 'password ' fields += 'password '
if 'is_admin' in post_json: if 'is_admin' in post_json:
post_is_admin = bool(post_json['is_admin']) post_is_admin = bool(post_json['is_admin'])
else: else:
fields += 'is_admin ' fields += 'is_admin '
if post_email is not None or post_nickname is not None or post_password is not None or post_is_admin is not None: if post_email is not None or post_nickname is not None or post_password is not None or post_is_admin is not None:
if post_email != '' and post_nickname != '' and post_password != '' and str(post_is_admin) != '': if post_email != '' and post_nickname != '' and post_password != '' and str(post_is_admin) != '':
res = db_register(ip, post_email, post_nickname, post_password, is_admin=post_is_admin) # res = db_register(ip, post_email, post_nickname, post_password, is_admin=post_is_admin)
if res['status'] == 1: res = request_post('admin/create/user', {
db_create_log( 'ip': ip,
ip=ip, 'email': post_email,
action='admin/create/user', 'nickname': post_nickname,
message=res['message'], 'password': post_password,
has_succeeded=False, 'is_admin': post_is_admin
status_code=res['status'], })
table='users', if res['status'] == 1:
id_user=user_id return send_error(500, res['message'])
) elif res['status'] == 0:
return send_error(500, res['message']) return send_message(res['message'], res['data'])
elif res['status'] == 0: else:
db_create_log( return send_error(400, 'Empty email and/or nickname and/or password and/or is_admin fields.')
ip=ip, else:
action='admin/create/user', return send_error(400, 'Need ' + fields + 'field.')
message=res['message'],
has_succeeded=True,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_message(res['message'], res['data'])
else: else:
return send_error(400, 'Empty email and/or nickname and/or password and/or is_admin fields.') return send_error(500, 'User does not have permission.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else: else:
return send_error(500, 'User does not have permission.') return send_error(500, token['message'])
else:
return send_error(500, token['message'])
# Admin : Change User password and/or role # Admin : Change User password and/or role
@bp.route('/api/admin/update/user', methods=['PUT']) @bp.route('/api/admin/update/user', methods=['PUT'])
def admin_update_user(): def admin_update_user():
token = check_auth_token(request) token = check_auth_token(request)
if token['success']: if token['success']:
user_id = token['payload']['id'] user_id = token['payload']['id']
is_admin = token['payload']['is_admin'] is_admin = token['payload']['is_admin']
if is_admin: if is_admin:
post_json = request.json post_json = request.json
post_is_admin = None post_is_admin = None
post_password = None post_password = None
post_user_id_delete = None post_user_id_delete = None
fields = '' fields = ''
if 'id' in post_json: if 'id' in post_json:
post_user_id_delete = int(post_json['id']) post_user_id_delete = int(post_json['id'])
else: else:
fields += 'id ' fields += 'id '
if 'is_admin' in post_json: if 'is_admin' in post_json:
post_is_admin = bool(post_json['is_admin']) post_is_admin = bool(post_json['is_admin'])
else: else:
fields += 'is_admin ' fields += 'is_admin '
if 'password' in post_json: if 'password' in post_json:
post_password = str(post_json['password']) post_password = str(post_json['password'])
else: else:
fields += 'password ' fields += 'password '
if post_user_id_delete is not None and (post_is_admin is not None or post_password is not None): if post_user_id_delete is not None and (post_is_admin is not None or post_password is not None):
if str(post_is_admin) != '' and post_password != '' and str(post_user_id_delete) != '': if str(post_is_admin) != '' and post_password != '' and str(post_user_id_delete) != '':
ip = request.remote_addr ip = request.remote_addr
res = db_admin_update_user(ip, post_user_id_delete, post_is_admin, post_password) # res = db_admin_update_user(ip, post_user_id_delete, post_is_admin, post_password)
if res['status'] == 1: res = request_put('admin/update/user', {
db_create_log( 'ip': ip,
ip=ip, 'user_id_delete': post_user_id_delete,
action='admin/update/user', 'is_admin': post_is_admin,
message=res['message'], 'password': post_password
has_succeeded=False, })
status_code=res['status'], if res['status'] == 1:
table='users', return send_error(500, res['message'])
id_user=user_id elif res['status'] == 0:
) return send_message(res['message'], res['data'])
return send_error(500, res['message']) else:
elif res['status'] == 0: return send_error(400, 'Empty is_admin and/or password fields.')
db_create_log( else:
ip=ip, return send_error(400, 'Need ' + fields + 'field.')
action='admin/update/user',
message=res['message'],
has_succeeded=True,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_message(res['message'], res['data'])
else: else:
return send_error(400, 'Empty is_admin and/or password fields.') return send_error(500, 'User does not have permission.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else: else:
return send_error(500, 'User does not have permission.') return send_error(500, token['message'])
else:
return send_error(500, token['message'])
# Admin : Delete User # Admin : Delete User
@bp.route('/api/admin/delete/user/<id>', methods=['DELETE']) @bp.route('/api/admin/delete/user/<id>', methods=['DELETE'])
def admin_delete_user(id): def admin_delete_user(id):
token = check_auth_token(request) token = check_auth_token(request)
if token['success']: if token['success']:
ip = request.remote_addr ip = request.remote_addr
user_id = token['payload']['id'] # user_id = token['payload']['id']
is_admin = token['payload']['is_admin'] is_admin = token['payload']['is_admin']
if is_admin: if is_admin:
post_json = {'id': id} post_json = {'id': id}
post_user_id_delete = None post_user_id_delete = None
fields = '' fields = ''
if 'id' in post_json: if 'id' in post_json:
post_user_id_delete = int(post_json['id']) post_user_id_delete = int(post_json['id'])
else: else:
fields += 'id' fields += 'id'
if post_user_id_delete is not None: if post_user_id_delete is not None:
if str(post_user_id_delete) != '': if str(post_user_id_delete) != '':
res = db_user_delete(ip, int(post_user_id_delete)) # res = db_user_delete(ip, int(post_user_id_delete))
if res['status'] == 1: res = request_delete('admin/delete/user',
db_create_log( {'ip': ip, 'user_id_delete': post_user_id_delete, 'is_admin': is_admin})
ip=ip, if res['status'] == 1:
action='admin/delete/user', return send_error(500, res['message'])
message=res['message'], else:
has_succeeded=False, return send_message(res['message'], None)
status_code=res['status'], else:
table='users', return send_error(400, 'Empty id field.')
id_user=user_id else:
) return send_error(400, 'Need ' + fields + 'field.')
return send_error(500, res['message'])
else:
db_create_log(
ip=ip,
action='admin/delete/user',
message=res['message'],
has_succeeded=True,
status_code=res['status'],
table='users',
id_user=user_id
)
return send_message(res['message'], None)
else: else:
return send_error(400, 'Empty id field.') return send_error(500, 'User does not have permission.')
else:
return send_error(400, 'Need ' + fields + 'field.')
else: else:
return send_error(500, 'User does not have permission.') return send_error(500, token['message'])
else:
return send_error(500, token['message'])
# List of User (must be authenticated) & Search # List of User (must be authenticated) & Search
@bp.route('/api/users', methods=['GET']) @bp.route('/api/users', methods=['GET'])
def users(): def users():
token = check_auth_token(request) token = check_auth_token(request)
if token['success']: if token['success']:
ip = request.remote_addr ip = request.remote_addr
user_id = token['payload']['id'] user_id = token['payload']['id']
get_query = request.args.get('q') get_query = request.args.get('q')
get_by = request.args.get('by') get_by = request.args.get('by')
get_id = request.args.get('id') get_id = request.args.get('id')
get_is_admin = request.args.get('is_admin') get_is_admin = request.args.get('is_admin')
get_order_by = request.args.get('order_by') get_order_by = request.args.get('order_by')
res = db_users(ip, user_id, get_query, get_by, get_id, get_is_admin, get_order_by) # res = db_users(ip, user_id, get_query, get_by, get_id, get_is_admin, get_order_by)
if res['status'] == 1: res = request_get(
return send_error(500, res['message']) 'users'
f'?ip={ip}'
f'&user_id={user_id}'
f'&q={get_query}'
f'&by={get_by}'
f'&id={get_id}'
f'&is_admin={get_is_admin}'
f'&order_by={get_order_by}'
)
if res['status'] == 1:
return send_error(500, res['message'])
else:
return send_message(res['message'], res['data'])
else: else:
return send_message(res['message'], res['data']) return send_error(500, token['message'])
else:
return send_error(500, token['message'])