自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于GMM的一維時序數(shù)據(jù)平滑算法

開發(fā) 前端
本文將介紹我們使用高斯混合模型(GMM)算法作為一維數(shù)據(jù)的平滑和去噪算法。

本文將介紹我們使用高斯混合模型(GMM)算法作為一維數(shù)據(jù)的平滑和去噪算法。

假設我們想要在音頻記錄中檢測一個特定的人的聲音,并獲得每個聲音片段的時間邊界。例如,給定一小時的流,管道預測前10分鐘是前景(我們感興趣的人說話),然后接下來的20分鐘是背景(其他人或沒有人說話),然后接下來的20分鐘是前景段,最后10分鐘屬于背景段。

有一種方法是預測每個語音段的邊界,然后對語音段進行分類。但是如果我們錯過了一個片段,那么這個錯誤將會使整個片段產(chǎn)生錯誤。想要解決這題我們可以使用GMM smooth,音頻檢測器生成時間范圍片段和每個片段的標簽。GMM smooth的輸入數(shù)據(jù)是這些段,它可以幫助我們來降低最終預測中的噪聲。

高斯混合模型

在深入GMM之前,必須首先了解高斯分布。高斯分布是一種概率分布,由兩個參數(shù)定義:平均值(或期望)和標準差(STD)。在統(tǒng)計學中,平均值是指數(shù)據(jù)集的平均值,而標準偏差(STD)衡量數(shù)據(jù)的變化或分散程度。STD表示每個數(shù)據(jù)點與平均值之間的距離,在高斯分布中,大約68%的數(shù)據(jù)落在平均值的一個STD內(nèi)。

GMM是一種參數(shù)概率模型。它假設在給定的一組數(shù)據(jù)點中,每一個單點都是由一個高斯分布產(chǎn)生的,給定一組K個高斯分布[7]。GMM的目標是從K個分布中為每個數(shù)據(jù)點分配一個高斯分布。換句話說,GMM解決的任務是聚類任務或無監(jiān)督任務。

GMMs通常用作生物識別系統(tǒng)中連續(xù)測量或特征的概率分布的參數(shù)模型,例如說話人識別系統(tǒng)中與聲道相關(guān)的頻譜特征。使用迭代期望最大化(EM)算法或來自訓練良好的先驗模型的最大后驗(MAP)估計從訓練數(shù)據(jù)中估計GMM參數(shù)[8]。

基于 GMM 的平滑器

我們的目標是解決時間概念定位問題,比如輸出如下所示:[[StartTime1, EndTime1, Class1], [StartTime2, EndTime2, Class2], …]。 如果我們想直觀地展示一下,可以像下圖這樣:

但是因為誤差而產(chǎn)生很大的噪聲,如下所示:

我們的目標只是減少噪聲(并使用本文后面描述的方法測量噪聲)??梢钥吹奖尘邦A測更常見(橙色),也就是說我們正在尋找的說話者的“標記”音頻片段更頻繁地被預測為“其他說話者”或“沒有說話”。

可以看到噪聲預測與真實預測相比具有較小的長度,所以可以得出結(jié)論,噪聲預測是可以與真實預測分離的。我們將預測的長度建模為高斯分布的混合,使用GMM作為噪聲平滑算法來解決這個問題。

代碼和解釋

完整的代碼可以在下面的代碼塊中看到:

from copy import deepcopy
 import numpy as np
 from matplotlib import pyplot as plt
 import pandas as pd
 from sklearn.mixture import GaussianMixture
 import logging
 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 logger.addHandler(logging.StreamHandler())
 
 class GMMSmoother:
     """
    This class is the main class of the Smoother. It performs a smoothing to joint segments
    """
     def __init__(self, min_samples=10):
         # The minimum number of samples for applying GMM
         self.min_samples = min_samples
         # Logger instance
         self.logger = logger
     def smooth_segments_gmm(self, segments, gmm_segment_class='background', bg_segment_class='foreground'):
         """
        This method performs the smoothing using Gaussian Mixture Model (GMM) (for more information about GMM
        please visit: https://scikit-learn.org/stable/modules/mixture.html). It calculates two GMMs: first with one
        gaussian component and the second with two components. Then, it selects the best model using AIC, and BIC metrics.
        After we choose the best model, we perform a clustering of tew clusters: real or fake
        Please note that the GMMs don't use the first and last segments because in our case
        the stream's time limit is an hour and we don't have complete statistics on
        the lengths of the first and last segments.
        :param segments: a list of dictionaries, each dict represents a segment
        :param gmm_segment_class: the segment class of the "reals"
        :param bg_segment_class: the segment class of the "fakes"
        :return:
        segments_copy: the smoothed version of segments
        """
         self.logger.info("Begin smoothing using Gaussian Mixture Model (GMM)")
         # Some instancing
         preds_map = {0: bg_segment_class, 1: gmm_segment_class}
         gmms_results_dict = {}
         # Copy segments to a new variable
         segments_copy = deepcopy(segments)
         self.logger.info("Create input data for GMM")
         # Keep the gmm_segment_class data points and perform GMM on them.
         # For example: gmm_segment_class = 'background'
         segments_filtered = {i: s for i, s in enumerate(segments_copy) if
                           s['segment'] == gmm_segment_class and (i > 0 and i < len(segments_copy) - 1)}
         # Calcualte the length of each segment
         X = np.array([[(s['endTime'] - s['startTime']).total_seconds()] for _, s in segments_filtered.items()])
         # Check if the length of data points is less than the minimum.
         # If it is, do not apply GMM!
         if len(X) <= self.min_samples:
             self.logger.warning(f"Size of input ({len(X)} smaller than min simples ({self.min_samples}). Do not perform smoothing.)")
             return segments
         # Go over 1 and 2 components and calculate statistics
         best_fitting_score = np.Inf
         self.logger.info("Begin to fit GMMs with 1 and 2 components.")
         for i in [1, 2]:
             # For each number of component (1 or 2), fit GMM
             gmm = GaussianMixture(n_components=i, random_state=0, tol=10 ** -6).fit(X)
             # Calculate AIC and BIC and the average between them
             aic, bic = gmm.aic(X), gmm.bic(X)
             fitting_score = (aic + bic) / 2
             # If the average is less than the best score, replace them
             if fitting_score < best_fitting_score:
                 best_model = gmm
                 best_fitting_score = fitting_score
             gmms_results_dict[i] = {"model": gmm, "fitting_score": fitting_score, "aic": aic, "bic": bic}
         self.logger.info(f"GMM with {best_model.n_components} components was selected")
         # If the number of components is 1, change the label to the points that
         # have distance from the mean that is bigger than 2*STD
         if best_model.n_components == 1:
             mean = best_model.means_[0, 0]
             std = np.sqrt(best_model.covariances_[0, 0])
             model_preds = [0 if x < mean - 2 * std else 1 for x in range(len(X))]
         # If the number of components is 2, assign a label to each data point,
         # and replace the label to the points that assigned to the low mean Gaussian
         else:
             if np.linalg.norm(best_model.means_[0]) > np.linalg.norm(best_model.means_[1]):
                 preds_map = {1: bg_segment_class, 0: gmm_segment_class}
             model_preds = best_model.predict(X)
         self.logger.info("Replace previous predictions with GMM predictions")
         # Perform smoothing
         for i, (k, s) in enumerate(segments_filtered.items()):
             if s['segment'] != preds_map[model_preds[i]]:
                 s['segment'] = preds_map[model_preds[i]]
                 segments_copy[k] = s
         self.logger.info("Merge segments")
         # Join consecutive segments after the processing
         segments_copy = join_consecutive_segments(segments_copy)
         return segments_copy
     @staticmethod
     def plot_bars(res_dict_objs, color_dict={"foreground": "#DADDFC", "background": '#FC997C', "null": "#808080"}, channel="",
                   start_time="", end_time="", snrs=None, titles=['orig', 'smoothed'],
                   save=False, save_path="", show=True):
         """
        Inspired by https://stackoverflow.com/questions/70142098/stacked-horizontal-bar-showing-datetime-areas
        This function is for visualizing the smoothing results
        of multiple segments' lists
        :param res_dict_objs: a list of lists. Each list is a segments list to plot
        :param color_dict: dictionary which represents the mapping between class to color in the plot
        :param channel: channel number
        :param start_time: absolute start time
        :param end_time: absolute end time
        :param snrs: list of snrs to display in the title
        :param titles: title to each subplot
        :param save: flag to save the figure into a png file
        :param save_path: save path of the figure
        :param show: flag to show the figure
        """
         if snrs == None:
             snrs = [''] * len(res_dict_objs)
         if type(res_dict_objs) != list:
             res_dict_objs = [res_dict_objs]
         fig, ax = plt.subplots(len(res_dict_objs), 1, figsize=(20, 10))
         fig.suptitle(f"Channel {channel}, {start_time}-{end_time}\n{snrs[0]}\n{snrs[1]}")
         for dict_idx, res_dict in enumerate(res_dict_objs):
             date_from = [a['startTime'] for a in res_dict]
             date_to = [a['endTime'] for a in res_dict]
             segment = [a['segment'] for a in res_dict]
             df = pd.DataFrame({'date_from': date_from, 'date_to': date_to,
                                'segment': segment})
             for i in range(df.shape[0]):
                 ax[dict_idx].plot([df['date_from'][i], df['date_to'][i]], [1, 1],
                                   linewidth=50, c=color_dict[df['segment'][i]])
             ax[dict_idx].set_yticks([])
             ax[dict_idx].set_yticklabels([])
             ax[dict_idx].set(frame_on=False)
             ax[dict_idx].title.set_text(titles[dict_idx])
         if show:
             plt.show()
         if save:
             plt.savefig(save_path)
 def join_consecutive_segments(seg_list):
     """
    This function is merged consecutive segments if they
    have the same segment class and create one segment. It also changes the
    start and the end times with respect to the joined segments
    :param seg_list: a list of dictionaries. Each dict represents a segment
    :return: joined_segments: a list of dictionaries, where the segments are merged
    """
     joined_segments = list()
     init_seg = {
         'startTime': seg_list[0]['startTime'],
         'endTime': seg_list[0]['endTime'],
         'segment': seg_list[0]['segment']
    }
     collector = init_seg
     last_segment = init_seg
     last_segment = last_segment['segment']
     for seg in seg_list:
         segment = seg['segment']
         start_dt = seg['startTime']
         end_dt = seg['endTime']
         prefiltered_type = segment
         if prefiltered_type == last_segment:
             collector['endTime'] = end_dt
         else:
             joined_segments.append(collector)
             init_seg = {
                 'startTime': start_dt,
                 'endTime': end_dt,
                 'segment': prefiltered_type
            }
             collector = init_seg
             last_segment = init_seg
             last_segment = last_segment['segment']
     joined_segments.append(collector)
     return joined_segments
 def main(seg_list):
     # Create GMMSmoother instance
     gmm_smoother = GMMSmoother()
     # Join consecutive segments that have the same segment label
     seg_list_joined = join_consecutive_segments(seg_list)
     # Perform smoothing on background class
     smoothed_segs_tmp = gmm_smoother.smooth_segments_gmm(seg_list_joined)
     # Perform smoothing on foreground class
     smoothed_segs_final = gmm_smoother.smooth_segments_gmm(smoothed_segs_tmp, gmm_segment_class='foreground', bg_segment_class='background') if len(
         smoothed_segs_tmp) != len(seg_list_joined) else smoothed_segs_tmp
     return smoothed_segs_final
 if __name__ == "__main__":
     # The read_data_func should be implemented by the user,
     # depending on his needs.
     seg_list = read_data_func()
     res = main(seg_list)

