Commit a0cefa05 by yhb

1

parent 0fcf27b8
...@@ -500,9 +500,453 @@ class ModelDetect: ...@@ -500,9 +500,453 @@ class ModelDetect:
pass pass
# ==================== 模型推理异常后处理逻辑 ====================
class InitError(ModelDetect):
"""
模型推理异常后处理逻辑类
功能:
1. 设置初始液位状态变量 detect_initstatus
2. 检测模型推理结果与初始状态是否一致
3. 处理模型推理异常情况(标签转换)
4. 判断模型推理是否恢复正常
detect_initstatus 取值:
0: 不进入模型推理异常后处理逻辑
1: 初始状态检测区域分割掩码结果只有liquid
2: 初始状态检测区域分割掩码结果只有air
位置:ModelDetect 之后,Logic 之前
"""
def __init__(self, model_path=None, device='cuda', batch_size=4):
"""
初始化模型推理异常后处理类
Args:
model_path: YOLO模型路径
device: 计算设备
batch_size: 批处理大小
"""
super().__init__(model_path, device, batch_size)
# 初始液位状态变量(每个检测区域独立)
# 0: 不进入异常处理逻辑
# 1: 初始状态只有liquid
# 2: 初始状态只有air
self.detect_initstatus = []
# 初始掩码像素数量(用于判断恢复正常的条件)
self.init_mask_pixel_counts = []
# 模型推理异常状态标记(每个检测区域独立)
self.is_inference_error = []
# 调试模式
self.debug = False
# 🆕 CSV日志记录
self._debug_log_enabled = True # 启用调试日志
self._debug_log_path = "initerrordebug.csv"
self._debug_frame_count = 0
self._init_debug_csv()
def _init_debug_csv(self):
"""初始化调试CSV日志文件"""
if not self._debug_log_enabled:
return
try:
import csv
from datetime import datetime
with open(self._debug_log_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
'timestamp', # 时间戳
'frame_count', # 帧计数
'target_idx', # 检测区域索引
'init_status', # 初始状态设置值
'has_liquid', # 是否检测到liquid
'has_air', # 是否检测到air
'liquid_pixel_count', # liquid像素数量
'air_pixel_count', # air像素数量
'init_pixel_count', # 初始像素数量
'is_error_before', # 处理前是否异常状态
'is_error_after', # 处理后是否异常状态
'action', # 执行的动作
'recovery_check', # 恢复检查结果
'output_labels' # 输出的标签列表
])
print(f"📝 [InitError] 调试日志已初始化: {self._debug_log_path}")
except Exception as e:
print(f"❌ [InitError] 初始化调试日志失败: {e}")
def _log_to_csv(self, log_data):
"""写入一条调试日志"""
if not self._debug_log_enabled:
return
try:
import csv
from datetime import datetime
with open(self._debug_log_path, 'a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow([
datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
log_data.get('frame_count', 0),
log_data.get('target_idx', 0),
log_data.get('init_status', 0),
log_data.get('has_liquid', False),
log_data.get('has_air', False),
log_data.get('liquid_pixel_count', 0),
log_data.get('air_pixel_count', 0),
log_data.get('init_pixel_count', 0),
log_data.get('is_error_before', False),
log_data.get('is_error_after', False),
log_data.get('action', 'none'),
log_data.get('recovery_check', 'N/A'),
log_data.get('output_labels', '')
])
except Exception as e:
print(f"❌ [InitError] 写入调试日志失败: {e}")
def configure_initstatus(self, initstatus_list):
"""
配置初始液位状态
Args:
initstatus_list: 初始状态列表 [status1, status2, ...]
0=不处理, 1=初始只有liquid, 2=初始只有air
"""
self.detect_initstatus = list(initstatus_list)
num_targets = len(initstatus_list)
self.init_mask_pixel_counts = [0] * num_targets
self.is_inference_error = [False] * num_targets
if self.debug:
print(f"🔧 [InitError] 配置初始状态: {self.detect_initstatus}")
def _ensure_initstatus_size(self, num_targets):
"""确保初始状态列表长度与目标数量匹配"""
while len(self.detect_initstatus) < num_targets:
self.detect_initstatus.append(0)
while len(self.init_mask_pixel_counts) < num_targets:
self.init_mask_pixel_counts.append(0)
while len(self.is_inference_error) < num_targets:
self.is_inference_error.append(False)
def process_masks(self, all_masks_info, idx):
"""
处理模型推理结果,检测并修正异常
规则:
1. detect_initstatus=0: 不进入异常处理逻辑
2. detect_initstatus=1: 初始状态只有liquid
3. detect_initstatus=2: 初始状态只有air
异常判定(基于像素占比):
- init_status=1 且 air占比≥90%: 异常,将air标签转为liquid
- init_status=2 且 liquid占比≥90%: 异常,过滤liquid只保留air
异常状态持续处理:
- 处于异常状态时,持续执行矫正直到恢复正常
恢复正常条件:
1. init_status=1 且 liquid占比≥90%
2. init_status=2 且 air占比≥90%
3. init_status=1 且 air位置高于liquid 且 air像素≤初始liquid的20%
4. init_status=2 且 liquid位置低于air 且 liquid像素≤初始air的20%
Args:
all_masks_info: mask信息列表 [(mask, class_name, confidence), ...]
idx: 检测区域索引
Returns:
list: 处理后的mask信息列表(可能已修正标签)
"""
self._ensure_initstatus_size(idx + 1)
self._debug_frame_count += 1
# 🔧 调试模式:手动设置 init_status = 1(初始只有liquid)
# TODO: 调试完成后删除此行
self.detect_initstatus[idx] = 1
# 如果 detect_initstatus 为 0,不进入异常处理逻辑
if idx >= len(self.detect_initstatus) or self.detect_initstatus[idx] == 0:
return all_masks_info
init_status = self.detect_initstatus[idx]
# 分析当前mask结果
liquid_masks = []
air_masks = []
other_masks = []
for mask, class_name, conf in all_masks_info:
if class_name == 'liquid':
liquid_masks.append((mask, class_name, conf))
elif class_name == 'air':
air_masks.append((mask, class_name, conf))
else:
other_masks.append((mask, class_name, conf))
# 计算像素数量
liquid_pixel_count = sum(np.sum(m[0]) for m in liquid_masks) if liquid_masks else 0
air_pixel_count = sum(np.sum(m[0]) for m in air_masks) if air_masks else 0
total_pixel_count = liquid_pixel_count + air_pixel_count
# 计算占比
liquid_ratio = liquid_pixel_count / total_pixel_count if total_pixel_count > 0 else 0
air_ratio = air_pixel_count / total_pixel_count if total_pixel_count > 0 else 0
# 记录初始掩码像素数量(首次检测时)
if self.init_mask_pixel_counts[idx] == 0:
if init_status == 1 and liquid_pixel_count > 0:
self.init_mask_pixel_counts[idx] = liquid_pixel_count
elif init_status == 2 and air_pixel_count > 0:
self.init_mask_pixel_counts[idx] = air_pixel_count
# 记录处理前的异常状态
is_error_before = self.is_inference_error[idx]
action = 'none'
recovery_check = 'N/A'
output_labels = []
# ========== 恢复正常检查 ==========
if self.is_inference_error[idx]:
recovery_result = self._check_recovery(liquid_masks, air_masks, idx, init_status,
liquid_ratio, air_ratio)
recovery_check = 'recovered' if recovery_result else 'still_error'
if recovery_result:
# 恢复正常
self.is_inference_error[idx] = False
action = 'recovery'
output_labels = [m[1] for m in all_masks_info]
if self.debug:
print(f" ✅ [InitError-目标{idx}] 模型推理恢复正常")
self._log_to_csv({
'frame_count': self._debug_frame_count,
'target_idx': idx,
'init_status': init_status,
'has_liquid': liquid_pixel_count > 0,
'has_air': air_pixel_count > 0,
'liquid_pixel_count': liquid_pixel_count,
'air_pixel_count': air_pixel_count,
'init_pixel_count': self.init_mask_pixel_counts[idx],
'is_error_before': is_error_before,
'is_error_after': self.is_inference_error[idx],
'action': action,
'recovery_check': recovery_check,
'output_labels': ','.join(output_labels)
})
return all_masks_info
else:
# 仍处于异常状态,执行持续矫正
return self._apply_error_correction(
liquid_masks, air_masks, other_masks, idx, init_status,
liquid_pixel_count, air_pixel_count, is_error_before,
recovery_check, 'error_persist'
)
# ========== 异常判定(基于像素占比≥90%)==========
THRESHOLD = 0.90
# 情况1: init_status=1(初始只有liquid),但air占比≥90%
if init_status == 1 and air_ratio >= THRESHOLD:
self.is_inference_error[idx] = True
return self._apply_error_correction(
liquid_masks, air_masks, other_masks, idx, init_status,
liquid_pixel_count, air_pixel_count, is_error_before,
recovery_check, 'air_to_liquid'
)
# 情况2: init_status=2(初始只有air),但liquid占比≥90%
if init_status == 2 and liquid_ratio >= THRESHOLD:
self.is_inference_error[idx] = True
return self._apply_error_correction(
liquid_masks, air_masks, other_masks, idx, init_status,
liquid_pixel_count, air_pixel_count, is_error_before,
recovery_check, 'liquid_to_air'
)
# 无异常,返回原始结果
action = 'pass_through'
output_labels = [m[1] for m in all_masks_info]
self._log_to_csv({
'frame_count': self._debug_frame_count,
'target_idx': idx,
'init_status': init_status,
'has_liquid': liquid_pixel_count > 0,
'has_air': air_pixel_count > 0,
'liquid_pixel_count': liquid_pixel_count,
'air_pixel_count': air_pixel_count,
'init_pixel_count': self.init_mask_pixel_counts[idx],
'is_error_before': is_error_before,
'is_error_after': self.is_inference_error[idx],
'action': action,
'recovery_check': recovery_check,
'output_labels': ','.join(output_labels)
})
return all_masks_info
def _apply_error_correction(self, liquid_masks, air_masks, other_masks, idx, init_status,
liquid_pixel_count, air_pixel_count, is_error_before,
recovery_check, action):
"""
应用异常矫正逻辑
Args:
liquid_masks: liquid掩码列表
air_masks: air掩码列表
other_masks: 其他掩码列表
idx: 检测区域索引
init_status: 初始状态
liquid_pixel_count: liquid像素数量
air_pixel_count: air像素数量
is_error_before: 处理前是否异常
recovery_check: 恢复检查结果
action: 动作类型
Returns:
list: 矫正后的mask列表
"""
output_labels = []
corrected_masks = []
if init_status == 1:
# 初始只有liquid,将air标签转为liquid
for mask, class_name, conf in air_masks:
corrected_masks.append((mask, 'liquid', conf))
output_labels = ['liquid'] * len(air_masks) + [m[1] for m in other_masks]
if self.debug:
print(f" ⚠️ [InitError-目标{idx}] 矫正: air→liquid (保持满液状态)")
result = corrected_masks + other_masks
elif init_status == 2:
# 初始只有air,过滤liquid只保留air
output_labels = ['air'] * len(air_masks) + [m[1] for m in other_masks]
if self.debug:
print(f" ⚠️ [InitError-目标{idx}] 矫正: 过滤liquid,只保留air (保持空瓶状态)")
result = air_masks + other_masks
else:
result = liquid_masks + air_masks + other_masks
output_labels = [m[1] for m in result]
self._log_to_csv({
'frame_count': self._debug_frame_count,
'target_idx': idx,
'init_status': init_status,
'has_liquid': liquid_pixel_count > 0,
'has_air': air_pixel_count > 0,
'liquid_pixel_count': liquid_pixel_count,
'air_pixel_count': air_pixel_count,
'init_pixel_count': self.init_mask_pixel_counts[idx],
'is_error_before': is_error_before,
'is_error_after': self.is_inference_error[idx],
'action': action,
'recovery_check': recovery_check,
'output_labels': ','.join(output_labels)
})
return result
def _check_recovery(self, liquid_masks, air_masks, idx, init_status, liquid_ratio, air_ratio):
"""
检查模型推理是否恢复正常
恢复条件(满足任一即可):
1. init_status=1 且 liquid占比≥90%
2. init_status=2 且 air占比≥90%
3. init_status=1 且 air位置高于liquid 且 air像素≤初始liquid的20%
4. init_status=2 且 liquid位置低于air 且 liquid像素≤初始air的20%
Args:
liquid_masks: liquid掩码列表
air_masks: air掩码列表
idx: 检测区域索引
init_status: 初始状态
liquid_ratio: liquid像素占比
air_ratio: air像素占比
Returns:
bool: 是否恢复正常
"""
RECOVERY_THRESHOLD = 0.90
init_pixel_count = self.init_mask_pixel_counts[idx]
# ========== 恢复条件1: init_status=1 且 liquid占比≥90% ==========
if init_status == 1 and liquid_ratio >= RECOVERY_THRESHOLD:
if self.debug:
print(f" 🔍 [InitError-目标{idx}] 恢复条件1满足: liquid占比={liquid_ratio:.1%}≥90%")
return True
# ========== 恢复条件2: init_status=2 且 air占比≥90% ==========
if init_status == 2 and air_ratio >= RECOVERY_THRESHOLD:
if self.debug:
print(f" 🔍 [InitError-目标{idx}] 恢复条件2满足: air占比={air_ratio:.1%}≥90%")
return True
# ========== 恢复条件3: init_status=1 且 位置+像素数量判断 ==========
if init_status == 1 and liquid_masks and air_masks and init_pixel_count > 0:
try:
# 条件: air位置高于liquid(y坐标值更小)
air_min_y = min(np.min(np.where(m[0])[0]) for m in air_masks if np.sum(m[0]) > 0)
liquid_min_y = min(np.min(np.where(m[0])[0]) for m in liquid_masks if np.sum(m[0]) > 0)
if air_min_y < liquid_min_y:
# 条件: air像素数量不超过初始liquid的20%
air_pixel_count = sum(np.sum(m[0]) for m in air_masks)
if air_pixel_count <= init_pixel_count * 0.2:
if self.debug:
print(f" 🔍 [InitError-目标{idx}] 恢复条件3满足: air_y={air_min_y}<liquid_y={liquid_min_y}, "
f"air_pixels={air_pixel_count}≤{init_pixel_count * 0.2:.0f}")
return True
except (ValueError, StopIteration):
pass
# ========== 恢复条件4: init_status=2 且 位置+像素数量判断 ==========
if init_status == 2 and liquid_masks and air_masks and init_pixel_count > 0:
try:
# 条件: liquid位置低于air(y坐标值更大)
liquid_max_y = max(np.max(np.where(m[0])[0]) for m in liquid_masks if np.sum(m[0]) > 0)
air_max_y = max(np.max(np.where(m[0])[0]) for m in air_masks if np.sum(m[0]) > 0)
if liquid_max_y > air_max_y:
# 条件: liquid像素数量不超过初始air的20%
liquid_pixel_count = sum(np.sum(m[0]) for m in liquid_masks)
if liquid_pixel_count <= init_pixel_count * 0.2:
if self.debug:
print(f" 🔍 [InitError-目标{idx}] 恢复条件4满足: liquid_y={liquid_max_y}>air_y={air_max_y}, "
f"liquid_pixels={liquid_pixel_count}≤{init_pixel_count * 0.2:.0f}")
return True
except (ValueError, StopIteration):
pass
return False
def reset_initstatus(self, idx=None):
"""
重置初始状态
Args:
idx: 检测区域索引,None表示重置所有
"""
if idx is None:
self.detect_initstatus = [0] * len(self.detect_initstatus)
self.init_mask_pixel_counts = [0] * len(self.init_mask_pixel_counts)
self.is_inference_error = [False] * len(self.is_inference_error)
else:
if idx < len(self.detect_initstatus):
self.detect_initstatus[idx] = 0
self.init_mask_pixel_counts[idx] = 0
self.is_inference_error[idx] = False
if self.debug:
print(f"🔄 [InitError] 重置初始状态: idx={idx}")
# ==================== 液位检测引擎 ==================== # ==================== 液位检测引擎 ====================
class Logic(ModelDetect): class Logic(InitError):
""" """
液位检测引擎(继承自 ModelDetect) 液位检测引擎(继承自 ModelDetect)
...@@ -533,7 +977,7 @@ class Logic(ModelDetect): ...@@ -533,7 +977,7 @@ class Logic(ModelDetect):
super().__init__(model_path, device, batch_size) super().__init__(model_path, device, batch_size)
# 调试模式开关 # 调试模式开关
self.debug = True self.debug = False
# 滤波参数 # 滤波参数
self.smooth_window = 5 self.smooth_window = 5
...@@ -1033,6 +1477,16 @@ class Logic(ModelDetect): ...@@ -1033,6 +1477,16 @@ class Logic(ModelDetect):
# 完全没有检测到mask # 完全没有检测到mask
return None, False, 'detect_zero' return None, False, 'detect_zero'
# 🆕 模型推理异常后处理(InitError逻辑)
# 检测并修正 detect_initstatus 与模型分割结果不一致的情况
all_masks_info = self.process_masks(all_masks_info, idx)
# 🆕 如果InitError返回空列表(init_status=2且只检测到liquid),直接返回液位0
if len(all_masks_info) == 0:
if self.debug:
print(f" 🔧 [目标{idx}] InitError矫正: 返回空列表,液位=0mm")
return 0.0, False, 'init_error_corrected'
# ️ 关键修复:将原图坐标转换为裁剪图像坐标 # ️ 关键修复:将原图坐标转换为裁剪图像坐标
# container_bottom_offset 是原图绝对坐标,需要转换为裁剪图像中的相对坐标 # container_bottom_offset 是原图绝对坐标,需要转换为裁剪图像中的相对坐标
container_bottom_in_crop = container_bottom_offset - crop_top_y container_bottom_in_crop = container_bottom_offset - crop_top_y
......
...@@ -1005,7 +1005,7 @@ class AnnotationWidget(QtWidgets.QWidget): ...@@ -1005,7 +1005,7 @@ class AnnotationWidget(QtWidgets.QWidget):
self.zoom_center_y = 0 # 变焦中心Y坐标 self.zoom_center_y = 0 # 变焦中心Y坐标
# 🔥 调试开关 # 🔥 调试开关
self.debug = True self.debug = False
self._initUI() self._initUI()
self._connectSignals() self._connectSignals()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment