设备故障预测 —— 逻辑回归 + 批量梯度下降

1. 题目描述

某数据中心需要对设备进行故障预测。已知训练集包含若干设备运行日志,每条日志记录了设备的 5 个特征:

  1. 写入次数(writes)
  2. 读取次数(reads)
  3. 平均写延迟(avg_write_ms)
  4. 平均读延迟(avg_read_ms)
  5. 使用年限(years)

以及一个标记字段:设备是否发生故障(status,0=正常,1=故障)。

同时,还给出若干需要预测的测试样本(只有前 5 个特征,没有标签)。

数据清洗规则

  • 缺失值处理:如果字段为 NaN,则用训练集对应字段的均值填充
  • 异常值处理
    • 写入次数、读取次数 < 0
    • 平均写/读延迟 < 0 或 > 1000
    • 使用年限 < 0 或 > 20 → 替换为训练集该字段的中位数
  • 如果某字段在训练集没有有效值,则均值/中位数都设为 0

模型训练规则

  • 模型:使用 逻辑回归,带偏置项w0
  • 训练:采用 批量梯度下降(Batch Gradient Descent)优化参数 ,每次迭代用全部训练样本
  • 学习率 = 0.01,迭代次数 = 100,初始权重全为 0
  • 概率: 图片1

模型预测规则

  • 输入测试样本,计算预测概率
  • 如果 (p ),输出 1(预测故障)
  • 如果 (p < 0.5),输出 0(预测正常)

2. 算法代码实现(Python,ACM格式)

2.1 总结流程(从输入到预测)

  1. 准备数据(特征和标签)
  2. 初始化参数((w=0, b=0))
  3. 循环迭代
    • 前向计算:预测概率 (p=(w·x+b))
    • 计算误差:(p-y)
    • 累积梯度:对所有样本求和
    • 平均梯度,更新参数
  4. 重复迭代直到收敛
  5. 预测新样本:用学习到的 (w,b) 计算概率,并根据阈值分类

2.2 代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import math
from statistics import mean, median

def parse_num(s):
if s is None:
return None
s = s.strip()
if not s or s.lower() == "nan":
return None
try:
return float(s)
except:
return None

def parse_label(s):
try:
v = int(s)
if v in (0, 1):
return v
return None
except:
return None

def is_valid_value(idx, v):
if v is None:
return False
if idx in (0, 1): # writes / reads
return v >= 0
if idx in (2, 3): # avg_write_ms / avg_read_ms
return 0 <= v <= 1000
if idx == 4: # years
return 0 <= v <= 20
return False

def clean_row(feats, means, medians):
x = []
for i, v in enumerate(feats):
if v is None:
x.append(means[i])
else:
if is_valid_value(i, v):
x.append(v)
else:
x.append(medians[i])
return x

def sigmoid(z):
if z >= 0:
ez = math.exp(-z)
return 1.0 / (1.0 + ez)
else:
ez = math.exp(z)
return ez / (1.0 + ez)

# ---------------- 主程序 ----------------
N = int(input().strip())
train_rows = [input().strip().split(",") for _ in range(N)]

M = int(input().strip())
test_rows = [input().strip().split(",") for _ in range(M)]

# 统计训练集有效值
valid_values = [[] for _ in range(5)]
train_data = []
for row in train_rows:
if len(row) < 7:
continue
feats = [parse_num(row[1]), parse_num(row[2]), parse_num(row[3]),
parse_num(row[4]), parse_num(row[5])]
y = parse_label(row[6])
if y is None:
continue
for i, v in enumerate(feats):
if is_valid_value(i, v):
valid_values[i].append(v)
train_data.append((feats, y))

means, medians = [], []
for i in range(5):
if valid_values[i]:
means.append(mean(valid_values[i]))
medians.append(median(valid_values[i]))
else:
means.append(0.0)
medians.append(0.0)

# 清洗训练数据
X, Y = [], []
for feats, y in train_data:
X.append(clean_row(feats, means, medians))
Y.append(float(y))

# 初始化参数
n_train = len(X)
d = 5
w = [0.0] * d
b = 0.0
lr = 0.01
epochs = 100

# 批量梯度下降
if n_train > 0:
for _ in range(epochs):
grad_w = [0.0] * d
grad_b = 0.0
for xi, yi in zip(X, Y):
z = b + sum(w[j] * xi[j] for j in range(d))
pi = sigmoid(z)
diff = (pi - yi)
for j in range(d):
grad_w[j] += diff * xi[j]
grad_b += diff
invN = 1.0 / n_train
for j in range(d):
w[j] -= lr * (grad_w[j] * invN)
b -= lr * (grad_b * invN)

# 预测
for row in test_rows:
feats = [None] * 5
if len(row) >= 6:
feats = [parse_num(row[1]), parse_num(row[2]), parse_num(row[3]),
parse_num(row[4]), parse_num(row[5])]
x = clean_row(feats, means, medians)
z = b + sum(w[j] * x[j] for j in range(d))
p = sigmoid(z)
print(1 if p >= 0.5 else 0)

3. 主要逻辑思路

3.1 问题背景

我们要解决的是一个二分类问题:
- 输入:一个样本的多个特征(比如硬盘的使用年限、读写次数、延迟…)
- 输出:预测它属于类别 1(故障)还是类别 0(正常)

逻辑回归模型假设:

$$ \begin{aligned} & P(y=1 \mid x)=\sigma(w \cdot x+b), \quad \sigma(z)= \frac{1}{1+e^{-z}} \end{aligned} $$

3.2 模型目标

我们要找到一组参数 (w, b),使得:
- 预测概率 (p) 尽量接近真实标签 (y)
- 一般使用交叉熵损失(log loss)来衡量预测与真实的差距,题目中没有直接给出这个词,但后面3.4部分分析的参数更新其实就是交叉熵损失的梯度推导。

$$ L(w, b)=-\frac{1}{n} \sum_{i=1}^n\left[y_i \ln \left(p_i\right)+\left(1-y_i\right) \ln \left(1-p_i\right)\right] $$

3.3 梯度下降思想

因为这个损失函数没有简单解析解 → 我们用迭代优化
- 计算损失函数对参数的导数(梯度)
- 沿着梯度的反方向更新参数
- 反复迭代,直到损失收敛

这是「梯度下降」的核心。

3.4 批量梯度下降(Batch Gradient Descent, BGD)

在每一轮迭代(epoch):

  1. 遍历所有样本
  2. 对每个样本:计算预测值 pi,误差 = (piyi)
  3. 累积所有样本的梯度
  4. 平均化梯度(除以样本数 n
  5. 更新参数:

$$ w \leftarrow w - \eta \cdot \frac{1}{n}\sum_i (p_i - y_i)x_i $$

$$ b \leftarrow b - \eta \cdot \frac{1}{n}\sum_i (p_i - y_i) $$

其中:
- eta = 学习率,控制步长大小
- 这样更新方向更稳定,不会被单个样本的噪声影响太大

3.5 预测阶段

训练完模型后:
- 对新样本 x,计算概率
p = σ(w·x+b)
- 设定阈值(通常是 0.5):
- 如果 p ≥ 0.5 → 预测类别 1
- 如果 p < 0.5 → 预测类别 0