现实世界中的时间序列数据很少是干净整洁的。传感器会出现故障,系统的时间计时会出现偏差,数据传输过程中会生成重复记录,而人工录入数据时也容易出现错误。当一个数据集最终到达你的手中时,它已经经历了收集、传输和存储等环节,而这些环节每一个都可能成为数据被破坏的原因。

与处理表格数据相比,清洗时间序列数据要困难得多,因为时间本身就构成了数据结构上的约束。你不能随意重新排列数据行,也不能用某一列的平均值来填补缺失值,否则就会将未来的数据错误地纳入到过去的观测数据中。因此,在进行任何清洗操作时,都必须严格遵守数据的時間顺序,否则就会破坏建立在这些数据基础之上的所有分析结果的准确性。

本指南将详细介绍使用Python完成整个数据清洗流程的步骤:从原始数据被收集起来开始,一直到数据集准备好用于特征工程或建模为止。我们会讲解如何检测并填补缺失值、如何识别和处理异常值、如何处理重复数据、如何调整数据的频率分布、如何平滑噪声,以及如何验证数据结构的合法性。所有这些内容都会通过具体的传感器数据示例来进行演示。

你可以通过GitHub获取这个Colab笔记本,并按照指南中的步骤进行操作。

先决条件

要跟随本指南学习,你需要具备以下条件:

  • 熟练使用Python和pandas DataFrames进行数据操作

  • 熟悉基于时间索引的数据结构

  • 对特征工程和机器学习建模的基本原理有所了解

在开始运行本指南中的任何代码之前,请先安装以下软件:`pandas`、`numpy`、`scipy`、`scikit-learn`以及`statsmodels`。具体安装命令如下:

pip install pandas numpy scipy scikit-learn statsmodels

目录结构

如何在清洗时间序列数据之前对其进行审计

数据清洗的第一条规则是:在采取任何操作之前,先仔细了解现状。在进行插补、平滑处理或删除数据之前,你必须清楚地知道问题出在哪里、具体表现在哪些地方。

一次有效的审计应该包括以下内容:

  • 时间索引:是否规律?是否存在缺失值?

  • 缺失值的分布情况:这些缺失值是随机出现的,还是呈某种规律分布的?

  • 数据范围:数据中是否存在明显的异常值或由于传感器故障导致的缺失值?

  • 重复的时间戳记录

让我们创建一个包含上述一些问题的样本数据集:

# 模拟一周的智能电网电压读数数据(每小时一次)
# 并人为添加一些问题
periods = 168
index = pd.date_range("2024-06-01", periods=periods, freq="H")

voltage = (
    230.0
    + 3.5 * np.sin(2 * np.pi * np.arange(periods) / 24)
    + np.random.normal(0, 1.2, periods)
)

# 添加问题数据
voltage[14:17] = np.nan          # 传感器故障,连续3个数据点缺失
voltage[42] = np.nan             # 孤立的缺失值
voltage[78] = 312.4              # 异常高的数值
voltage[101:104] = np.nan        # 又一次传感器故障导致的缺失
voltage[130] = 187.2             # 异常低的数值

series = pd.Series(voltage, index=index, name="voltage_v")

