Initial commit
commit 24e93c585d
3 changed files with 149 additions and 0 deletions
.gitignore vendored (Normal file, 3 additions)
@@ -0,0 +1,3 @@
.idea
/wordlists
/outputs
README.md (Normal file, 1 addition)
@@ -0,0 +1 @@
WEB FIND API END POINTS
web_find_api_end_points.py (Normal file, 145 additions)
@@ -0,0 +1,145 @@
import os
import argparse
import asyncio
import csv
import json
import logging
import sys
from typing import Dict, Any, List, Tuple
from collections import Counter

import tqdm
from aiohttp import ClientSession


# Usage:
# python web_find_api_end_points.py
#     --url https://gameinfo.albiononline.com/api/gameinfo/guilds/
#     --wordlist dict.txt
#     --output {url}-tested.csv

# Output CSV columns:
# base_url;word_tested;url;status_code;response_json;response_content

def main():
    args = set_argparse()
    asyncio.run(
        web_find_api_end_points(
            args.url,
            args.wordlist,
            args.prefix,
            args.output
        )
    )

    sys.exit(0)

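# Build the command-line interface: --url and --wordlist are required,
# --prefix and --output are optional, and -i/-d select the logging level.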
def set_argparse():
    parser = argparse.ArgumentParser(description='Find API end points by brute forcing a base URL with a word list')
    parser._action_groups.pop()
    required = parser.add_argument_group('required arguments')
    optional = parser.add_argument_group('optional arguments')
    required.add_argument('-u', '--url', help='Base URL to test (default: None)', default=None, required=True)
    required.add_argument('-w', '--wordlist', help='Word list to brute force (default: None)', default=None,
                          required=True)

    optional.add_argument('-p', '--prefix', help='Prefix (default: None)', default=None)
    optional.add_argument('-o', '--output', help='Output file (default: <url>_<wordlist>_results.csv)', default=None)

    optional.add_argument('-i', '--info', help='Info mode (default: True)', default=True, action='store_false')
    optional.add_argument('-d', '--debug', help='Debug mode (default: False)', default=False, action='store_true')

    try:
        args = parser.parse_args()
    except argparse.ArgumentError as error:
        logging.error('Caught an ArgumentError: {}'.format(error))
        sys.exit('Caught an ArgumentError: {}'.format(error))

    if args.debug:
        logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
    elif args.info:
        logging.basicConfig(stream=sys.stderr, level=logging.INFO)
    else:
        logging.basicConfig(stream=sys.stderr, level=logging.ERROR)

    return args

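# Read the word list, request every candidate URL concurrently, write one
# semicolon-delimited CSV row per request, and log a summary of the
# status-code distribution.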
async def web_find_api_end_points(url, word_list_file, prefix, output_file):
    if prefix is not None:
        url = f'{url}{prefix}'

    if output_file is None:
        # Make sure the default output directory exists before writing to it.
        os.makedirs('outputs', exist_ok=True)
        output_file = f'outputs/{url.replace("/", "_").replace(":", "")}_{os.path.basename(word_list_file).split(".")[0]}_results.csv'

    txtFile = open(word_list_file, 'r')
    words = txtFile.read().splitlines()
    txtFile.close()

    outputFile = open(output_file, 'w', newline='')
    csvFile = csv.writer(outputFile, delimiter=';')
    csvFile.writerow(['base_url', 'word_tested', 'url', 'status_code', 'response_json', 'response_content'])

    logging.info(f'Example url tested: {url}{words[0]}')

    session = ClientSession()
    results = await http_get_with_aiohttp_parallel(session, url, words)
    await session.close()
    csvFile.writerows(results)

    status_codes = [result[3] for result in results]
    dict_status = json.dumps(dict(Counter(status_codes)), indent=4)
    outputFile.close()
    logging.info(dict_status)
    return dict_status

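# Schedule one http_get_with_aiohttp() task per word and collect the results
# as they complete, with a tqdm progress bar over the pending requests.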
async def http_get_with_aiohttp_parallel(session: ClientSession, base_url: str, words: List[str],
                                          headers: Dict = {}, proxy: str = None,
                                          timeout: int = 10) -> List[Tuple[str, str, str, Any, Any, Any]]:
    tasks = []
    for word in words:
        task = asyncio.create_task(
            http_get_with_aiohttp(session, base_url, word, f'{base_url}{word}', headers, proxy, timeout))
        tasks.append(task)

    results = [
        await f
        for f in tqdm.tqdm(asyncio.as_completed(tasks), total=len(tasks))
    ]
    return results

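# Thin wrapper around ClientSession.get() that logs and swallows exceptions,
# so a single timed-out or unreachable URL returns None instead of aborting
# the whole run.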
class Wrapper:

    def __init__(self, session):
        self._session = session

    async def get(self, url, headers, proxy, timeout):
        try:
            return await self._session.get(url, headers=headers, proxy=proxy, timeout=timeout)
        except Exception as e:
            logging.error(e)
            return None

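# Perform a single GET and return a CSV-ready row; the body is parsed as JSON
# when possible and also kept raw, and failed requests yield None for the
# status code and both payload fields.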
async def http_get_with_aiohttp(session: ClientSession, base_url: str, word: str, url: str,
                                headers: Dict = {}, proxy: str = None,
                                timeout: int = 30) -> Tuple[str, str, str, Any, Any, Any]:
    wrapper = Wrapper(session)
    response = await wrapper.get(url=url, headers=headers, proxy=proxy, timeout=timeout)

    if response is not None:
        response_json = None
        try:
            response_json = await response.json(content_type=None)
        except json.decoder.JSONDecodeError:
            pass

        response_content = None
        try:
            response_content = await response.read()
        except Exception:
            pass
        return base_url, word, url, response.status, response_json, response_content
    else:
        return base_url, word, url, None, None, None


if __name__ == '__main__':
    main()
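The header comment in web_find_api_end_points.py documents the command line and the semicolon-delimited columns written to the results CSV. As a minimal sketch (not part of the commit), that file could afterwards be filtered for words that answered with HTTP 200; the results path below is a hypothetical example of the script's default <url>_<wordlist>_results.csv naming.

import csv

# Hypothetical results file from a run against the example URL and word list.
results_path = 'outputs/https__gameinfo.albiononline.com_api_gameinfo_guilds__dict_results.csv'

with open(results_path, newline='') as f:
    reader = csv.DictReader(f, delimiter=';')
    # Keep only the rows whose request answered with HTTP 200.
    hits = [row for row in reader if row['status_code'] == '200']

for hit in hits:
    print(hit['word_tested'], hit['url'])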