利用RITA检测beacon通信

前言

RITA (Real Intelligence Threat Analytics) 是一个用于检测 C2 通信流量的开源项目,由 GO 语言编写

本文主要介绍其分析器的工作原理,并借助 Python 代码逐步还原该算法

为方便阅读,以下涉及到的变量均参照源码搬运而来,读者可跳转至该项目后自行作对比分析

分析器源码地址:https://github.com/activecm/rita/blob/master/pkg/beacon/analyzer.go

该算法对同源-目的IP间的通信,进行分数计算得到 score,其值分布为 0~1 之间,趋近 1 则疑似 beacon 通信行为

具体计算过程大致上可以分为两部分,根据通信间隔数据包大小的特征,分别计算出 tsScoredsScore,最后取均值得到 score

其中,tsScore 和 dsScore 的计算过程相似,下面开始针对各个部分做具体分析

tsScore的计算

tsScore 的计算又细分为三个部分:tsSkewScoretsMadmScoretsConnCountScore

tsSkewScore

该值的计算原理在源码中的注释部分也有所解释,即:理想的 beacon 通信行为应该具有通信间隔和数据大小呈现对称分布的特征

image-20220117103322321

意思也很好理解,以 CobalStrike 默认的 beacon 配置为例,其通信行为如下图所示:

image-20220117210711125

CS 默认回连的间隔时间为 60 秒,即使加上抖动(jitter),理论上仍然会呈现出高度对称的分布特征,其数据包大小亦如是

这里采用Bowley 偏度公式来度量对称性,得到偏度系数 tsSkew 和 dsSkew,后者在第二部分计算 dsScore 时会使用到

所谓的偏度系数,适用于描述曲线形状的对称性,其值在 1 到 -1 之间,分别代表着正偏斜或负偏斜,参照下图

image-20220117160802106

具体计算时需要对数据集按大小顺序作四等分,在三个分位点依次取值:tsLow,tsMid,tsHigh

Python 示例代码:

# 根据时间戳计算通信间隔
http_df['deltas'] = http_df[timestamp].apply(lambda x: pd.Series(x).diff().dt.seconds.dropna().tolist())
# 以通信间隔为数据集,计算各分位点处数值
http_df['tsLow'] = http_df['deltas'].apply(lambda x: np.percentile(np.array(x), 25))
http_df['tsMid'] = http_df['deltas'].apply(lambda x: np.percentile(np.array(x), 50))
http_df['tsHigh'] = http_df['deltas'].apply(lambda x: np.percentile(np.array(x), 75))
# 根据公式:Bowley Skewness = (Q1 + Q3 – 2Q2) / (Q3 – Q1),计算分母和分子
http_df['tsBowleyNum'] = http_df['tsLow'] + http_df['tsHigh'] - 2*http_df['tsMid']
http_df['tsBowleyDen'] = http_df['tsHigh'] - http_df['tsLow']
# 如果分母为 0,Q2 = Q1 或 Q2 = Q3,则计算结果不可靠,tsSkew 取 0
http_df['tsSkew'] = http_df[['tsLow','tsMid','tsHigh','tsBowleyNum','tsBowleyDen']].apply(
    lambda x: x['tsBowleyNum'] / x['tsBowleyDen'] if x['tsBowleyDen'] != 0 and x['tsMid'] != x['tsLow'] and x['tsMid'] != x['tsHigh'] else 0.0, axis=1
    )

最后根据 tsSkew 算出:tsSkewScore = 1 - abs(tsSkew)

tsMADMScore

这一段计算的前提为:理想的 beacon 通信行为在其通信间隔的中位数周围应该具有很低的分散度

该特征可以用 MADM(Median Absolute Deviation about the Median)来度量

image-20220117113352117

具体计算过程也比较简单,在 Python 中我们用一行代码就能搞定

http_df['tsMadm'] = http_df['deltas'].apply(lambda x: np.median(np.absolute(np.array(x) - np.median(np.array(x)))))

得到 tsMadm 和 dsMadm 后,设定 30秒32字节 作为阈值,超出则分数计为 0

image-20220117114108977

因为这里的分散度越低越好,由此可以计算出 tsMadmScore 和 dsMadmScore,以下是 Python 示例代码:

http_df['tsMadmScore'] = 1.0 - http_df['tsMadm'] / 30.0
http_df['tsMadmScore'] = http_df['tsMadmScore'].apply(lambda x: 0 if x < 0 else x)
http_df['dsMadmScore'] = 1.0 - http_df['tsMadm'] / 32.0
http_df['dsMadmScore'] = http_df['dsMadmScore'].apply(lambda x: 0 if x < 0 else x)

tsConnCountScore

跟上面的 SkewScore 和 MadmScore 不同,这一部分在 tsScore 和 dsScore 中的计算有所区别

在行为特征上,beacon 通信时往往具有较高的连接数,tsConnCountScore 正是用来度量这一点的