下面我們解釋關(guān)鍵塊以及如何使用GMM來執(zhí)行平滑:

1、輸入數(shù)據(jù)

數(shù)據(jù)結(jié)構(gòu)是一個字典列表。每個字典代表一個段預測,具有以下鍵值對: “startTime”,“endTime”和“segment”。下面是一個例子:

{"startTime": ISODate("%Y-%m-%dT%H:%M:%S%z"), "endTime": ISODate("%Y-%m-%dT%H:%M:%S%z"), "segment": "background/foreground"}

“startTime”和“endTime”是段的時間邊界,“segment”是它的類型。

2、連接連續(xù)段

假設輸入數(shù)據(jù)具有相同標簽的連續(xù)預測(并非所有輸入數(shù)據(jù)都必須需要此階段)。例如:

 # Input segments list
 seg_list = [{"startTime": ISODate("2022-11-19T00:00:00Z"), "endTime": ISODate("2022-11-19T01:00:00Z"), "segment": "background"},
 {"startTime": ISODate("2022-11-19T01:00:00Z"), "endTime": ISODate("2022-11-19T02:00:00Z"), "segment": "background"}]
 # Apply join_consecutive_segments on seg_list to join consecutive segments
 seg_list_joined = join_consecutive_segments(seg_list)
 # After applying the function, the new list should look like the following:
 # seg_list_joined = [{"startTime": ISODate("2022-11-19T00:00:00Z"), "endTime": ISODate("2022-11-19T02:00:00Z"), "segment": "background"}]

