自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Embedding空間中的時(shí)序異常檢測,你學(xué)會了嗎?

開發(fā) 前端
通過實(shí)驗(yàn),驗(yàn)證了該算法的有效性,但在后續(xù)的工程化應(yīng)用中,還需要結(jié)合具體的應(yīng)用場景進(jìn)行適當(dāng)?shù)恼{(diào)整。比如采樣點(diǎn)的數(shù)量、采樣點(diǎn)的選取方法、樣本Embedding方法、距離計(jì)算方法等。

1.背景

在安全、反作弊等業(yè)務(wù)場景下,對流量、用戶行為進(jìn)行異常檢測是基本的剛需。通常的做法是,在各個(gè)業(yè)務(wù)維度上,對流量、用戶行為進(jìn)行統(tǒng)計(jì)分析,提取出相應(yīng)的指標(biāo)特征,然后在時(shí)間維度上,對這些指標(biāo)特征進(jìn)行建模分析。再利用相關(guān)的算法來檢測當(dāng)前的指標(biāo)值是否背離了該指標(biāo)在歷史數(shù)據(jù)中的分布規(guī)律。

2.示例

假設(shè)某業(yè)務(wù)場景下,用戶有100個(gè)來源渠道,用戶使用產(chǎn)品時(shí),有10種不同的操作方式,對于用戶的行為,我們可以簡單的撮取出PV、UV、失敗率等指標(biāo)。那么我們可以建立這樣一個(gè)監(jiān)控:

監(jiān)控的維度:來源渠道 * 操作方式 = 100 * 10 = 1000個(gè)維度

監(jiān)控的指標(biāo):PV、UV、失敗率...

統(tǒng)計(jì)周期: 小時(shí)

然后針對每個(gè)維度、時(shí)刻、指標(biāo),收集過去30天的數(shù)據(jù)做為訓(xùn)練樣本,訓(xùn)練異常檢測模型(如EllipticEnvelope等),然后對當(dāng)前時(shí)刻的指標(biāo)值,進(jìn)行異常檢測。

上面的方法,通過合理的拆分監(jiān)控維度,一方面可以有效的提高檢測的靈敏度,避免較少的異常流量淹沒在大盤監(jiān)控在隨機(jī)波動中;另一方面,也可以對異常流量進(jìn)行快速的定位,便于及時(shí)處理。

3.問題

上面的方法也存在諸多的限制,比如:

  • 監(jiān)控維度必需是離散、可枚舉的,否則無法建立歷史數(shù)據(jù)的統(tǒng)計(jì)模型;
  • 監(jiān)控維度的粒度必須合適,否則或是靈敏度不足,或是噪聲太多,無法有效檢測異常。

顯然,不是所有的業(yè)務(wù)場景都能滿足上述的要求。即便是能滿足上述要求的業(yè)務(wù)場景中,隨著對攻擊者的對抗不斷深入,攻擊者會嘗試降低攻擊的規(guī)模,并盡量將攻擊行為分散到更多的維度中,從而躲避我們的檢測手段。

4.解決思路

那么,能否不依賴業(yè)務(wù)維度拆分,直接對指標(biāo)進(jìn)行異常檢測呢?

首先,我們需要把待檢測的每一條日志、數(shù)據(jù)當(dāng)做一個(gè)獨(dú)立的樣本。接下來,不難聯(lián)想到,這些樣本都可以映射到某個(gè)高維空間中,我們把這個(gè)空間叫做樣本空間??梢酝ㄟ^向量化、Embedding等方法,得到樣本在這個(gè)空間中的坐標(biāo)。

樣本在這個(gè)空間中的分布必然不是完全隨機(jī)的,而是會存在一定的特點(diǎn)(分布特征)。若當(dāng)前時(shí)刻樣本在這個(gè)空間中的分布特征與歷史數(shù)據(jù)中的分布特征不一致,則說明當(dāng)前樣本存在異常。而分布在差異最大的區(qū)域中的樣本,則可以認(rèn)為是異常樣本。

接下來的問題就變成了如何對這種分布特征進(jìn)行建模?

最先想到的是,我們可以通過聚類算法,來對樣本進(jìn)行劃分,再對每個(gè)Cluster,提取出統(tǒng)計(jì)特征。但在具體實(shí)現(xiàn)時(shí)還需要考慮以下問題:

  • 支持的樣本數(shù)量要足夠多;
  • 支持的Cluster數(shù)量要足夠多;
  • 每個(gè)Cluster的樣本數(shù)量要盡可能均勻;
  • Cluster的劃分要盡可能穩(wěn)定,才能在時(shí)間維度上執(zhí)行異常檢測。

再進(jìn)一步,其實(shí)我們不需要執(zhí)行完整的聚類算法,我們只需要對樣本空間設(shè)置足夠多的采樣點(diǎn)進(jìn)行采樣,計(jì)算出采樣點(diǎn)附近的樣本的統(tǒng)計(jì)特征做為采集采樣點(diǎn)的分布特征,再對采樣點(diǎn)的特征進(jìn)行時(shí)間維度的異常檢測,即可完成對整個(gè)樣本空間的異常檢測了。

圖片圖片

5.算法實(shí)驗(yàn)

5.1 數(shù)據(jù)準(zhǔn)備

取某業(yè)務(wù)場景下近30天的用戶行為日志,約160萬條,利用其中的UserAgent信息,對其進(jìn)行向量化處理。每條日志的向量長度為128維。

向量化算法:

def to_vector(ua):
    if isinstance(ua, (list, tuple)):
        return [to_vector(c) for c in ua]
    else:
        vec = np.zeros(128)
        for c in ua:
            vec[ord(c) % 128] += 1  # UserAgent中的字符絕大多數(shù)都是Ascll字符,所以取余128
        l2 = np.sqrt(np.sum(vec * vec))
        if l2 != 0:
            vec /= l2
        return vec.tolist()

將清洗好的數(shù)據(jù)保存到向量DB中備用:

for day in days:
    for hour in hours:
        event_day = day.strftime("%Y%m%d")
        event_hour = "{:02d}".format(hour)
        collection = chroma_client.get_or_create_collection(
            name="{}_{}_{}".format(name_prefix, event_day, event_hour)
        )
        sub_df = df_ua_pv[(df_ua_pv.event_day == event_day) & (df_ua_pv.event_hour == event_hour)]
        ids = [hashlib.md5(bytes(str(row), "utf-8")).hexdigest() for _, row in sub_df.iterrows()]
        docs = [row.ua for _, row in sub_df.iterrows()]
        metadatas = [{"pv": row.pv} for _, row in sub_df.iterrows()]
        embeddings = [to_vector(row.ua) for _, row in sub_df.iterrows()]
        batch_size = 10000
        for batch_id in range(0, len(docs), batch_size):
            collection.upsert(
                ids=ids[batch_id : batch_id + batch_size],
                documents=docs[batch_id : batch_id + batch_size],
                metadatas=metadatas[batch_id : batch_id + batch_size],
                embeddings=embeddings[batch_id : batch_id + batch_size],
            )
            print("{:>8d} / {}".format(batch_id + batch_size, len(docs)))
        collections[event_day + event_hour] = collection

為了更方便的驗(yàn)證算法的有效性,在數(shù)據(jù)集中,人工構(gòu)造了一些異常樣本,包括:

  • 個(gè)別隨機(jī)UA,PV增長:10%, 20%, 50%, 100%, 200%, 500%,1000%;數(shù)量:5;min_pv=100。
  • 部分相似UA,PV增長:5%,10%,20%, 50%, 100%;數(shù)量:10, 20, 50, 100;min_pv=10。
  • 生成相似UA,PV同比增長,數(shù)量:10, 20, 50, 100。
  • 生成相似UA,整體PV不增長,數(shù)量:10, 20, 50, 100;min_pv=1。

5.2 算法實(shí)現(xiàn)

隨機(jī)生成采樣點(diǎn):

query_ua_list = (
    df_ua_pv[(df_ua_pv.event_day == event_day) & (df_ua_pv.event_hour == event_hour)].sample(100)["ua"].to_list()
)

在樣本空間進(jìn)行鄰近采樣:

results = []
query_ua_vec = to_vector(query_ua_list)
for day in days:
    for hour in hours:
        res = get_collection(day, hour).query(query_embeddings=query_ua_vec, n_results=n_results)
        for i in range(len(query_ua_list)):
            for j in range(n_results):
                row = [
                    query_ua_list[i],
                    res["metadatas"][i][j]["event_day"],
                    res["metadatas"][i][j]["event_hour"],
                    res["documents"][i][j],
                    res["metadatas"][i][j]["pv"],
                    res["distances"][i][j],
                ]
                if extra_fields:
                    for field in extra_fields:
                        row.append(res["metadatas"][i][j].get(field))
                results.append(row)
cols = ["ua", "day", "hour", "doc", "pv", "dist"]
if extra_fields:
    cols += extra_fields
df_results = pd.DataFrame(results, columns=cols)

定義要檢測的字段:

AREA_EXP = [0, 2, 8]
MODEL_FIELDS = ["pv", "dist"]
MODEL_FIELDS += [f"dens_{i}" for i in AREA_EXP]
MODEL_FIELDS += ["dens_s"]
MODEL_AGGS = {}
for col in MODEL_FIELDS:
    MODEL_AGGS[f"{col}_mean"] = (col, "mean")
    MODEL_AGGS[f"{col}_std"] = (col, "std")

進(jìn)行天維度的異常檢測:

df_query_results["dens_s"] = 1 / (df_query_results["dist"] ** 0.5 + 1)
df_res_agg = df_query_results.groupby(["ua", "day"], as_index=False).agg(
    pv=("pv", "sum"),
    dist=("dist", "mean"),
    dens_s=("dens_s", "mean"),
)
for i in AREA_EXP:
    df_res_agg["area_{}".format(i)] = (df_res_agg["dist"] * 10) ** i
    df_res_agg["dens_{}".format(i)] = df_res_agg["pv"] / df_res_agg["area_{}".format(i)]
df_model = df_res_agg[df_res_agg.day <= last_event_day].groupby("ua").agg(**MODEL_AGGS)
df_check = df_res_agg.join(df_model, notallow="ua")
for col in MODEL_FIELDS:
    df_check[f"{col}_sigma"] = (df_check[col] - df_check[f"{col}_mean"]) / df_check[f"{col}_std"]
df_check["dens_avg_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].mean(axis=1)
df_check["dens_max_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].max(axis=1)
df_check["dens_min_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].min(axis=1)

6.實(shí)驗(yàn)效果

6.1 實(shí)驗(yàn)一

個(gè)別隨機(jī)UA,PV增長:10%, 20%, 50%, 100%, 200%, 500%,1000%;數(shù)量:5;min_pv=100。

異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:

  • 天級檢測下異常樣本的置信度分布;
  • 天級檢測下正常樣本的置信度分布;
  • 小時(shí)級檢測下異常樣本的置信度分布;
  • 小時(shí)級檢測下正常樣本的置信度分布。

圖片圖片

天級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

小時(shí)級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

6.2 實(shí)驗(yàn)二

部分相似UA,PV增長:5%,10%,20%, 50%, 100%;數(shù)量:5, 10, 20;  min_pv=10。

異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:

  • 天級檢測下異常樣本的置信度分布;
  • 天級檢測下正常樣本的置信度分布;
  • 小時(shí)級檢測下異常樣本的置信度分布;
  • 小時(shí)級檢測下正常樣本的置信度分布。

圖片圖片

天級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

小時(shí)級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

6.3 實(shí)驗(yàn)三

生成相似UA,PV同比增長,數(shù)量:5, 10, 20, 50, 100。

異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:

  • 天級檢測下異常樣本的置信度分布;
  • 天級檢測下正常樣本的置信度分布;
  • 小時(shí)級檢測下異常樣本的置信度分布;
  • 小時(shí)級檢測下正常樣本的置信度分布。

圖片圖片

天級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

小時(shí)級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

6.4 實(shí)驗(yàn)四

生成相似UA,整體PV不增長,數(shù)量:10, 20, 50, 100;min_pv=1。

異常樣本與原始樣本的異常置信度分布對比如下圖,由上到下分別為:

  • 天級檢測下異常樣本的置信度分布;
  • 天級檢測下正常樣本的置信度分布;
  • 小時(shí)級檢測下異常樣本的置信度分布;
  • 小時(shí)級檢測下正常樣本的置信度分布。

圖片圖片

天級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

小時(shí)級檢測不同閾值下的準(zhǔn)召情況:

圖片圖片

7.總結(jié)與展望

通過實(shí)驗(yàn),驗(yàn)證了該算法的有效性,但在后續(xù)的工程化應(yīng)用中,還需要結(jié)合具體的應(yīng)用場景進(jìn)行適當(dāng)?shù)恼{(diào)整。比如采樣點(diǎn)的數(shù)量、采樣點(diǎn)的選取方法、樣本Embedding方法、距離計(jì)算方法等。

此外,在實(shí)踐中,若要發(fā)揮出異常檢測的真正價(jià)值,還需要考慮以下問題:

  • 檢測到異常后,如何快速定位到異常樣本;
  • 異常樣本定位后,如何快速度評估分析,確定異常是否需要進(jìn)一步處理;
  • 若需要進(jìn)一步處理,如何快速定位到異常樣本來源特征,制定出相應(yīng)的攻防策略等。
責(zé)任編輯:武曉燕 來源: 百度Geek說
相關(guān)推薦

2024-02-29 13:12:30

2024-09-27 19:39:27

2022-07-08 09:27:48

CSSIFC模型

2024-01-29 08:21:59

AndroidOpenCV車牌

2024-02-02 11:03:11

React數(shù)據(jù)Ref

2023-01-10 08:43:15

定義DDD架構(gòu)

2024-02-04 00:00:00

Effect數(shù)據(jù)組件

2023-07-26 13:11:21

ChatGPT平臺工具

2024-01-19 08:25:38

死鎖Java通信

2024-01-02 12:05:26

Java并發(fā)編程

2023-08-01 12:51:18

WebGPT機(jī)器學(xué)習(xí)模型

2022-12-06 08:37:43

2024-05-29 07:47:30

SpringJava@Resource

2024-05-06 00:00:00

InnoDBView隔離

2024-08-06 09:47:57

2023-01-30 09:01:54

圖表指南圖形化

2024-07-31 08:39:45

Git命令暫存區(qū)

2023-12-12 08:02:10

2023-10-10 11:04:11

Rust難點(diǎn)內(nèi)存

2022-07-13 08:16:49

RocketMQRPC日志
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號