利用通信的持续时间和连接次数计算得到:tsConnCountScore = ConnectionCount / (tsConnDiv / 10.0)

image-20220117114820960

其中,ConnectionCount 代表通信次数,tsConnDiv 由通信起止时间除以固定的连接频率得来,此处该值取为 10 秒

最后得分如果大于1,则向下舍入取1,因为连接得越频繁,越有可能是 beacon 通信

PS:对于这里的 10 秒,应该还存在优化空间,有研究员指出该值太小可能会影响最终得分从而产生漏报,传送地址

综上,计算 tsScore = (tsSkewScore + tsMADMScore + tsConnCountScore) / 3.0

image-20220117143249150

dsScore的计算

数据大小的分散度(dsSkewScore)和 MADM(dsMADMScore) 的算法不变,前文已经有所解释

主要区别在于,计算 dsMADMScore 时数据大小的 MADM 以 32 字节作为阈值,数据包超出该大小则得分为 0

image-20220117143909812

另外,由于 beacon 通常都是较小的数据包,这里采用 dsSmallnessScore 作为衡量指标

image-20220117143530197

其含义为,如果数据包大小的众数(Mode)超过 65 Kb,则得分为 0

Python 示例代码如下:

# 计算数据包大小差值
http_df['size_deltas'] = http_df[data_size].apply(lambda x: pd.Series(x).diff().dropna().tolist())
# 计算 dsSkew
http_df['dsLow'] = http_df['size_deltas'].apply(lambda x: np.percentile(np.array(x), 25))
http_df['dsMid'] = http_df['size_deltas'].apply(lambda x: np.percentile(np.array(x), 50))
http_df['dsHigh'] = http_df['size_deltas'].apply(lambda x: np.percentile(np.array(x), 75))
http_df['dsBowleyNum'] = http_df['dsLow'] + http_df['dsHigh'] - 2*http_df['dsMid']
http_df['dsBowleyDen'] = http_df['dsHigh'] - http_df['dsLow']
http_df['dsSkew'] = http_df[['dsLow','dsMid','dsHigh','dsBowleyNum','dsBowleyDen']].apply(
    lambda x: x['dsBowleyNum'] / x['dsBowleyDen'] if x['dsBowleyDen'] != 0 and x['dsMid'] != x['dsLow'] and x['dsMid'] != x['dsHigh'] else 0.0, axis=1
    )
# 计算 dsSkewScore
http_df['dsSkewScore'] = 1.0 - abs(http_df['dsSkew'])
# 计算 dsMadmScore
http_df['dsMadm'] = http_df['size_deltas'].apply(lambda x: np.median(np.absolute(np.array(x) - np.median(np.array(x)))))
http_df['dsMadmScore'] = 1.0 - http_df['dsMadm'] / 32.0
http_df['dsMadmScore'] = http_df['dsMadmScore'].apply(lambda x: 0 if x < 0 else x)
# 计算 dsSmallnessScore
http_df['dsSmallnessScore'] = http_df[data_size].apply(lambda x: 1- (np.argmax(np.bincount(x)) / 65535))
http_df['dsSmallnessScore'] = http_df['dsSmallnessScore'].apply(lambda x: 0 if x < 0 else x)

最后得到,dsScore = (dsSkewScore + dsMADMScore + dsSmallnessScore) / 3.0

小结

最终得分:score = (tsScore + tsScore) / 2.0,完整的演示代码放在 这里

为了简单演示下狩猎效果,我用 Covenant 作为 C2 框架,生成 3 组数据,通信间隔在 10-60 秒不等,抖动在 10%-20% 之间

image-20220117214138280

最后得分如下:

image-20220117214016417

当然,这里只是为了验证该算法基本逻辑的正确性,数据样本太小,并不能说明其实际使用效果,切勿直接用作检测告警

它的目的是帮助我们提高威胁狩猎的效率,搭配其它工具和方法一起使用才是正确的姿势

例如可以将其输出作为我们检测规则的输入,或者结合威胁情报食用,进行二次确认

另外,不要忘了对输入数据做好预处理和清洗工作,比如聚焦特定协议判断内外网IP过滤白名单主机等等,这会大幅提高分析效率和检测效果

当然,算法本身还有很大的优化空间,尤其对计算过程中涉及到的一些常量,我也有不少的疑问

例如,Mehmet Ergene 在其文章中提到过可能会产生的漏报:

  1. 通信间隔的 MADM 大于 30 秒时,则 tsMADMScore 分数为 0

  2. 数据大小的 MADM 超过 32 字节时,则 dsMADMScore分数为 0

如果一个 beacon 的通信间隔为 10 分钟,抖动(jitter)为 20%,其 MADM 会超过 30s

这种情况下算出的 tsMadmScore 为 0,进而会降低总得分,导致漏报

最后,感谢 Mehmet Ergene 的分享,这篇文章受其启发而来,如有纰漏之处,欢迎私信交流

最后更新于