使用的join_consecutive_segments的代碼如下:

 def join_consecutive_segments(seg_list):
     """
    This function is merged consecutive segments if they
    have the same segment class and create one segment. It also changes the
    start and the end times with respect to the joined segments
    :param seg_list: a list of dictionaries. Each dict represents a segment
    :return: joined_segments: a list of dictionaries, where the segments are merged
    """
     joined_segments = list()
 
     init_seg = {
             'startTime': seg_list[0]['startTime'],
             'endTime': seg_list[0]['endTime'],
             'segment': seg_list[0]['segment']
        }
         collector = init_seg
         last_segment = init_seg
         last_segment = last_segment['segment']
         for seg in seg_list:
             segment = seg['segment']
             start_dt = seg['startTime']
             end_dt = seg['endTime']
             prefiltered_type = segment
             if prefiltered_type == last_segment:
                 collector['endTime'] = end_dt
             else:
                 joined_segments.append(collector)
                 init_seg = {
                     'startTime': start_dt,
                     'endTime': end_dt,
                     'segment': prefiltered_type
                }
                 collector = init_seg
                 last_segment = init_seg
                 last_segment = last_segment['segment']
         joined_segments.append(collector)
         return joined_segments

join_consecutive_segments將兩個或多個具有相同預測的連續(xù)片段連接為一個片段。

3、刪除當前迭代的不相關(guān)片段

我們的預測有更多的噪聲,所以首先需要對它們進行平滑處理。從數(shù)據(jù)中過濾掉前景部分:

 # Copy segments to a new variable
 segments_copy = deepcopy(segments)
 # Keep the gmm_segment_class data points and perform GMM on them.
 # For example: gmm_segment_class = 'background'
 segments_filtered = {i: s for i, s in enumerate(segments_copy) if s['segment'] == gmm_segment_class and (i > 0 and i < len(segments_copy) - 1)}

4、計算段的長度

以秒為單位計算所有段的長度。

 # Calcualte the length of each segment
 X = np.array([[(s['endTime'] - s['startTime']).total_seconds()] for _, s in segments_filtered.items()])

5、GMM

