海量日志分析的预处理
2010 年 12 月 5 日
背景
前段时间,接入了公司部分日志流量,大约每分钟30万的请求日志。计划做一个恶意URL检测的系统,分2个阶段:
第一阶段:基于黑规则检测
第二阶段:基于机器学习检测
期间参考过很多文章,大部分都在讨论怎么去发现恶意请求,这无可厚非。但是实际环境中恶意请求的占比又是多少呢?观察我们的日志发现90%的请求都是正常请求,其中一些请求非常明显(一看就是没问题那种),例如:
如果不做一定的过滤,明显正常的请求进入检测引擎,即影响处理效率又浪费资源,很容易造成KAFKA数据积压。所以本文算是一个预热吧,这里重点和大家探讨滤白的问题,希望能起到抛砖引玉的作用。后面我会陆续分享其他2个阶段,目前项目正在紧张开发中。
/1/api/new_games?v=0.06540659188776337
/5tBARviOXjSbscQH9AzRiw==/109951164132869215.jpg?param=36y36
/Active_Page/js/vue.min.js
/api.php?format=json&t=1
如果不做一定的过滤,明显正常的请求进入检测引擎,即影响处理效率又浪费资源,很容易造成KAFKA数据积压。所以本文算是一个预热吧,这里重点和大家探讨滤白的问题,希望能起到抛砖引玉的作用。后面我会陆续分享其他2个阶段,目前项目正在紧张开发中。
0×01. 数据处理
过滤策略如下:
1.静态请求,URL后缀为:
2.参数值为纯数字、纯字母、或者字母数字组合
3.参数值值包含:数字、字母、-、_
4.参数值长度小于5
5.参数为空的GET POST请求
6.http返回状态码为200
[".jpeg", ".gif", ".jpg", ".png", ".js", ".css", ".bmp", ".ico", ".woff", ".woff2", ".ttf", ".eot", ".apk", ".json", ".zip", ".rar", ".html", ".ico", ".swf", ".exe", ".dat", ".txt", ".cgi", ".ts”]
2.参数值为纯数字、纯字母、或者字母数字组合
3.参数值值包含:数字、字母、-、_
4.参数值长度小于5
5.参数为空的GET POST请求
6.http返回状态码为200
策略1:静态请求处理
获取URL后缀名,判断是否在后缀白名单中,部分代码如下:
#判断是否静态连接 @staticmethod def filter_static(url): suffix = Common.get_url_ext(url) if suffix.lower() in Config.STATIC_SUFFIXES: return True return False #获取url文件后缀 @staticmethod def get_url_ext(url): try: path = urlparse.urlparse(url).path return os.path.splitext(path)[1] except: return False
策略2:参数值为纯数字、纯字母、或者字母数字组合
#str.isdigit(): True 只包含数字 if paramVal.isdigit(): return True #str.isalpha():True 只包含字母 if paramVal.isalpha(): return True #str.isalnum():True 只包含字母或者数字 if paramVal.isalnum(): return True
策略2:参数值包含数字、字母、-、_
思路1:将字符串泛化:
-
[a-zA-Z]泛化为
A
-
[0-9]泛化为
N
-
[-_]泛化为
C
-
其他字符泛化为
T
- 字符串去重,判断去重后的结果中是否包含T
例如:test123
泛化后:AAAANNN
去重后:AN
思路2:词法分析
简单来说就是,定义5种合法状态,如下所示:
TK_STRING = 1
TK_INTEGER = 2
TK_UNDER = 3 # _
TK_STRAIGHT = 4 # –
然后扫描字符串,每个字符的前后字符都是合法状态,一旦出现非合法字符则标记为可疑,否则标白。
0×02. 遇到的问题
1.数据格式化
我们的日志都是使用Packetbeat抓取的,默认的格式比较复杂,其中有很多用不着的数据字段,故格式化后的 数据格式如下图:
{ "@timestamp": "2019-06-14T15:04:40.864956011+08:00", "cap_ip": "", "cap_source": "D8:9D:67:13:EE:E2", "cap_timens": 168524000, "connectIP": "109.70.282.173", "dst": "123.126.104.7:80", "geo_city": "", "geo_country": "", "geo_sla": "0.000", "geo_slo": "0.000", "kafka_pid": 1, "raw_time": "2019-06-14T15:04:40+08:00", "realIP": "109.70.282.173", "realUrl": "/apiV2/profile/newsListAjax", "request.hc": 1, "request.host": "mp.xxxx.com", "request.method": "GET", "request.url": "/apiV2/profile/newsListAjax?xpt=cHBhZzkxNTMyNWUwZTJhMkBzb2h1LmNvbQ%3D%3D&pageNumber=2952&pageSize=25", "request.user-agent": "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; TencentTraveler 4.0)", "response.body": "{\"status\":1,\"msg\":\"error\"}", "response.code": 200, "response.content-length": "26", "response.content-type": "image/webp; charset=UTF-8", "src": "109.70.282.173:29819", "status.code": 1, "time": "2019-06-14 15:04:40", "unix_time": 1560495880 }
2.参数提取
GET请求相对简单些,废话不多说,代码如下:
def getQueryString(request): data = [] try: url = request["request.url"] result = urlparse.urlparse(url) query = result.query #urlparse.parse_qsl解析url请求切割参数时,遇到';'会截断,导致获取的参数值缺失';'后面的内容 if ";" in query: query = re.sub(r';', '@@@@', query) params = urlparse.parse_qsl(query, True) for k, v in params: if not v: continue #恢复分号 if '@@@@' in v: v = re.sub(r'@@@@', ';', v) if paramValFilter(v): continue data.append(v) except Exception, e: print "parse query error:", e, request
POST请求参数格式比较复杂,我们这里基本都是JSON格式,而且大多为深度JSON,需要递归提取:
def getDeepJsonVal(data, result=[]): if isinstance(data, dict): for key, value in data.items(): getDeepJsonVal(value, result) elif isinstance(data, list): for value in data: getDeepJsonVal(value) else: if isinstance(data, unicode) and ('{' in data or '[' in data): try: getDeepJsonVal(simplejson.loads(data)) except: if not paramValFilter(data): result.append(data) else: if not paramValFilter(data): result.append(data) return result
3.参数值中包含中文,直接跳过,不做处理
def filterChinese(check_str): # 过滤中文 for ch in check_str.decode('utf-8'): if u'\u4e00' <= ch <= u'\u9fff': return True return False
0×03.结果分析:
处理了10万条线上请求数据,标白的请求数为:44656,过滤掉了45%的请求,效果比较理想,甚至有点出乎我的意料,Very Nice!
代码传送门:https://github.com/skskevin/UrlDetect