# --- 数据审计 ---
print("=== 时间序列数据审计 ===")
print(f"时间范围:        {series.index.min()} → {series.index.max()}")
print(f>数据条目数量:  {len(series)}")
print(f>预期数据频率:{pd.infer_freq(series.index)}")
print(f"缺失值数量:{series.isna().sum()} ({series.isna().mean()*100:.1f}%)")
print(f>数据范围:    [{series.min():.2f}, {series.max():.2f}]")
print(f>平均值 ± 标准差:{series.mean():.2f} ± {series.std():.2f}")

# 查找连续缺失的数据段
missing_mask = series.isna()
missing_runs = []
run_start = None
for i, (ts, is_missing) in enumerate(missing_mask.items()):
    if is_missing and run_start is None:
        run_start = ts
    elif not is_missing and run_start is not None:
        missing_runs.append((run_start, missing_mask.index[i - 1]))
        run_start = None

print(f"连续缺失的数据段共有{len(missing_runs)}个:")
for start, end in missingRuns:
    print(f"  {start} → {end}")

审计结果:

=== 时间序列数据审计 ===
时间范围:        2024-06-01 00:00:00 → 2024-06-07 23:00:00
数据条目数量:  168
预期数据频率:h

缺失值数量:7 (4.2%)
数据范围:    [187.20, 312.40]
平均值 ± 标准差:{230.22 ± 7.81}

连续缺失的数据段共有3个:
  2024-06-01 14:00:00 → 2024-06-01 16:00:00
  2024-06-02 18:00:00 → 2024-06-02 18:00:00
  2024-06-05 05:00:00 → 2024-06-05 07:00:00

通过这次审计,你可以清楚地了解数据在清洗之前的状况。关键在于区分孤立的缺失值(这些值可以通过局部信息进行插补)和连续缺失的数据段——对于后者,可能需要采取不同的处理策略,或者为后续的处理步骤提供特别的提示。

如何将索引重新调整为规范频率

在填补缺失值之前,你需要确认你的时间索引确实是规律的。在处理时间序列数据时,一个常见的问题是:缺失的时间戳只是简单地被忽略掉了,并没有被表示为NaN值——这意味着使用.fillna()函数是无法找到这些缺失值的。

# 模拟一个包含缺失时间戳的传感器数据序列
irregular_index = index.delete([14, 15, 16, 42, 101, 102, 103])
irregular_series = series.dropna().reindex(irregular_index)

print(f"原始长度:{len(series)}")
print(f"不规则序列的长度:{len(irregular_series)}")
print(f"推断出的频率:{pd.infer_freq(irregular_series.index)}")  # 如果返回None,说明索引存在缺失值

# 将索引重新调整为规范的每小时间隔
canonical_index = pd.date_range(
    start=irregular_series.index.min(),
    end=irregular_series.index.max(),
    freq="H"
)

reindexed = irregular_series.reindex(canonical_index)

print(f"\n重新调整索引后的结果:")
print(f"长度:{len(reindexed)}")
print(f>缺失值数量:{reindexed.isna().sum()}")
print(f>推断出的频率:{pd.infer_freq(reindexed.index)}")

输出结果:

原始长度:168
不规则序列的长度:161
推断出的频率:None

重新调整索引后的结果:
长度:168
缺失值数量:7
推断出的频率:h

pd.infer_freq函数返回None时,说明索引中存在缺失值。在将索引重新调整为规范的每小时间隔后,这些缺失的时间戳会明确地被表示为NaN值,此时你的填补逻辑就可以找到这些缺失值了。

如何处理缺失值

并不是所有的缺失值都应该用同样的方法来处理。在连续且平稳的信号中,某个孤立的缺失值最好通过插值法来填补;然而,在波动较大的信号中,如果某个传感器有3个小时的数据缺失,那么标记这个缺失值可能比伪造数据更为合适。因此,处理缺失值的策略应该根据缺失值的长度以及信号本身的特性来确定。

对于阶梯函数型信号,应使用前向填充法

当一个变量的值会一直保持在其上一个已知的值,直到有某种因素导致这个值发生变化时,使用前向填充法是合适的——比如机器的状态、设定值,或者某个分类标志的变化。

# 设备的运行状态属于阶梯函数型信号
mode_data = pd.Series(
    ["running", "running", np.nan, np.nan, "idle", "idle", np.nan, "running"],
    index=pd.date_range("2024-06-01", periods=8, freq="H"),
    name="operating_mode"
)

filled_mode = mode_data.ffill()
print(pd.DataFrame({"original": mode_data, "ffill": filled_mode}))

输出结果:

                    original    ffill
2024-06-01 00:00:00  running  running
2024-06-01 01:00:00  running  running
2024-06-01 02:00:00      NaN  running
2024-06-01 03:00:00      NaN  running
2024-06-01 04:00:00     idle     idle
2024-06-01 05:00:00     idle     idle
2024-06-01 06:00:00      NaN     idle
2024-06-01 07:00:00  running  running

时间加权插值——适用于连续信号

对于连续的传感器测量数据,使用基于时间的加权线性插值方法能够正确处理那些间隔不规律的数据点,因为这种方法并不假设这些数据点之间的间距是相等的。

# 使用时间加权插值填充电压序列
voltage_clean = reindexed.interpolate(method="time")

# 比较原始数据与填补后的数据在第一个缺失区间内的差异
gap_window = voltage_clean["2024-06-01 12:00":"2024-06-01 18:00"]
original_window = reindexed["2024-06-01 12:00":"2024-06-01 18:00"]

comparison = pd.DataFrame({
    "original":     original_window,
    "interpolated": gap_window.round(3),
    "was_missing":  original_window.isna(),
})
print(comparison)

输出结果:

                       原始数据      插值后数据     是否缺失
2024-06-01 12:00:00  230.290355       230.290        否
2024-06-01 13:00:00  226.798197       226.798        否
2024-06-01 14:00:00         NaN       226.848         是
2024-06-01 15:00:00         NaN       226.897         是
2024-06-01 16:00:00         NaN       226.947         是
2024-06-01 17:00:00  226.996356       226.996        否
2024-06-01 18:00:00  225.410371       225.410        否

季节性分解插补法——用于处理较长间隔的数据

对于那些间隔时间超过几个数据点的季节性序列,如果直接进行插值处理,就会忽略其中的季节性变化规律。因此,更好的方法是先对序列进行季节性分解,分别对每个组成部分进行插补处理,然后再重新构建出完整的序列。

from statsmodels.tsa.seasonal import seasonal_decompose

# 使用较长的数据序列进行分解(需要足够多的数据周期)
long_voltage = pd.Series(
    230.0
    + 3.5 * np.sin(2 * np.pi * np.arange(336) / 24)
    + np.random.normal(0, 1.0, 336),
    index=pd.date_range("2024-06-01", periods=336, freq="H")
)

# 在序列中插入一个6小时的间隔
longVoltage.iloc[100:106] = np.nan

# 先进行插值处理,以便后续进行季节性分解
tempFilled = long Voltage.interpolate(method="time")
decomp = seasonal_decompose(tempFilled, model="additive", period=24)

# 重新构建序列:趋势成分 + 季节性成分 + 缺失位置的零值填充
reconstructed = longVoltage.copy()
missing_idx = long Voltage[long Voltage.isna()).index
reconstructed[missing_idx] = (
    decomp.trend[missing_idx].fillna(method="ffill")
    + decomp.seasonal[missing_idx]
)

print(f"插值前的缺失数据数量:{longVoltage.isna().sum()}")
print(f"插值后的缺失数据数量:{reconstructed.isna().sum()}")
print("\n填充后的数据值:")
print(reconstructed[missing_idx].round(3))

输出结果:

                       原始数据      插值后数据     是否缺失
2024-06-01 12:00:00  230.290355       230.290        否
2024-06-01 13:00:00  226.798197       226.798        否
2024-06-01 14:00:00         NaN       226.848         是
2024-06-01 15:00:00         NaN       226.897         是
2024-06-01 16:00:00         NaN       226.947         是
2024-06-01 17:00:00  226.996356       226.996        否
2024-06-01 18:00:00  225.410371       225.410        否

季节性分解插补方法会尊重一天中不同时段的数据变化规律。如您所见,填充后的数值并不会在缺失区间内呈现一条水平线,而是会遵循预期的日变化趋势。

如何检测和处理异常值

在时间序列数据中,异常值的识别要比在表格数据中更为复杂,因为上下文环境起着至关重要的作用。例如,电压值异常偏高或偏低,可能是由于传感器出现故障,也可能是电网系统中真正发生的异常事件。因此,我们需要使用能够利用时间背景信息的方法,而不仅仅是基于全局统计数据的分析方法。

带滚动窗口的Z分数检测法

对于非平稳时间序列而言,仅使用全局Z分数检测法是无法发现局部异常值的。而采用滚动窗口的Z分数检测法,则能够识别出那些相对于其所在数据区间而言异常的值。

注意:所谓非平稳时间序列,是指其统计特性(如均值、方差或趋势)会随时间发生变化,而不会保持恒定的时间序列。

window = 24  # 24小时的滚动窗口长度

roll_mean = voltage_clean.rolling(window, center=True, min_periods=1).mean()
roll_std = voltage_clean.rolling(window, center=True, min_periods=1).std()

rolling_z = (voltage_clean - roll_mean) / roll_std

threshold = 3.0
outliers_z = rolling_z[rolling_z.abs() > threshold]

print(f"检测到的滚动Z分数异常值数量:{len(outliers_z)}")
print(outliers_z.round(3))

输出结果:

检测到的滚动Z分数异常值数量:2
2024-06-04 06:00:00    4.646
2024-06-06 10:00:00   -4.484
变量名称:voltage_v,数据类型:float64

Z分数检测法在数据服从近似高斯分布的情况下效果最佳,因为这种方法假设数据是围绕均值对称分布的,其离散程度由标准差来衡量。

基于四分位距的异常值检测方法

对于非高斯分布的数据而言,使用四分位距(IQR)来进行异常值检测会更加有效。四分位距是指第三四分位数(Q3)与第一四分位数(Q1)之间的差值,它反映了数据中间50%部分的分布范围。

Q1 = voltage_clean.quantile(0.25)
Q3 = voltage(clean.quantile(0.75))
IQR = Q3 - Q1

lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

outliers_iqr = voltage_clean[
    (voltage_clean < lower_bound) | (voltage_clean > upper_bound)
]

print(f"四分位距范围:[{lower_bound:.2f}, {upper_bound:.2f}]")
print(f>检测到的异常值数量:{len(outliers_iqr)}")
print(outliers_iqr.round(2))

输出结果:

四分位距范围:[220.16, 239.46]
检测到的异常值数量:2
2024-06-04 06:00:00    312.4
2024-06-06 10:00:00    187.2
变量名称:voltage_v,数据类型:float64

隔离森林算法——用于多变量异常值检测

当使用多个传感器进行检测时,某个通道上的测量数据看起来可能很正常,但将其与其他通道的测量数据结合起来分析时,就能发现其中存在的异常现象。孤立森林算法能够自然而然地处理这类问题。# 构建一个包含多种传感数据的DataFrame
np.random.seed(42)
n = 200

sensor_df = pd.DataFrame({
"voltage_v": 230 + 3 * np.sin(2 * np.pi * np.arange(n) / 24) + np.random.normal(0, 1, n),
"current_a": 15 + 0.8 * np.sin(2 * np.pi * np.arange(n) / 24) + np.random.normal(0, 0.3, n),
"frequency_hz": 50 + np.random.normal(0, 0.05, n),
}, index=pd.date_range("2024-06-01", periods=n, freq="H"))

# 同时引入多种异常值:电压下降和电流激增
sensor_df.iloc[88, 0] = 194.2 # 电压下降
sensor_df.iloc[88, 1] = 28.7 # 电流激增(与故障情况相符)

clf = IsolationForest(contamination=0.02, random_state=42)
sensor_df["anomaly_score"] = clf.fit_predict(sensor_df[["voltage_v", "current_a", "frequency_hz"]])

anomalies = sensor_df[sensor_df["anomaly_score"] == -1]
print(f"检测到的异常值数量:{len(anomalies)}")
print(anomalies[["voltage_v", "current_a", "frequency_hz"]].round(2))

输出结果:
检测到的异常值数量:4
voltage_v current_a frequency_hz
2024-06-02 07:00:00 234.75 15.84 49.90
2024-06-04 06:00:00 233.09 15.82 50.15
2024-06-04 16:00:00 194.20 28.70 50.08
2024-06-06 05:00:00 235.09 15.41 49.91

在实际应用中,可以根据具体领域制定相应的阈值规则,进一步处理这些异常值。

异常值处理

一旦识别出异常值,可以通过以下几种方式进行处理:

  • 使用温索尔化方法,将极端值限制在某个范围内。

  • 用插值或估算值替换这些异常值。

  • 将这些异常值标记出来,以便模型能够适当处理它们。

# 使用温索尔化方法处理异常值:将极端值限制在IQR范围内
voltage_winsorized = voltage_clean.clip(lower=lower_bound, upper=upper_bound)

# 用时间插值法替换异常值
voltage_outlier_fixed = voltage_clean.copy()
voltage_outlier_fixed[outliers_iqr.index] = np.nan
voltage_outlier_fixed = voltage_outlier_fixed.interpolate(method="time")

print("异常值处理结果对比:")
for ts in outliers_iqr.index:
print(f"\n {ts}")
print(f" 原始值: {voltage_clean[ts]:.2f}")
print(f" 温索尔化后的值:{voltage_winsorized[ts]:.2f}")
print(f" 插值后的值:{voltage_outlier_fixed[ts]:.2f}")

输出结果:
异常值处理结果对比:

2024-06-04 06:00:00
原始值: 312.40
温索尔化后的值:239.46
插值后的值:232.01

2024-06-06 10:00:00
原始值: 187.20
温索尔化后的值:220.16
插值后的值:231.43

温索尔化方法能够保留异常值本身,但将其限制在合理的范围内;当您希望记录下发生了异常事件这一事实时,这种方法非常有用。而插值法则将异常值视为缺失数据进行处理;当您认为某个测量结果仅仅是出现错误时,使用插值法会更为合适。

如何删除重复数据

当数据管道在失败后重新尝试处理数据时,出现重复的时间戳是很常见的现象。与表格格式中的重复数据不同,时间序列数据中的重复项并不总是完全相同的;因此,在重试操作中,同一个时间戳可能会得到略有不同的测量结果。

# 添加数值略有不同的重复时间戳(重试场景)
dup_index = index.tolist()
dup_index.insert(20, index[20])  # 完全相同的重复时间戳
dup_index.insert(55, index[55])  # 用于重试的重复时间戳

dup_values = voltage_clean.tolist()
dup_values.insert(20, voltage_clean.iloc[20])
dup_values.insert(55, voltage(clean.iloc[55] + 0.7)  # 值略有不同

dup_series = pd.Series(dup_values, index=pd.DatetimeIndex(dup_index), name="voltage_v")

print(f"包含重复数据后的序列长度:{len(dup_series)}")
print(f"重复的时间戳数量:{dup_series.index.duplicated().sum()}")

# 策略1:保留第一个测量值
dedup_first = dup_series[~dup_series.index.duplicated(keep="first")]

# 策略2:保留平均值
dedup_mean = dup_series.groupby(level=0).mean()

print(f"\n采用策略1去重后的序列长度:{len(dedup_first)}")
print(f"采用策略2去重后的序列长度:{len(dedup_mean)}")

# 显示用于重试的重复时间戳对应的数据
ts_retry = index[55]
print(f"\n在时间戳{ts Retry}处的重复数据:
  测量值:{dup_series[ts_RETRY].values.round(3)}
  采用策略1去重后的结果:{dedup_first[tsRetry]:.3f}
  采用策略2去重后的结果:{dedup_mean[ts_retry]:.3f}
"

输出结果:

包含重复数据后的序列长度:170
重复的时间戳数量:2

采用策略1去重后的序列长度:168
采用策略2去重后的序列长度:168

在时间戳2024-06-03 07:00:00处的重复数据:
  测量值:[235.198 234.498]
  采用策略1去重后的结果:235.198
  采用策略2去重后的结果:234.848

对于大多数传感器数据管道来说,采用“保留第一个测量值”的策略是合适的;因为第一次测量的数据就是原始数值。而当重试操作是由不同的传感器对同一参数进行测量时,使用平均值作为处理结果也是合理的。

频率对齐与重采样

在实际的数据处理流程中,往往需要将不同频率的数据合并在一起。例如,你可能需要将每分钟一次的电能消耗数据与每小时一次的天气数据结合起来进行分析。在将这些数据合并之前,必须先进行频率对齐操作。

# 每分钟一次的电能消耗数据
power_1min = pd.Series(
    42 + 18 * ((pd.date_range("2024-06-01", periods=1440, freq="T").hour.isin(range(8, 19)))).astype(int)
    + np.random.normal(0, 2, 1440),
    index=pd.date_range("2024-06-01", periods=1440, freq="T"),
    name="power_kw"
)

# 将数据降采样为每小时一次:对于电能消耗数据来说,使用平均值是合适的
power_hourly_mean = power_1min.resample("H").mean().round(2)

# 将数据降采样为每小时一次:使用最大值表示小时内的峰值消耗量
power_hourly_max = power_1min.resample("H").max().round(2)

# 将数据降采样为每小时一次:使用总和表示每小时的总能耗(单位:千瓦时)
energy_hourly_kwh = (power_1min.resample("H").sum() / 60).round(3)

comparison = pd.DataFrame({
    "mean_kw":    power_hourly_mean,
    "peak_kmw":    power_hourly_max,
    "energy_kwh": energy_hourly_kwh,
}).iloc[7:13]

print(comparison)

输出结果:

                     平均功率  峰值功率  能量消耗(千瓦时)
2024-06-01 07:00:00    42.13    46.28      42.133
2024-06-01 08:00:00    60.56    64.81      60.557
2024-06-01 09:00:00    59.91    64.88      59.912
2024-06-01 10:00:00    60.07    65.16      60.066
2024-06-01 11:00:00    60.08    64.99      60.083
2024-06-01 12:00:00    59.72    63.65      59.724

您选择哪种汇总方式对于后续的数据分析来说至关重要。平均功率适用于负载分析;峰值功率适合容量规划;而总能量(转换为千瓦时)则用于计费。您应该能够理解,为什么“正确的”处理方法取决于具体的应用场景,而非技术因素本身。

噪声平滑处理

原始的传感器数据通常包含高频噪声,这些噪声会掩盖其中真实的信号信息。在进行特征工程之前对数据进行平滑处理可以防止模型被噪声所影响;但过度平滑处理则会破坏数据中的真实变化趋势。

指数加权移动平均法

指数加权移动平均法能够给予最近的数据更大的权重,因此它能更快地适应数据水平的变化。对于非平稳信号来说,这种算法比简单的移动平均法更为有效。

# 含有噪声的温度传感器数据(单位:°C)
temp_noisy = pd.Series(
    3.5
    + 1.2 * np.sin(2 * np.pi * np.arange(168) / 24)
    + np.random.normal(0, 0.8, 168),  # 数据中包含大量噪声
    index=pd.date_range("2024-06-01", periods=168, freq="H"),
    name="temperature_c"
)

temp_ewma = temp_noisy.ewm(span=6, adjust=False).mean()
temp_sma  = temp_noisy.rolling(window=6, center=True).mean()

comparison = pd.DataFrame({
    "原始数据":  temp_noisy,
    "指数加权平均值": temp_ewma.round(3),
    "简单移动平均值": temp_sma.round(3),
}).iloc[22:30]

print(comparison)

输出结果:

                          原始数据     指数加权平均值    简单移动平均值
2024-06-01 22:00:00  3.212372      2.843       3.035
2024-06-01 23:00:00  3.106840      2.918       3.176
2024-06-02 00:00:00  3.712290      3.145       3.011
2024-06-02 01:00:00  3.344376      3.202       3.294
2024-06-02 02:00:00  2.148946      2.901       3.705
2024-06-02 03:00:00  4.241105      3.284       4.087
2024-06-02 04:00:00  5.677429      3.968       4.381
2024-06-02 05:00:00  5.400083      4.377       4.765

Savitzky-Golay滤波器

对于那些需要保留信号峰值形状而非仅仅对其进行平滑处理的信号来说,Savitzky-Golay滤波器通过在一个滑动窗口上应用多项式函数来处理数据,这种算法能够更好地保留真实信号中的峰值部分。

from scipy.signal import savgol_filter

temp_savgol = pd.Series(
    savgol_filter(temp_noisy.values, window_length=11, polyorder=2),
    index=temp_noisy.index,
    name="temp_savgol"
).round(3)

print(pd.DataFrame({
    "原始数据":    temp_noisy,
    "Savitzky-Golay滤波结果": temp_savgol,
}).iloc[22:30])

输出结果:

                          原始数据  Savgol方法计算的结果
2024-06-01 22:00:00  3.212372   2.960
2024-06-01 23:00:00  3.106840   2.944
2024-06-02 00:00:00  3.712290   3.114
2024-06-02 01:00:00  3.344376   3.379
2024-06-02 02:00:00  2.148946   3.809
2024-06-02 03:00:00  4.241105   4.288
2024-06-02 04:00:00  5.677429   4.749
2024-06-02 05:00:00  5.400083   5.138

模式验证与数据合理性检查

如果不进行验证就直接进行清洗操作是远远不够的。你需要设置自动化检查机制,以便在新数据到来时立即进行检查,从而在问题影响到下游模型之前将其发现并解决。

def validate_time_series(series: pd.Series, config: dict) -> dict:
    """
    对时间序列数据进行模式验证与数据合理性检查。
    返回一个报告字典,其中包含各项检查的结果(通过/未通过)。
    """
    report = {}

    # 频率检查
    inferred = pd.infer_freq(series.index)
    report["freq_regular"] = inferred == config["expected_freq"]

    # 缺失值阈值检查
    missing_rate = series.isna().mean()
    report["missing_below_threshold"] = missing_rate <= config["max_missing_rate"]
    report["missing_rate"] = round(missing_rate, 4)

    # 值域检查
    in_range = series.dropna().between(config["min_value"], config["max_value"])
    report["values_in_range"] = in_range.all()
    report["out_of_range_count"] = (~in_range).sum()

    # 重复时间戳检查
    report["no_duplicates"] = not series.index.duplicated().any()

    # 索引单调性检查
    report["index_monotonic"] = series.index.is_monotonic_increasing

    return report


config = {
    "expected_freq": "H",
    "max_missing_rate": 0.05,
    "min_value": 210.0,
    "max_value": 250.0,
}

report = validate_time_series(voltage_outlier_fixed, config)

print("=== 验证报告 ===")
for check, result in report.items():
    if check in ("missing_rate", "out_of_range_count"):
        print(f"  {check}: {result}")
    else:
        status = "✓ 通过" if result else "✗ 未通过"
        print(f"  {status}  {check}")

输出结果:

=== 验证报告 ===
  ✗ 未通过  freq_regular
  ✓ 通过  missing_below_threshold
  missing_rate: 0.0
  ✓ 通过  values_in_range
  out_of_range_count: 0
  ✓ 通过  no_duplicates
  ✓ 通过  index_monotonic

这种验证函数应该被应用在生产流程中每一个数据处理步骤之前。在清洗数据之前运行它,可以及时发现存在的问题;而在清洗数据之后再次运行它,可以确认所有操作都已完成且没有错误。

完整的清洗检查清单

以下是针对任何传入的时间序列数据集都需要执行的完整处理流程:

步骤 技术方法 适用场景
审计检查 索引验证、缺失值检测、值域检查 必须首先执行——在任何其他操作之前
重新排序索引 使用reindex方法将索引调整为标准频率 当时间戳缺失而非显示为NaN时使用此步骤
处理短暂缺失值 进行时间插值处理 适用于连续信号数据,且缺失间隔不超过3个数据点
处理阶段性缺失值 使用前向填充方法填补缺失值 适用于分类数据或设定值类型的数据
处理长期缺失值 通过季节性分解模型进行估算 适用于具有明显季节性变化的数据,且缺失间隔超过6个数据点
处理异常值 对于单变量数据,使用滚动Z分数或IQR方法 适用于单个传感器采集的数据,用于检测局部异常值
处理多变量异常值 使用孤立森林算法进行识别 适用于多个相关传感器采集的数据
异常值处理方式 根据实际情况选择Winsonize方法或插值法进行处理 处理方式需根据具体数据情况来决定
处理重复数据 保留第一个数据点,或计算各组数据的平均值 在数据处理流程中可以尝试重新尝试上传重复的数据
重采样操作 使用.resample()方法进行重采样,并确保使用正确的聚合方式 在数据合并之前,需要先进行频率对齐操作
数据平滑处理 使用EWMA或Savitzky-Golay方法对数据进行平滑处理 在特征工程之前,对于噪声较大的传感器数据尤其需要进行此步骤
验证操作 进行模式验证与数据合理性检查 必须在清洗数据之后进行,同时也要在每批新数据处理完成后进行检查

总结

步骤的顺序非常重要:在进行插补处理之前,必须先对数据重新进行索引;在进行平滑处理之前,也要先完成插补操作;只有在所有步骤都完成后,才能对模型进行验证。如果跳过某些步骤或按照错误的顺序来执行这些步骤,错误就会累积起来,而一旦开始查看模型的预测结果,就很难追踪到这些错误的具体来源。

时间序列数据的清洗工作虽然并不复杂,但使用干净的数据以及经过精心设计的特征来训练模型,所获得的模型效果几乎总是会优于那些使用未经过适当处理的数据训练出来的复杂模型。在尝试对时间序列数据应用任何算法之前,确保这个数据处理流程是正确的,才是最关键的一步。

Comments are closed.