-
Notifications
You must be signed in to change notification settings - Fork 25
/
web_scanner.py
112 lines (95 loc) · 4.22 KB
/
web_scanner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/local/bin/python3
# coding=utf-8
# web路由扫描器
import time
from queue import Queue
import argparse
import requests
import threading
import socket
class Dirscan(object):
    """Multi-threaded web path scanner.

    Probes every path from a wordlist against a target site and appends
    hits (HTTP 200 that differ from the site's soft-404 page, and HTTP
    403) to an output file. Worker threads call :meth:`run` concurrently.
    """
    def __init__(self, scanSite, scanDict, scanOutput, threadNum):
        """
        :param scanSite: target site; 'http://' is prepended when no scheme given
        :param scanDict: path to the wordlist file ('#'-prefixed lines are comments)
        :param scanOutput: results file path, or 0 to derive it from the site name
        :param threadNum: number of worker threads (stored for reference only)
        """
        print('=====Dirscan is running=====')
        # Default to plain http when the caller supplied a bare host.
        self.scanSite = scanSite if '://' in scanSite else 'http://%s' % scanSite
        print('=====Scan target: ', self.scanSite, '=====')
        self.scanDict = scanDict
        # argparse default for -o is the int 0, meaning "derive <host>.txt".
        self.scanOutput = scanSite.rstrip('/').replace('https://', '').replace('http://', '')+'.txt' if scanOutput == 0 else scanOutput
        # Truncate any results left over from a previous run.
        open(self.scanOutput, 'w').close()
        self.threadNum = threadNum
        self.lock = threading.Lock()  # serializes appends to the output file
        self._loadHeaders()
        self._loadDict(self.scanDict)
        self._analysis404()
        self.STOP_ME = False  # set True (e.g. on Ctrl-C) to ask workers to exit
    def _loadDict(self, dict_list):
        """Fill self.q with wordlist entries, skipping comments and blank lines."""
        self.q = Queue()
        with open(dict_list, 'r') as f:
            for line in f:
                entry = line.strip()
                # Skip '#' comment lines and blank lines (a blank entry would
                # just re-request the base URL).
                if entry and not entry.startswith('#'):
                    self.q.put(entry)
        if self.q.qsize() > 0:
            print('=====Total Dictionary:', self.q.qsize(), '=====')
        else:
            print('=====Dict is Null ???=====')
            quit()
    def _loadHeaders(self):
        """Build the request headers shared by every probe."""
        self.headers = {
            'Accept': '*/*',
            'Referer': 'http://www.baidu.com',
            'User-Agent': 'Mozilla/5.0 (compatible; MSIE 11.0; Windows NT 6.1; ',
            'Cache-Control': 'no-cache',
        }
    def _analysis404(self):
        """Fetch a path that should not exist to fingerprint the soft-404 page.

        Sites that answer 200 for unknown paths are detected by comparing
        later responses against this text (with the probe path masked out).
        NOTE(review): raises if the target is unreachable — same as original.
        """
        notFoundPage = requests.get('{0}/fuck/hello.html'.format(self.scanSite), allow_redirects=False)
        self.notFoundPageText = notFoundPage.text.replace('/fuck/hello.html', '')
    def _writeOutput(self, result):
        """Append one result line to the output file (thread-safe)."""
        # 'with' guarantees the lock is released even if the write raises.
        with self.lock:
            with open(self.scanOutput, 'a+') as f:
                f.write(result + '\n')
    def _scan(self, url):
        """Request url once and record interesting responses (200/403)."""
        response = None
        try:
            response = requests.get(url, headers=self.headers, allow_redirects=False, timeout=10)
        except requests.exceptions.ConnectTimeout:
            # Must come BEFORE ConnectionError: ConnectTimeout subclasses it,
            # so the original ordering made this branch unreachable.
            print('timeout while connecting to the {0}'.format(url))
        except requests.exceptions.ConnectionError:
            pass  # host down / refused: best-effort scan, skip silently
        except requests.exceptions.Timeout:
            print('exception of requests.exceptions.Timeout')
        if response is not None:
            # 200 counts only when the body differs from the soft-404 page.
            if response.status_code == 200 and response.text != self.notFoundPageText:
                print('=====[%i] %s=====' % (response.status_code, response.url))
                self._writeOutput('[%i]%s' % (response.status_code, response.url))
            elif response.status_code == 403:
                print('====={0} {1}====='.format(response.status_code, response.url))
                self._writeOutput('{0}{1}'.format(response.status_code, response.url))
    def run(self):
        """Worker loop: consume queued paths until the queue empties or STOP_ME."""
        while not self.q.empty() and not self.STOP_ME:
            url = '{0}{1}'.format(self.scanSite, self.q.get())
            self._scan(url)
if __name__ == '__main__':
    # Global fallback timeout for raw socket operations.
    socket.setdefaulttimeout(10)
    parser = argparse.ArgumentParser()
    parser.add_argument('scanSite', help="The website to be scanned", type=str)
    parser.add_argument('-d', '--dict', dest="scanDict", help="Dictionary for scanning", type=str, default="web_dict.txt")
    parser.add_argument('-o', '--output', dest="scanOutput", help="Results saved files", type=str, default=0)
    parser.add_argument('-t', '--thread', dest="threadNum", help="Number of threads running the program", type=int, default=10)
    args = parser.parse_args()
    scan = Dirscan(args.scanSite, args.scanDict, args.scanOutput, args.threadNum)
    # Daemon workers: they must not keep the process alive after main exits.
    for i in range(args.threadNum):
        t = threading.Thread(target=scan.run)
        # t.setDaemon(True) is deprecated and removed in Python 3.13.
        t.daemon = True
        t.start()
    # Poll until only the main thread remains; Ctrl-C flags STOP_ME so the
    # workers can drain their current request and exit cleanly.
    while True:
        # threading.activeCount() is deprecated; active_count() is the API.
        if threading.active_count() <= 1:
            break
        else:
            try:
                time.sleep(0.1)
            except KeyboardInterrupt:
                print('\n=====[WARNING] User aborted, wait all slave threads to exit, current(%i)=====' % threading.active_count())
                scan.STOP_ME = True
    print('=====Scan end!!!=====')