僅獲取背景片段的長度并將 GMM 應用于長度數(shù)據(jù)。 如果有足夠的數(shù)據(jù)點(預定義數(shù)量——超參數(shù)),我們這里使用兩個GMM:一個分量模型和兩個分量模型。 然后使用貝葉斯信息準則 (BIC) 和 Akaike 信息準則 (AIC) 之間的平均值來選擇最適合的 GMM。

 # Check if the length of data points is less than the minimum.
 # If it is, do not apply GMM!
 if len(X) <= self.min_samples:
     self.logger.warning(f"Size of input ({len(X)} smaller than min simples ({self.min_samples}). Do not perform smoothing.)")
     return segments
 # Go over 1 and 2 number of components and calculate statistics
 best_fitting_score = np.Inf
 self.logger.info("Begin to fit GMMs with 1 and 2 components.")
 for i in range(1, 3):
     # For each number of component (1 or 2), fit GMM
     gmm = GaussianMixture(n_components=i, random_state=0, tol=10 ** -6).fit(X)
     # Calculate AIC and BIC and the average between them
     aic, bic = gmm.aic(X), gmm.bic(X)
     fitting_score = (aic + bic) / 2
     # If the average is less than the best score, replace them
     if fitting_score < best_fitting_score:
         best_model = gmm
         best_fitting_score = fitting_score
     gmms_results_dict[i] = {"model": gmm, "fitting_score": fitting_score, "aic": aic, "bic": bic}

6、選擇最佳模型并進行平滑

如果選擇了一個分量:將距離均值大于 2-STD 的數(shù)據(jù)點標記為前景,其余數(shù)據(jù)點保留為背景點。

如果選擇了兩個分量:將分配給低均值高斯的點標記為前景,將高均值高斯標記為背景。

# If the number of components is 1, change the label to the points that
 # have distance from the mean that is bigger than 2*STD
 if best_model.n_components == 1:
     mean = best_model.means_[0, 0]
     std = np.sqrt(best_model.covariances_[0, 0])
     model_preds = [0 if x < mean - 2 * std else 1 for x in range(len(X))]
 # If the number of components is 2, assign a label to each data point,
 # and replace the label to the points that assigned to the low mean Gaussian
 else:
     if np.linalg.norm(best_model.means_[0]) > np.linalg.norm(best_model.means_[1]):
         preds_map = {1: bg_segment_class, 0: gmm_segment_class}
     model_preds = best_model.predict(X)
 self.logger.info("Replace previous predictions with GMM predictions")
 # Perform smoothing
 for i, (k, s) in enumerate(segments_filtered.items()):
     if s['segment'] != preds_map[model_preds[i]]:
         s['segment'] = preds_map[model_preds[i]]
         segments_copy[k] = s
 self.logger.info("Merge segments")

7、后處理

再次連接連續(xù)的片段產(chǎn)生并返回最終結(jié)果。

 # Join consecutive segments after the processing
 segments_copy = join_consecutive_segments(segments_copy)

8、重復這個過程

這是一個迭代的過程我們可以重復這個過程幾次,來找到最佳結(jié)果

9、可視化

使用下面方法可以可視化我們的中間和最終的結(jié)果,并方便調(diào)試

