
How to Build a Proxy-Based Web Scanner

Before a web application goes live, during the QA testing phase, you can point the QA testers' browsers at a designated proxy (or have their test PCs dial into a specific VPN). While QA tests the functionality, security testing runs in the background at the same time; the benefits are obvious.

Two variants of this kind of scanner are common: proxy-based, and VPN + transparent proxy.

This article covers only the first kind: the proxy-based web scanner.

Architecture

(The original post shows an architecture diagram of the proxy here.)

Implementing the proxy module

Capturing user request data

The proxy module is adapted from the open-source project https://github.com/senko/tornado-proxy : it filters the user's requests and the server's responses and stores them in MongoDB. My additions are the block below that inserts the request/response into MongoDB.

import tornado.web
import tornado.httpclient

# UrlFilter, HttpInfo, Mongodb, db_info and fetch_request are defined
# elsewhere in the project (https://github.com/netxfly/passive_scan)


class ProxyHandler(tornado.web.RequestHandler):
    SUPPORTED_METHODS = ['GET', 'POST', 'CONNECT']

    @tornado.web.asynchronous
    def get(self):
        url_info = dict(
            method=self.request.method,
            url=self.request.uri
        )
        self.request_info = None

        def handle_response(response):
            if (response.error and not
                    isinstance(response.error, tornado.httpclient.HTTPError)):
                self.set_status(500)
                self.write('Internal server error:\n' + str(response.error))
            else:
                self.set_status(response.code)
                for header in ('Date', 'Cache-Control', 'Server',
                               'Content-Type', 'Location'):
                    v = response.headers.get(header)
                    if v:
                        self.set_header(header, v)
                v = response.headers.get_list('Set-Cookie')
                if v:
                    for i in v:
                        self.add_header('Set-Cookie', i)
                if response.body:
                    self.write(response.body)

                # Insert http request and response into mongodb
                if self.application.scan:
                    url = url_info.get('url')
                    url_filter = UrlFilter(url)
                    if url_filter.filter():
                        http_info = HttpInfo(url_info, self.request_info, response)
                        values = http_info.get_info()
                        mongodb = Mongodb(db_info)
                        mongodb.insert(values)

            self.finish()

        body = self.request.body
        self.request_info = self.request
        if not body:
            body = None
        try:
            fetch_request(
                self.request.uri, handle_response,
                method=self.request.method, body=body,
                headers=self.request.headers, follow_redirects=False,
                allow_nonstandard_methods=True)
        except tornado.httpclient.HTTPError as e:
            if hasattr(e, 'response') and e.response:
                handle_response(e.response)
            else:
                self.set_status(500)
                self.write('Internal server error:\n' + str(e))
                self.finish()
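For reference, the record that ends up in MongoDB might look roughly like this. This is a hedged sketch: the real schema is built by `HttpInfo.get_info()` in the author's repo, and the field names below are chosen to mirror what the task dispatch module later reads back out of the `url_info` collection.

```python
# Illustrative sketch of the record stored per request/response pair.
# Field names mirror what the scheduler reads back out of url_info;
# the actual schema is produced by HttpInfo.get_info().
def build_record(method, url, domain, headers, request_data):
    return {
        "url": url,
        "domain": domain,
        "status": 0,  # 0 = pending scan, so the scheduler will pick it up
        "request": {
            "method": method,
            "request_data": request_data,
            "headers": headers,
        },
    }

record = build_record(
    "GET", "http://test.example.com/login", "test.example.com",
    {"User-Agent": "QA-Browser", "Cookie": "session=abc"}, None,
)
```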

Usage

The code takes up too much space to include in full here; see my GitHub: https://github.com/netxfly/passive_scan .

The proxy takes two parameters:

port: the listening port; defaults to 8088 if not specified.
scan: defaults to true, meaning captured user traffic is written to the database; pass false to run it as a plain proxy.
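Once the proxy is running, clients only need to point at it. QA testers would set this in their browser's proxy settings, but the same can be done programmatically; a sketch, assuming the default port above:

```python
# Proxy settings a client would use to route traffic through the scanner
# (127.0.0.1:8088 assumes the proxy runs locally on its default port).
PROXY = "http://127.0.0.1:8088"
proxies = {"http": PROXY, "https": PROXY}

# e.g. with the third-party `requests` library (not part of the article):
# import requests
# requests.get("http://qa-site.example/", proxies=proxies)
```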

Task dispatch module

The task dispatch module periodically checks the pending-scan list in MongoDB, using the status field to decide whether there are tasks to run; any pending task is dispatched to a Celery worker.
1. status = 0: pending scan
2. status = 1: scan in progress
3. status = 2: scan finished
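The status transitions can be sketched with a pure-Python stand-in for the collection (illustrative only; the real module uses pymongo):

```python
# Pure-Python stand-in for the url_info collection, showing the status
# transitions the scheduler drives (0 -> 1 on dispatch, 2 when finished).
PENDING, SCANNING, DONE = 0, 1, 2

tasks = [
    {"_id": 2, "url": "http://example.com/b", "status": DONE},
    {"_id": 1, "url": "http://example.com/a", "status": PENDING},
]

def next_pending(tasks):
    # equivalent of find({'status': 0}).sort('_id', ASCENDING).limit(1)
    pending = sorted((t for t in tasks if t["status"] == PENDING),
                     key=lambda t: t["_id"])
    return pending[0] if pending else None

task = next_pending(tasks)
task["status"] = SCANNING  # _set_checking
# ... hand the task off to a celery worker here ...
task["status"] = DONE      # _set_checked
```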

# -*- coding: utf-8 -*-
__author__ = 'Hartnett'

import time
from pprint import pprint

import pymongo
from bson.objectid import ObjectId

from config import db_info
from scan_tasks import scan


class Scheduler(object):
    def __init__(self, interval=5):
        self.interval = interval
        self.db_info = db_info

        # connect to database
        self.client = pymongo.MongoClient(self.db_info.get('host'), self.db_info.get('port'))
        self.client.security_detect.authenticate(
            self.db_info.get('username'),
            self.db_info.get('password'),
            source='passive_scan'
        )

        self.db = self.client["passive_scan"]
        self.collection = self.db['url_info']

    def _get_task(self):
        task_id = None
        task_info = None
        tasks = self.collection.find({'status': 0}).sort("_id", pymongo.ASCENDING).limit(1)
        for task in tasks:
            url = task.get('url')
            task_id = task.get('_id')
            domain = task.get('domain')
            method = task.get('request').get('method')
            request_data = task.get('request').get('request_data')
            user_agent = task.get('request').get('headers').get('User-Agent')
            cookies = task.get('request').get('headers').get('Cookie')
            task_info = dict(
                task_id=task_id,
                url=url,
                domain=domain,
                method=method,
                request_data=request_data,
                user_agent=user_agent,
                cookies=cookies
            )

        print "task_id: %s,\ntask_info:" % task_id
        pprint(task_info)
        return task_id, task_info

    # set task checking now
    def _set_checking(self, task_id):
        self.collection.update({'_id': ObjectId(task_id)}, {"$set": {'status': 1}})

    # set task checked
    def _set_checked(self, task_id):
        self.collection.update({'_id': ObjectId(task_id)}, {"$set": {'status': 2}})

    # distribute tasks
    def distribution_task(self):
        task_id, task_info = self._get_task()
        print "get scan task done, sleep %s second." % self.interval
        if task_id is not None:
            self._set_checking(ObjectId(task_id))
            url = task_info.get('url')
            domain = task_info.get('domain')
            method = task_info.get('method')
            request_data = task_info.get('request_data')
            user_agent = task_info.get('user_agent')
            cookies = task_info.get('cookies')
            scan.apply_async((task_id, url, domain, method, request_data, user_agent, cookies,))
            # note: the task is marked checked as soon as it is queued;
            # the worker also marks it checked when the scan finishes
            self._set_checked(ObjectId(task_id))

    def run(self):
        while True:
            self.distribution_task()
            time.sleep(self.interval)

if __name__ == '__main__':
    scheduler = Scheduler()
    scheduler.run()

Scan execution module

The scan module uses Celery to distribute the scanning, so workers can be deployed across multiple servers. Plug in whatever back-end scanner fits your situation: wvs, arachni, or something home-grown. Since the focus of this article is the proxy-based approach, I took the easy route and used arachni.

# -*- coding:utf8 -*-
__author__ = 'hartnett'

from celery import Celery
from arachni import arachni_console

from config import BACKEND_URL, BROKER_URL, db_info
from helper import Reporter, PassiveReport, TaskStatus

app = Celery('task', backend=BACKEND_URL, broker=BROKER_URL)


# scanning url task
# --------------------------------------------------------------------
@app.task
def scan(task_id, task_url, domain, method, request_data, user_agent, cookies):
    if task_url:
        print "start to scan %s, task_id: %s" % (task_url, task_id)
        scanner = arachni_console.Arachni_Console(task_url, user_agent, cookies, page_limit=1)
        report = scanner.get_report()
        if report:
            reporter = Reporter(report)
            value = reporter.get_value()
            if value:
                # record any vulnerabilities found in the database
                scan_report = PassiveReport(db_info, value)
                scan_report.report()

        task_status = TaskStatus(db_info)
        # mark the task as scanned
        task_status.set_checked(task_id)
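The config.py referenced above is not shown in the article; the BROKER_URL and BACKEND_URL it supplies are standard Celery connection strings. A purely illustrative setup, assuming a Redis broker:

```python
# Illustrative config.py values for the Celery app; the article does not
# show the real ones, and any broker Celery supports would work.
BROKER_URL = "redis://127.0.0.1:6379/0"    # where tasks are queued
BACKEND_URL = "redis://127.0.0.1:6379/1"   # where results are stored

# workers would then be started on each scan server, e.g.:
#   celery -A scan_tasks worker --loglevel=info
```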

web管理后臺

實現(xiàn)這個demo用了半天時間,寫web后臺還要處理前端展示,比較麻煩,所以沒寫,只講下基于proxy的掃描器的實現(xiàn)思路。
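Even unwritten, the backend's core job would just be reading the scan reports back out of MongoDB and summarizing them. A pure-Python sketch of that kind of overview (the report fields here are illustrative, not arachni's actual schema):

```python
# Group stored findings by domain for an overview page.
# Field names are illustrative; real reports are written by PassiveReport.
reports = [
    {"domain": "a.example.com", "vuln": "XSS"},
    {"domain": "a.example.com", "vuln": "SQL injection"},
    {"domain": "b.example.com", "vuln": "XSS"},
]

def group_by_domain(reports):
    summary = {}
    for r in reports:
        summary.setdefault(r["domain"], []).append(r["vuln"])
    return summary

overview = group_by_domain(reports)
```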
