C# LiteDB 時(shí)間序列數(shù)據(jù)處理完整指南
作者:iamrick
LiteDB 提供了一個(gè)輕量、靈活的解決方案,用于在 C# 中處理時(shí)間序列數(shù)據(jù)。通過(guò)合理的數(shù)據(jù)模型和查詢(xún)策略,可以高效地存儲(chǔ)和分析時(shí)間序列信息。
時(shí)間序列數(shù)據(jù)是一種特殊的數(shù)據(jù)類(lèi)型,它按照時(shí)間順序記錄和組織數(shù)據(jù)點(diǎn),每個(gè)數(shù)據(jù)點(diǎn)都與特定的時(shí)間戳相關(guān)聯(lián)。這種數(shù)據(jù)形式在現(xiàn)代數(shù)據(jù)分析中扮演著極其重要的角色。
主要特征:
- 時(shí)間維度:每個(gè)數(shù)據(jù)點(diǎn)都有明確的時(shí)間標(biāo)記
- 順序性:數(shù)據(jù)按時(shí)間先后順序排列
- 連續(xù)性:通常以固定或變化的時(shí)間間隔收集
- 趨勢(shì)性:可能展現(xiàn)出特定的模式、周期或趨勢(shì)
應(yīng)用場(chǎng)景舉例:
- 工業(yè)領(lǐng)域:設(shè)備傳感器實(shí)時(shí)監(jiān)測(cè)數(shù)據(jù),如溫度、壓力、振動(dòng)等
- 金融市場(chǎng):股票價(jià)格、匯率、交易量的每日記錄
- 環(huán)境監(jiān)測(cè):氣溫、濕度、空氣質(zhì)量等定期采樣數(shù)據(jù)
- 互聯(lián)網(wǎng)運(yùn)維:服務(wù)器性能指標(biāo)、網(wǎng)絡(luò)流量、用戶訪問(wèn)量等
- 物聯(lián)網(wǎng)設(shè)備:智能家居設(shè)備的使用數(shù)據(jù)、能源消耗記錄
Nuget 安裝LiteDB包
圖片
數(shù)據(jù)模型設(shè)計(jì)
時(shí)間序列數(shù)據(jù)模型
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
/// <summary>
/// 時(shí)間序列數(shù)據(jù)點(diǎn)模型
/// </summary>
public class TimeSeriesDataPoint
{
/// <summary>
/// 唯一標(biāo)識(shí)符
/// </summary>
[BsonId]
public ObjectId Id { get; set; }
/// <summary>
/// 數(shù)據(jù)記錄時(shí)間戳
/// </summary>
public DateTime Timestamp { get; set; }
/// <summary>
/// 數(shù)據(jù)源或傳感器標(biāo)識(shí)
/// </summary>
public string SourceId { get; set; }
/// <summary>
/// 數(shù)值類(lèi)型數(shù)據(jù)
/// </summary>
public double Value { get; set; }
/// <summary>
/// 可選的額外元數(shù)據(jù)
/// </summary>
public Dictionary<string, object> Metadata { get; set; }
}
}
LiteDB 時(shí)間序列數(shù)據(jù)操作
數(shù)據(jù)插入
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
public class TimeSeriesRepository
{
private readonly LiteDatabase _database;
private readonly ILiteCollection<TimeSeriesDataPoint> _collection;
public LiteDatabase Database => _database;
public TimeSeriesRepository(string databasePath)
{
_database = new LiteDatabase(databasePath);
_collection = _database.GetCollection<TimeSeriesDataPoint>("time_series_data");
// 創(chuàng)建時(shí)間戳和源ID的復(fù)合索引
_collection.EnsureIndex(x => x.Timestamp);
_collection.EnsureIndex(x => x.SourceId);
}
/// <summary>
/// 插入單個(gè)數(shù)據(jù)點(diǎn)
/// </summary>
public void InsertDataPoint(TimeSeriesDataPoint dataPoint)
{
_collection.Insert(dataPoint);
}
/// <summary>
/// 批量插入數(shù)據(jù)點(diǎn)
/// </summary>
public void InsertBatch(IEnumerable<TimeSeriesDataPoint> dataPoints)
{
_collection.InsertBulk(dataPoints);
}
}
}
數(shù)據(jù)查詢(xún)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
public class TimeSeriesQuery
{
private readonly ILiteCollection<TimeSeriesDataPoint> _collection;
public TimeSeriesQuery(LiteDatabase database)
{
_collection = database.GetCollection<TimeSeriesDataPoint>("time_series_data");
}
/// <summary>
/// 查詢(xún)特定時(shí)間范圍內(nèi)的數(shù)據(jù)
/// </summary>
public IEnumerable<TimeSeriesDataPoint> GetDataInTimeRange(
DateTime startTime,
DateTime endTime,
string sourceId = null)
{
var query = Query.And(
Query.GTE("Timestamp", startTime),
Query.LTE("Timestamp", endTime)
);
if (!string.IsNullOrEmpty(sourceId))
{
query = Query.And(query, Query.EQ("SourceId", sourceId));
}
return _collection.Find(query);
}
/// <summary>
/// 獲取最近的數(shù)據(jù)點(diǎn)
/// </summary>
public TimeSeriesDataPoint GetLatestDataPoint(string sourceId)
{
return _collection
.Find(x => x.SourceId == sourceId)
.OrderByDescending(x => x.Timestamp)
.FirstOrDefault();
}
}
}
數(shù)據(jù)聚合與分析
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
public class TimeSeriesAnalytics
{
private readonly ILiteCollection<TimeSeriesDataPoint> _collection;
public TimeSeriesAnalytics(LiteDatabase database)
{
_collection = database.GetCollection<TimeSeriesDataPoint>("time_series_data");
}
/// <summary>
/// 計(jì)算時(shí)間范圍內(nèi)的統(tǒng)計(jì)數(shù)據(jù)
/// </summary>
public (double Min, double Max, double Average)
CalculateStatistics(DateTime startTime, DateTime endTime, string sourceId)
{
var data = _collection.Find(x =>
x.Timestamp >= startTime &&
x.Timestamp <= endTime &&
x.SourceId == sourceId);
return (
data.Min(x => x.Value),
data.Max(x => x.Value),
data.Average(x => x.Value)
);
}
/// <summary>
/// 按時(shí)間間隔聚合數(shù)據(jù)
/// </summary>
public IEnumerable<(DateTime Interval, double AggregatedValue)>
AggregateByInterval(
DateTime startTime,
DateTime endTime,
string sourceId,
TimeSpan interval)
{
return _collection
.Find(x => x.Timestamp >= startTime &&
x.Timestamp <= endTime &&
x.SourceId == sourceId)
.GroupBy(x => Math.Floor((x.Timestamp - startTime).TotalMinutes / interval.TotalMinutes))
.Select(g => (
Interval: startTime.AddMinutes(g.Key * interval.TotalMinutes),
AggregatedValue: g.Average(x => x.Value)
));
}
}
}
使用示例
namespace App07
{
internal class Program
{
static void Main(string[] args)
{
var repository = new TimeSeriesRepository("timeseries.db");
var query = new TimeSeriesQuery(repository.Database);
var analytics = new TimeSeriesAnalytics(repository.Database);
// 創(chuàng)建一個(gè)隨機(jī)數(shù)生成器,用于模擬傳感器數(shù)據(jù)
var random = new Random();
Console.WriteLine("開(kāi)始模擬數(shù)據(jù)寫(xiě)入,按 'Q' 鍵退出...");
// 創(chuàng)建一個(gè)取消令牌源
using var cancellationTokenSource = new CancellationTokenSource();
// 啟動(dòng)異步任務(wù)來(lái)寫(xiě)入數(shù)據(jù)
Task.Run(async () =>
{
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
var dataPoint = new TimeSeriesDataPoint
{
Timestamp = DateTime.Now,
SourceId = "sensor_001",
Value = 20 + random.NextDouble() * 10, // 生成20-30之間的隨機(jī)溫度
Metadata = new Dictionary<string, object>
{
{ "Location", "Main Room" },
{ "Unit", "Celsius" }
}
};
repository.InsertDataPoint(dataPoint);
Console.WriteLine($"寫(xiě)入數(shù)據(jù)點(diǎn) - 時(shí)間: {dataPoint.Timestamp}, 值: {dataPoint.Value:F2}°C");
// 等待1秒
await Task.Delay(1000, cancellationTokenSource.Token);
}
}, cancellationTokenSource.Token);
// 等待用戶按 'Q' 鍵退出
while (Console.ReadKey(true).Key != ConsoleKey.Q)
{
// 繼續(xù)循環(huán)直到按下 'Q' 鍵
}
// 取消數(shù)據(jù)寫(xiě)入任務(wù)
cancellationTokenSource.Cancel();
Console.WriteLine("\n數(shù)據(jù)寫(xiě)入已停止");
// 顯示一些統(tǒng)計(jì)信息
try
{
var lastMinute = DateTime.Now.AddMinutes(-1);
var stats = analytics.CalculateStatistics(
lastMinute,
DateTime.Now,
"sensor_001"
);
Console.WriteLine("\n最近一分鐘的統(tǒng)計(jì)數(shù)據(jù):");
Console.WriteLine($"最小值: {stats.Min:F2}°C");
Console.WriteLine($"最大值: {stats.Max:F2}°C");
Console.WriteLine($"平均值: {stats.Average:F2}°C");
}
catch (Exception ex)
{
Console.WriteLine($"計(jì)算統(tǒng)計(jì)數(shù)據(jù)時(shí)出錯(cuò): {ex.Message}");
}
var result = query.GetLatestDataPoint("sensor_001");
Console.WriteLine($"\n最新數(shù)據(jù)點(diǎn): 時(shí)間: {result.Timestamp}, 值: {result.Value:F2}°C");
Console.WriteLine("\n按任意鍵退出...");
Console.ReadKey();
}
}
}
圖片
圖片
注意事項(xiàng)
- LiteDB 適合中小型時(shí)間序列數(shù)據(jù)
- 對(duì)于海量數(shù)據(jù),考慮使用專(zhuān)業(yè)的時(shí)序數(shù)據(jù)庫(kù)
- 定期備份數(shù)據(jù)庫(kù)文件
- 注意內(nèi)存使用和查詢(xún)性能
- 使用復(fù)合索引
- 定期歸檔和清理舊數(shù)據(jù)
- 批量插入數(shù)據(jù)
- 選擇合適的數(shù)據(jù)粒度
- 考慮數(shù)據(jù)壓縮和分區(qū)策略
結(jié)論
LiteDB 提供了一個(gè)輕量、靈活的解決方案,用于在 C# 中處理時(shí)間序列數(shù)據(jù)。通過(guò)合理的數(shù)據(jù)模型和查詢(xún)策略,可以高效地存儲(chǔ)和分析時(shí)間序列信息。
責(zé)任編輯:武曉燕
來(lái)源:
技術(shù)老小子