def plot_bars(res_dict_objs, color_dict={"foreground": "#DADDFC", "background": '#FC997C', "null": "#808080"}, channel="",
               start_time="", end_time="", snrs=None, titles=['orig', 'smoothed'],
               save=False, save_path="", show=True):
     """
    This function is for visualizing the smoothing results of multiple segments lists
    :param res_dict_objs: a list of lists. Each list is a segments list to plot
    :param color_dict: dictionary which represents the mapping between class to color in the plot
    :param channel: channel number
    :param start_time: absolute start time
    :param end_time: absolute end time
    :param snrs: list of snrs to display in the title
    :param titles: title to each subplot
    :param save: flag to save the figure into a png file
    :param save_path: save path of the figure
    :param show: flag to show the figure
    """
     if snrs == None:
         snrs = [''] * len(res_dict_objs)
     if type(res_dict_objs) != list:
         res_dict_objs = [res_dict_objs]
     fig, ax = plt.subplots(len(res_dict_objs), 1, figsize=(20, 10))
     fig.suptitle(f"Channel {channel}, {start_time}-{end_time}\n{snrs[0]}\n{snrs[1]}")
     for dict_idx, res_dict in enumerate(res_dict_objs):
         date_from = [a['startTime'] for a in res_dict]
         date_to = [a['endTime'] for a in res_dict]
         segment = [a['segment'] for a in res_dict]
         df = pd.DataFrame({'date_from': date_from, 'date_to': date_to,
                            'segment': segment})
         for i in range(df.shape[0]):
             ax[dict_idx].plot([df['date_from'][i], df['date_to'][i]], [1, 1],
                               linewidth=50, c=color_dict[df['segment'][i]])
         ax[dict_idx].set_yticks([])
         ax[dict_idx].set_yticklabels([])
         ax[dict_idx].set(frame_on=False)
         ax[dict_idx].title.set_text(titles[dict_idx])
     if show:
         plt.show()
     if save:
         plt.savefig(save_path)

可視化結(jié)果如下圖所示:

可以看到,在第一次迭代之后減少了背景類中的噪聲,第二次迭代之后減少了前景類中的噪聲。

結(jié)果展示

下面我們展示平滑算法的一些結(jié)果。并且還測量了信噪比(SNR)[10],得到了一些數(shù)值結(jié)果來評估算法。比較平滑前后,對前景類和背景類進行了兩次信噪比。這里的淡紫色部分代表前景部分,橙色部分代表背景部分。

總結(jié)

在本文中探討GMM作為時間數(shù)據(jù)平滑算法的使用。GMM(Gaussian Mixture Model)是一種統(tǒng)計模型,常用于數(shù)據(jù)聚類和密度估計。雖然它主要用于聚類任務,但也可以在一定程度上用作時間數(shù)據(jù)平滑算法。雖然它并不是專門為此任務設計的,但是對于這種類別相關(guān)的數(shù)據(jù)平滑,GMM在降噪和結(jié)果改善方面表現(xiàn)非常好(信噪比參數(shù))。

責任編輯:華軒 來源: DeepHub IMBA
相關(guān)推薦

2017-11-20 11:37:19

時序數(shù)據(jù)數(shù)據(jù)存儲HBase

2022-07-06 15:41:55

數(shù)據(jù)庫

2021-09-26 10:08:33

TSDB時序數(shù)據(jù)庫壓縮解壓

2022-07-11 10:45:12

數(shù)據(jù)庫分析

2022-09-23 07:44:48

時序數(shù)據(jù)庫物聯(lián)網(wǎng)

2021-03-08 10:18:55

數(shù)據(jù)庫數(shù)據(jù)Prometheus

2021-03-15 10:10:29

數(shù)據(jù)庫數(shù)據(jù)查詢

2022-07-11 11:12:32

數(shù)據(jù)分析

2023-10-09 12:43:01

Python數(shù)據(jù)信息

2020-03-11 09:50:21

時序數(shù)據(jù)庫快速檢索

2021-08-04 05:49:40

數(shù)據(jù)庫數(shù)時序數(shù)據(jù)庫技術(shù)

2022-12-18 19:38:31

時序數(shù)據(jù)庫數(shù)據(jù)庫

2019-09-17 17:21:42

數(shù)據(jù)庫ElasticSear跳槽那些事兒

2023-09-24 23:22:23

2021-02-22 10:37:47

存儲Prometheus

2021-08-31 14:01:59

時序數(shù)據(jù)庫數(shù)據(jù)庫數(shù)據(jù)

2021-03-01 10:20:52

存儲

2022-07-07 12:23:29

數(shù)據(jù)庫

2024-01-03 14:44:00

2022-07-07 12:37:27

數(shù)據(jù)
點贊
收藏

51CTO技術(shù)棧公眾號