Commit d4d8d3a0 by yhb

1

parent 6c828f54
--- ---
alwaysApply: true alwaysApply: true
--- ---
满:液位高度为容器高度,空:液位高度为0
一、空间逻辑判断,一个通道多个检测区域(大于等于俩个)才触发空间逻辑判断:
情况1.下方检测区域模型分割掩码结果只有air,上方检测区域液位高度为0,函数输出001
情况2.上方检测区域模型分割掩码结果有liquid或者foam,下方检测区域液位高度为容器高度,函数输出002
二、时间逻辑,从检测开始到结束一直使用,每个检测区域的逻辑是独立的。
1.本次液位高度为0(空),理论上下一次液位高度数据不会是容器高度,若下次液位高度数据是容器高度(满),数据为假且zero_full计数变量加1,液位高度数据依旧采用上次数据,当zero_full值为3时判断此次为真,使用本次数据为最终高度数据。
2.本次液位高度为容器高度(满),理论上下一次的数据不会是0(空),若下次液位高度数据是0(空),full_zero计数变量加一
3.前后俩次的数据变化范围不会超过8mm,前后俩次分割的结果面积不会差太多
4.data_count数据点技术
\ No newline at end of file
...@@ -18,13 +18,13 @@ channels: ...@@ -18,13 +18,13 @@ channels:
channel2: channel2:
general: general:
task_id: '1' task_id: '1'
task_name: test task_name: 海信现场测试
save_liquid_data_path: d:\restructure\liquid_level_line_detection_system\database\mission_result\1_test save_liquid_data_path: D:\restructure\liquid_level_line_detection_system\database\mission_result\1_海信现场测试
channel3: channel3:
general: general:
task_id: '1' task_id: '1'
task_name: test task_name: 海信现场测试
save_liquid_data_path: d:\restructure\liquid_level_line_detection_system\database\mission_result\1_test save_liquid_data_path: D:\restructure\liquid_level_line_detection_system\database\mission_result\1_海信现场测试
channel4: channel4:
general: general:
task_id: '1' task_id: '1'
...@@ -33,7 +33,7 @@ channel4: ...@@ -33,7 +33,7 @@ channel4:
channel1: channel1:
general: general:
task_id: '1' task_id: '1'
task_name: 历史回放功能展示 task_name: 曲线
area_count: 0 area_count: 0
safe_low: 2.0mm safe_low: 2.0mm
safe_high: 10.0mm safe_high: 10.0mm
...@@ -41,7 +41,7 @@ channel1: ...@@ -41,7 +41,7 @@ channel1:
video_format: AVI video_format: AVI
push_address: '' push_address: ''
video_path: '' video_path: ''
save_liquid_data_path: d:\restructure\liquid_level_line_detection_system\database\mission_result\1_历史回放功能展示 save_liquid_data_path: D:\restructure\liquid_level_line_detection_system\database\mission_result\1_曲线
areas: areas:
area_1: 通道1_区域1 area_1: 通道1_区域1
area_heights: area_heights:
......
...@@ -223,7 +223,10 @@ class CurvePanelHandler: ...@@ -223,7 +223,10 @@ class CurvePanelHandler:
# 业务数据管理(从Widget层移动到Handler层) # 业务数据管理(从Widget层移动到Handler层)
self.channel_data = {} # {channel_id: {'channel_name': str, 'window_name': str, 'time': [], 'value': []}} self.channel_data = {} # {channel_id: {'channel_name': str, 'window_name': str, 'time': [], 'value': []}}
self.channel_start_times = {} # 每个通道的起始时间 self.channel_start_times = {} # 每个通道的起始时间
self.max_points = 3000 # 每条曲线最大显示点数(3000点 ≈ 2分钟@25Hz) # 每条曲线历史模式下单次绘制的最大点数(根据X轴显示范围做采样)
# 注意:原始数据全部保存在 channel_data 中,仅在送往UI前做抽样
# self.max_points = 3000 # 旧字段,保留以兼容旧逻辑(实时模式用)
self.max_points_per_view = 800 # 新字段:单视图最大显示点数,用于采样
# 🔥 曲线加载模式:'realtime' 或 'history' # 🔥 曲线加载模式:'realtime' 或 'history'
# - 'realtime':实时检测模式,限制数据点为3000个(滚动窗口) # - 'realtime':实时检测模式,限制数据点为3000个(滚动窗口)
...@@ -248,6 +251,9 @@ class CurvePanelHandler: ...@@ -248,6 +251,9 @@ class CurvePanelHandler:
# 连接信号 # 连接信号
curve_panel.missionFolderChanged.connect(self._handleMissionFolderChanged) curve_panel.missionFolderChanged.connect(self._handleMissionFolderChanged)
curve_panel.refreshMissionListRequested.connect(self.loadMissionFolders) curve_panel.refreshMissionListRequested.connect(self.loadMissionFolders)
# 时间轴范围变化信号:用于缩放/拖动时根据当前范围重新采样
if hasattr(curve_panel, "timeAxisRangeChanged"):
curve_panel.timeAxisRangeChanged.connect(self._onTimeAxisRangeChanged)
# 连接安全限值变化信号 # 连接安全限值变化信号
curve_panel.spn_upper_limit.valueChanged.connect(self._handleSafetyLimitsChanged) curve_panel.spn_upper_limit.valueChanged.connect(self._handleSafetyLimitsChanged)
...@@ -345,7 +351,7 @@ class CurvePanelHandler: ...@@ -345,7 +351,7 @@ class CurvePanelHandler:
before_count = len(channel['time']) before_count = len(channel['time'])
print(f" - 更新前数据点数: {before_count}") print(f" - 更新前数据点数: {before_count}")
# 批量添加数据 # 批量添加数据(原始数据全部保留在 channel_data 中)
added_count = 0 added_count = 0
for point in data_points: for point in data_points:
timestamp = point['timestamp'] timestamp = point['timestamp']
...@@ -364,24 +370,8 @@ class CurvePanelHandler: ...@@ -364,24 +370,8 @@ class CurvePanelHandler:
print(f" - 实际添加数据点数: {added_count}") print(f" - 实际添加数据点数: {added_count}")
print(f" - 更新后数据点数: {after_add_count}") print(f" - 更新后数据点数: {after_add_count}")
# 🔥 数据点数量限制(已禁用) # ===== 根据当前X轴显示范围进行采样(显示层抽点,不影响原始数据) =====
# - 原逻辑:'realtime' 同步布局模式限制为3000个点(滚动窗口) processed_time, processed_value = self._getSampledDataForChannel(channel_id)
# - 已禁用:不再限制数据点数量,所有模式都显示完整数据
# if self.curve_load_mode == 'realtime':
# # 同步布局模式:保留最新3000个点
# if len(channel['time']) > self.max_points:
# before_limit = len(channel['time'])
# channel['time'] = channel['time'][-self.max_points:]
# channel['value'] = channel['value'][-self.max_points:]
# print(f" - 限制数据点: {before_limit} -> {len(channel['time'])}")
# 🔥 处理时间间隔断点:超过2分钟的数据点之间插入NaN断开连接
processed_time, processed_value = self._processTimeGaps(
channel['time'],
channel['value'],
max_gap_seconds=120 # 2分钟 = 120秒
)
print(f" - 处理后数据点数: {len(processed_time)}") print(f" - 处理后数据点数: {len(processed_time)}")
# 🔥 数据验证:确保有数据才更新UI # 🔥 数据验证:确保有数据才更新UI
...@@ -498,6 +488,109 @@ class CurvePanelHandler: ...@@ -498,6 +488,109 @@ class CurvePanelHandler:
return processed_time, processed_value return processed_time, processed_value
def _getSampledDataForChannel(self, channel_id, starttime=None, endtime=None):
"""
根据给定的时间范围对指定通道的数据进行抽样,并处理时间间隔断点。
Args:
channel_id (str): 通道ID
starttime (float|None): 可见范围起始时间戳;None 时自动取通道最小时间
endtime (float|None): 可见范围结束时间戳;None 时自动取通道最大时间
Returns:
tuple: (processed_time, processed_value)
"""
if channel_id not in self.channel_data:
return [], []
channel = self.channel_data[channel_id]
full_time = channel.get("time", [])
full_value = channel.get("value", [])
if not full_time or not full_value:
return [], []
# 如果外部没有指定范围,则使用当前CurvePanel中的范围
if starttime is None or endtime is None:
try:
starttime, endtime = self.getTimeAxisRange()
except Exception:
starttime, endtime = None, None
# 如果仍然没有有效范围,则使用该通道的整体时间范围
if starttime is None or endtime is None or not np.isfinite(starttime) or not np.isfinite(endtime):
starttime = min(full_time)
endtime = max(full_time)
# 选出当前显示范围内的数据点
visible_indices = [i for i, t in enumerate(full_time) if starttime <= t <= endtime]
if visible_indices:
visible_time = [full_time[i] for i in visible_indices]
visible_value = [full_value[i] for i in visible_indices]
else:
# 如果当前显示范围内没有点,则退回到全量数据
visible_time = full_time
visible_value = full_value
visible_count = len(visible_time)
max_points_per_view = getattr(self, "max_points_per_view", 5000)
if visible_count > max_points_per_view:
# 计算采样步长,保证单次绘制点数不超过 max_points_per_view
step = int(np.ceil(visible_count / max_points_per_view))
print(
f" - 采样可见数据[{channel_id}]: 原始={visible_count} 点, 目标最大={max_points_per_view} 点, 步长={step}"
)
sampled_time = visible_time[::step]
sampled_value = visible_value[::step]
else:
sampled_time = visible_time
sampled_value = visible_value
# 处理时间间隔断点:超过2分钟的数据点之间插入NaN断开连接
processed_time, processed_value = self._processTimeGaps(
sampled_time, sampled_value, max_gap_seconds=120 # 2分钟 = 120秒
)
return processed_time, processed_value
def _onTimeAxisRangeChanged(self, starttime, endtime):
"""
曲线面板时间轴范围变化回调(由CurvePanel触发)
每次缩放/拖动X轴时,按照当前显示范围对所有通道重新采样并刷新UI。
"""
if not self.curve_panel:
return
print(
f"🔁 [时间轴变化重采样] starttime={starttime}, endtime={endtime}, 通道数量={len(self.channel_data)}"
)
for channel_id, channel in self.channel_data.items():
processed_time, processed_value = self._getSampledDataForChannel(
channel_id, starttime=starttime, endtime=endtime
)
# 数据验证:与 updateCurveData 中保持一致
if not processed_time or not processed_value:
continue
if len(processed_time) != len(processed_value):
continue
has_valid_data = any(
np.isfinite(v) and np.isfinite(t)
for t, v in zip(processed_time, processed_value)
)
if not has_valid_data:
continue
self.curve_panel.updateCurveDisplay(
channel_id,
processed_time,
processed_value
)
def _exportChannelDataToCSV(self, channel_id, file_path=None): def _exportChannelDataToCSV(self, channel_id, file_path=None):
""" """
导出通道数据到CSV文件(手动导出功能) 导出通道数据到CSV文件(手动导出功能)
......
...@@ -422,16 +422,16 @@ class LiquidDetectionEngine: ...@@ -422,16 +422,16 @@ class LiquidDetectionEngine:
continue continue
# 执行检测(传入top坐标和配置用于坐标转换) # 执行检测(传入top坐标和配置用于坐标转换)
liquid_height_mm = self._detect_single_target( liquid_height_raw_mm = self._detect_single_target(
cropped, idx, top, cropped, idx, top,
fixed_bottoms[idx] if idx < len(fixed_bottoms) else None, fixed_bottoms[idx] if idx < len(fixed_bottoms) else None,
fixed_tops[idx] if idx < len(fixed_tops) else None, fixed_tops[idx] if idx < len(fixed_tops) else None,
actual_heights[idx] if idx < len(actual_heights) else 20.0 actual_heights[idx] if idx < len(actual_heights) else 20.0
) )
# 如果没有检测到液位,使用高度0 # 如果没有检测到液位,使用高度0(原始观测)
if liquid_height_mm is None: if liquid_height_raw_mm is None:
liquid_height_mm = 0.0 liquid_height_raw_mm = 0.0
# 计算液位线位置 # 计算液位线位置
# 注意:container_bottom_y 和 container_top_y 已经是原图中的绝对坐标 # 注意:container_bottom_y 和 container_top_y 已经是原图中的绝对坐标
...@@ -440,6 +440,19 @@ class LiquidDetectionEngine: ...@@ -440,6 +440,19 @@ class LiquidDetectionEngine:
container_height_mm = actual_heights[idx] if idx < len(actual_heights) else 20.0 container_height_mm = actual_heights[idx] if idx < len(actual_heights) else 20.0
container_pixel_height = container_bottom_y - container_top_y container_pixel_height = container_bottom_y - container_top_y
# ==================== 多帧稳健逻辑:卡尔曼滤波和平滑 ====================
# 使用卡尔曼滤波对单帧观测值进行平滑,得到最终的液位高度(毫米)
if idx < len(self.kalman_filters):
liquid_height_mm = self._apply_kalman_filter(
observation=liquid_height_raw_mm,
idx=idx,
container_height_mm=container_height_mm
)
else:
# 保险起见,如果卡尔曼未初始化,则直接使用原始高度
liquid_height_mm = max(0.0, min(liquid_height_raw_mm, container_height_mm))
pixel_per_mm = container_pixel_height / container_height_mm pixel_per_mm = container_pixel_height / container_height_mm
height_px = int(liquid_height_mm * pixel_per_mm) height_px = int(liquid_height_mm * pixel_per_mm)
...@@ -533,9 +546,7 @@ class LiquidDetectionEngine: ...@@ -533,9 +546,7 @@ class LiquidDetectionEngine:
# else: # else:
# print(f" - ⚠️ 未检测到任何mask!") # print(f" - ⚠️ 未检测到任何mask!")
liquid_height = None # 解析YOLO推理结果,得到当前帧的所有实例信息
# 解析YOLO推理结果
all_masks_info = parse_yolo_result( all_masks_info = parse_yolo_result(
mission_result=mission_result, mission_result=mission_result,
model_names=self.model.names, model_names=self.model.names,
...@@ -543,20 +554,32 @@ class LiquidDetectionEngine: ...@@ -543,20 +554,32 @@ class LiquidDetectionEngine:
confidence_threshold=0.5 confidence_threshold=0.5
) )
if len(all_masks_info) == 0: # ==================== 多帧稳健逻辑:更新计数器 ====================
return None # 如果检测到 liquid 实例,则认为本帧有有效观测,清零“未检测计数”和帧计数;
# 否则累加,用于后续 foam/air 备选策略和“保持上一帧高度”。
# ️ 关键修复:将原图坐标转换为裁剪图像坐标 has_liquid = any(class_name == 'liquid' for _, class_name, _ in all_masks_info)
# container_bottom_offset 是原图绝对坐标,需要转换为裁剪图像中的相对坐标
container_bottom_in_crop = container_bottom_offset - crop_top_y # 确保索引安全
container_top_in_crop = container_top_offset - crop_top_y if idx < len(self.no_liquid_count):
if has_liquid:
# 本帧检测到 liquid:重置计数器
self.no_liquid_count[idx] = 0
if idx < len(self.frame_counters):
self.frame_counters[idx] = 0
else:
# 本帧未检测到 liquid:累加计数器
self.no_liquid_count[idx] += 1
if idx < len(self.frame_counters):
self.frame_counters[idx] += 1
# print(f" [坐标转换-目标{idx}]:") # 周期性清零,避免计数无限增长(与历史逻辑类似,每3帧清一次)
# print(f" - 裁剪区域top: {crop_top_y}px (原图坐标)") if self.frame_counters[idx] > 0 and self.frame_counters[idx] % 3 == 0:
# print(f" - 原图容器底部: {container_bottom_offset}px → 裁剪图像中: {container_bottom_in_crop}px") self.no_liquid_count[idx] = 0
# print(f" - 原图容器顶部: {container_top_offset}px → 裁剪图像中: {container_top_in_crop}px")
# print(f" - 裁剪图像中容器高度: {container_bottom_in_crop - container_top_in_crop}px(应等于{container_pixel_height}px)")
# 如果完全没有任何 mask,直接走后面的“保持上一帧高度 / 返回 None”逻辑
if len(all_masks_info) == 0:
liquid_height = None
else:
# 分析mask获取液位高度(使用裁剪图像坐标) # 分析mask获取液位高度(使用裁剪图像坐标)
liquid_height = calculate_liquid_height( liquid_height = calculate_liquid_height(
all_masks_info=all_masks_info, all_masks_info=all_masks_info,
...@@ -566,6 +589,18 @@ class LiquidDetectionEngine: ...@@ -566,6 +589,18 @@ class LiquidDetectionEngine:
no_liquid_count=self.no_liquid_count[idx] if idx < len(self.no_liquid_count) else 0 no_liquid_count=self.no_liquid_count[idx] if idx < len(self.no_liquid_count) else 0
) )
# ==================== 多帧稳健逻辑:保持上一帧高度 ====================
# 如果连续未检测到 liquid 但还没达到 foam/air 备选触发阈值,
# 并且之前有有效高度,则直接保持上一帧的液位高度。
if liquid_height is None and idx < len(self.no_liquid_count):
if self.no_liquid_count[idx] < 3 and idx < len(self.last_liquid_heights):
if self.last_liquid_heights[idx] is not None:
liquid_height = self.last_liquid_heights[idx]
# 如果本帧最终得到有效高度,更新 last_liquid_heights
if liquid_height is not None and idx < len(self.last_liquid_heights):
self.last_liquid_heights[idx] = liquid_height
return liquid_height return liquid_height
except Exception as e: except Exception as e:
......
# -*- coding: utf-8 -*-
"""
时间逻辑判断模块
实现检测区域的时间逻辑判断,用于过滤异常数据并输出最终液位高度
使用示例:
from handlers.videopage.temporal_logic import TemporalLogicFilter
# 为每个检测区域创建一个过滤器实例
filter = TemporalLogicFilter(container_height_mm=100.0)
# 每次检测后调用
final_height = filter.process(current_height_mm=50.0, segmentation_area=1000.0)
"""
from typing import Optional
import numpy as np
class TemporalLogicFilter:
"""
时间逻辑过滤器
每个检测区域独立使用一个实例,维护该区域的时间逻辑状态
"""
def __init__(self, container_height_mm: float, area_change_threshold: float = 0.3):
"""
初始化时间逻辑过滤器
Args:
container_height_mm: 容器高度(毫米),用于判断"满"状态
area_change_threshold: 分割面积变化阈值(0-1),默认0.3表示面积变化不超过30%
"""
self.container_height_mm = container_height_mm
# 状态变量
self.last_height_mm: Optional[float] = None # 上次液位高度
self.last_segmentation_area: Optional[float] = None # 上次分割面积
self.zero_full_count: int = 0 # 从空到满的异常计数
self.full_zero_count: int = 0 # 从满到空的异常计数
self.data_count: int = 0 # 数据点计数
# 参数
self.max_height_change_mm = 8.0 # 最大高度变化(毫米)
self.area_change_threshold = area_change_threshold # 面积变化阈值
# 判断"空"和"满"的容差(毫米)
self.empty_tolerance_mm = 1.0 # 高度小于此值视为空
self.full_tolerance_mm = 1.0 # 高度大于(容器高度-此值)视为满
def _is_empty(self, height_mm: float) -> bool:
"""判断液位是否为空"""
return height_mm <= self.empty_tolerance_mm
def _is_full(self, height_mm: float) -> bool:
"""判断液位是否为满"""
return height_mm >= (self.container_height_mm - self.full_tolerance_mm)
def _is_height_change_valid(self, current_height: float, last_height: float) -> bool:
"""
判断高度变化是否有效(规则3:前后两次数据变化范围不会超过8mm)
Args:
current_height: 当前高度(毫米)
last_height: 上次高度(毫米)
Returns:
bool: 变化是否有效
"""
if last_height is None:
return True # 第一次检测,总是有效
height_change = abs(current_height - last_height)
return height_change <= self.max_height_change_mm
def _is_area_change_valid(self, current_area: Optional[float], last_area: Optional[float]) -> bool:
"""
判断分割面积变化是否有效(规则3:前后两次分割的结果面积不会差太多)
Args:
current_area: 当前分割面积
last_area: 上次分割面积
Returns:
bool: 面积变化是否有效
"""
if last_area is None or current_area is None:
return True # 没有面积数据,跳过检查
if last_area == 0:
return True # 上次面积为0,跳过检查
# 计算面积变化比例
area_change_ratio = abs(current_area - last_area) / last_area
return area_change_ratio <= self.area_change_threshold
def process(
self,
current_height_mm: float,
segmentation_area: Optional[float] = None
) -> float:
"""
处理当前检测到的液位高度,应用时间逻辑判断
Args:
current_height_mm: 当前检测到的液位高度(毫米)
segmentation_area: 当前分割结果面积(可选,用于规则3的面积检查)
Returns:
float: 经过时间逻辑判断后的最终液位高度(毫米)
"""
# 数据点计数加1
self.data_count += 1
# 第一次检测,直接使用当前数据
if self.last_height_mm is None:
self.last_height_mm = current_height_mm
self.last_segmentation_area = segmentation_area
return current_height_mm
# 规则1:本次液位高度为0(空),理论上下一次液位高度数据不会是容器高度
if self._is_empty(self.last_height_mm):
if self._is_full(current_height_mm):
# 从空到满的异常情况
self.zero_full_count += 1
if self.zero_full_count >= 3:
# 连续3次从空到满,判断此次为真,使用本次数据
self.last_height_mm = current_height_mm
self.last_segmentation_area = segmentation_area
self.zero_full_count = 0 # 重置计数器
return current_height_mm
else:
# 数据为假,使用上次数据
return self.last_height_mm
else:
# 正常情况,重置计数器
self.zero_full_count = 0
# 规则2:本次液位高度为容器高度(满),理论上下一次的数据不会是0(空)
if self._is_full(self.last_height_mm):
if self._is_empty(current_height_mm):
# 从满到空的异常情况
self.full_zero_count += 1
# 注意:规则2只要求计数,没有说明如何处理,这里使用上次数据
return self.last_height_mm
else:
# 正常情况,重置计数器
self.full_zero_count = 0
# 规则3:前后两次的数据变化范围不会超过8mm
if not self._is_height_change_valid(current_height_mm, self.last_height_mm):
# 高度变化过大,使用上次数据
return self.last_height_mm
# 规则3:前后两次分割的结果面积不会差太多
if not self._is_area_change_valid(segmentation_area, self.last_segmentation_area):
# 面积变化过大,使用上次数据
return self.last_height_mm
# 所有检查通过,使用当前数据
self.last_height_mm = current_height_mm
self.last_segmentation_area = segmentation_area
self.zero_full_count = 0 # 重置计数器
self.full_zero_count = 0 # 重置计数器
return current_height_mm
def reset(self):
"""重置过滤器状态"""
self.last_height_mm = None
self.last_segmentation_area = None
self.zero_full_count = 0
self.full_zero_count = 0
self.data_count = 0
def get_state(self) -> dict:
"""
获取当前状态信息(用于调试)
Returns:
dict: 状态字典
"""
return {
'last_height_mm': self.last_height_mm,
'zero_full_count': self.zero_full_count,
'full_zero_count': self.full_zero_count,
'data_count': self.data_count,
'container_height_mm': self.container_height_mm
}
def create_temporal_filter(container_height_mm: float, area_change_threshold: float = 0.3) -> TemporalLogicFilter:
"""
创建时间逻辑过滤器的便捷函数
Args:
container_height_mm: 容器高度(毫米)
area_change_threshold: 分割面积变化阈值(0-1),默认0.3
Returns:
TemporalLogicFilter: 过滤器实例
"""
return TemporalLogicFilter(container_height_mm, area_change_threshold)
"""
检测引擎模块
"""
\ No newline at end of file
"""
卡尔曼滤波引擎模块
包含卡尔曼滤波相关的功能
"""
import cv2
import numpy as np
def stable_median(data, max_std=1.0):
"""稳健地计算中位数"""
if len(data) == 0:
return 0
data = np.array(data)
q1, q3 = np.percentile(data, [25, 75])
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
data = data[(data >= lower) & (data <= upper)]
if len(data) >= 2 and np.std(data) > max_std:
median_val = np.median(data)
data = data[np.abs(data - median_val) <= max_std]
return float(np.median(data)) if len(data) > 0 else 0
def init_kalman_filters_list(num_targets, init_means):
"""初始化卡尔曼滤波器列表"""
kalman_list = []
for i in range(num_targets):
kf = cv2.KalmanFilter(2, 1)
kf.measurementMatrix = np.array([[1, 0]], np.float32)
kf.transitionMatrix = np.array([[1, 0.9], [0, 0.9]], np.float32)
kf.processNoiseCov = np.diag([1e-4, 1e-3]).astype(np.float32)
kf.measurementNoiseCov = np.array([[10]], dtype=np.float32)
kf.statePost = np.array([[init_means[i]], [0]], dtype=np.float32)
kalman_list.append(kf)
return kalman_list
class KalmanFilterEngine:
"""卡尔曼滤波引擎"""
def __init__(self, num_targets=0):
"""初始化卡尔曼滤波引擎"""
self.num_targets = num_targets
self.kalman_filters = []
self.consecutive_rejects = [0] * num_targets
self.recent_observations = [[] for _ in range(num_targets)]
self.last_observations = [None] * num_targets
self.smooth_window = 5
def initialize(self, initial_heights):
"""初始化卡尔曼滤波器"""
init_means = [stable_median(heights) for heights in initial_heights]
self.kalman_filters = init_kalman_filters_list(self.num_targets, init_means)
print(f"✅ 卡尔曼滤波器初始化完成,起始高度:{init_means}")
return init_means
def update(self, target_idx, observation, container_height_cm, error_percentage=30):
"""更新指定目标的卡尔曼滤波器"""
if not self.kalman_filters or target_idx >= len(self.kalman_filters):
raise RuntimeError("卡尔曼滤波器未初始化或目标索引超出范围")
# 预测步骤
predicted = self.kalman_filters[target_idx].predict()
predicted_height = predicted[0][0]
# 更新滤波器
final_height, self.consecutive_rejects[target_idx] = self._update_kalman_filter(
self.kalman_filters[target_idx],
observation,
predicted_height,
container_height_cm,
error_percentage,
self.consecutive_rejects[target_idx],
self.last_observations[target_idx]
)
# 更新上次观测值记录
self.last_observations[target_idx] = observation
# 添加到滑动窗口
self.recent_observations[target_idx].append(final_height)
if len(self.recent_observations[target_idx]) > self.smooth_window:
self.recent_observations[target_idx].pop(0)
return final_height, predicted_height
def _update_kalman_filter(self, kalman_filter, observation, predicted_height, container_height_cm,
error_percentage=30, consecutive_rejects=0, last_observation=None):
"""更新卡尔曼滤波器"""
# 计算预测误差(相对于容器高度的百分比)
prediction_error_percent = abs(observation - predicted_height) / container_height_cm * 100
# 检测是否是重复的观测值(保持的液位数据)
is_repeated_observation = (last_observation is not None and
observation == last_observation)
# 误差控制逻辑
if prediction_error_percent > error_percentage:
# 误差过大,增加拒绝计数
consecutive_rejects += 1
# 检查是否连续6次拒绝
if consecutive_rejects >= 6:
# 连续6次误差过大,强制使用观测值更新
kalman_filter.correct(np.array([[observation]], dtype=np.float32))
final_height = kalman_filter.statePost[0][0]
consecutive_rejects = 0 # 重置计数器
print(f" 连续6次误差过大,强制使用观测值更新: {observation:.3f}cm → 滤波后: {final_height:.3f}cm")
else:
# 使用预测值
final_height = predicted_height
print(f" ❌ 误差 {prediction_error_percent:.1f}% > {error_percentage}%,使用预测值: {predicted_height:.3f}cm (连续拒绝: {consecutive_rejects}/6)")
else:
# 误差可接受,正常更新
kalman_filter.correct(np.array([[observation]], dtype=np.float32))
final_height = kalman_filter.statePost[0][0]
consecutive_rejects = 0 # 重置计数器
print(f" ✅ 误差 {prediction_error_percent:.1f}% <= {error_percentage}%,使用观测值: {observation:.3f}cm → 滤波后: {final_height:.3f}cm")
return final_height, consecutive_rejects
def get_smooth_height(self, target_idx):
"""获取平滑后的高度(中位数)"""
if not self.recent_observations[target_idx]:
return 0
return np.median(self.recent_observations[target_idx])
def reset_target(self, target_idx):
"""重置指定目标的滤波器状态"""
if target_idx < len(self.consecutive_rejects):
self.consecutive_rejects[target_idx] = 0
if target_idx < len(self.last_observations):
self.last_observations[target_idx] = None
if target_idx < len(self.recent_observations):
self.recent_observations[target_idx] = []
print(f" 重置目标{target_idx+1}的滤波器状态")
\ No newline at end of file
"""
Real Time Streaming Capture
用于处理RTSP/RTMP等实时流,避免花屏问题
"""
import threading
import cv2
class RTSCapture(cv2.VideoCapture):
"""Real Time Streaming Capture.
这个类必须使用 RTSCapture.create 方法创建,请不要直接实例化
"""
_cur_frame = None
_reading = False
schemes = ["rtsp://", "rtmp://"] # 用于识别实时流
@staticmethod
def create(url, *schemes):
"""实例化&初始化
rtscap = RTSCapture.create("rtsp://example.com/live/1")
or
rtscap = RTSCapture.create("http://example.com/live/1.m3u8", "http://")
"""
rtscap = RTSCapture(url)
rtscap.frame_receiver = threading.Thread(target=rtscap.recv_frame, daemon=True)
rtscap.schemes.extend(schemes)
if isinstance(url, str) and url.startswith(tuple(rtscap.schemes)):
rtscap._reading = True
elif isinstance(url, int):
# 这里可能是本机设备
pass
return rtscap
def isStarted(self):
"""替代 VideoCapture.isOpened() """
ok = self.isOpened()
if ok and self._reading:
ok = self.frame_receiver.is_alive()
return ok
def recv_frame(self):
"""子线程读取最新视频帧方法"""
while self._reading and self.isOpened():
ok, frame = self.read()
if not ok:
break
self._cur_frame = frame
self._reading = False
def read2(self):
"""读取最新视频帧
返回结果格式与 VideoCapture.read() 一样
"""
frame = self._cur_frame
self._cur_frame = None
return frame is not None, frame
def start_read(self):
"""启动子线程读取视频帧"""
self.frame_receiver.start()
self.read_latest_frame = self.read2 if self._reading else self.read
def stop_read(self):
"""退出子线程方法"""
self._reading = False
if self.frame_receiver.is_alive():
self.frame_receiver.join()
def release(self):
"""释放资源"""
self.stop_read()
super().release()
# 测试代码
if __name__ == '__main__':
import sys
if len(sys.argv) < 2:
print("Usage: python rtscapture.py <rtsp_url>")
sys.exit(1)
rtscap = RTSCapture.create(sys.argv[1])
rtscap.start_read() # 启动子线程并改变 read_latest_frame 的指向
while rtscap.isStarted():
ok, frame = rtscap.read_latest_frame() # read_latest_frame() 替代 read()
if not ok:
if cv2.waitKey(100) & 0xFF == ord('q'):
break
continue
# 帧处理代码写这里
cv2.imshow("cam", frame)
if cv2.waitKey(100) & 0xFF == ord('q'):
break
rtscap.stop_read()
rtscap.release()
cv2.destroyAllWindows()
\ No newline at end of file
从yolo分割结果到液位高度数据总体分两个步骤
从yolo分割结果到液位高度数据总体分两个步骤
1.通过 yolo,分割结果得到检测的液位线 2.根据卡尔曼滤波的结果是否选择更新
通过 yolo 结果,一共3层,有检测液体,没有检测到液体但是有检测到泡沫或者空气,什么都没有检测到
->如果有检测到液体->取液体的最上面作为液位高度
->没有检测到液体,用其他检测到
->有泡沫->取泡沫下方作为液位高度
->有多个泡沫,可能是液体混着泡沫的情况,取两个泡沫中间
->没泡沫有空气取空气下方作为液位高度
->如果没有检测到,仍然保留上次结果
2.得到1的液位结果,进行卡尔曼滤波判断
卡尔曼滤波会自己单独得到一个预测的液位高度值和我们通过1得到的yolo结果进行比较
->卡尔曼滤波的液位高度 和 我们实际得到的差不多,使用卡尔曼滤波的值
->卡尔曼滤波的液位高度 和 我们实际得到的差距大,这个情况可能存在跳变
->连续三次都差距大,那么就取新的ylg结果作为高度值
->不是连续三次,说明可能存在误检,仍然保留上次的结果,不进行更新。
\ No newline at end of file
时间轴变量starttime,endtime 时间轴变量starttime,endtime
...@@ -3,3 +3,17 @@ ...@@ -3,3 +3,17 @@
@curvepanel_handler.py#L371-378 修改self._video_layout_mode = 1同步布局,绘制曲线的数据范围 @curvepanel_handler.py#L371-378 修改self._video_layout_mode = 1同步布局,绘制曲线的数据范围
实验最长时间三天,strattime到endtime,缩放限制 实验最长时间三天,strattime到endtime,缩放限制
曲线面板历史模式下,, 曲线面板历史模式下,,
**简短回答**
`self.max_points_per_view = 5000` 用来限制**当前 X 轴可见范围内,单次真正传给 PyQtGraph 去画的点数上限**,防止一次画太多点导致卡顿。
### 具体含义
- **原始数据**:全部保存在 `channel['time']` / `channel['value']` 里,不受这个值影响。
- **显示数据**:在 `updateCurveData()` 里,会先根据当前时间轴范围 `(starttime, endtime)` 取出“当前可见时间段”的点数 `visible_count`
- 如果 `visible_count <= max_points_per_view`:不抽样,全部画出来。
- 如果 `visible_count > max_points_per_view`:按步长 `step = ceil(visible_count / max_points_per_view)` 抽样,只画一部分代表点。
- 所以:
- **值越大**:曲线越细致,但大时间跨度时更容易卡。
- **值越小**:性能好,但放大到很长时间范围时曲线会更“稀疏”。
如果你想“更平滑、更细腻”,可以把 5000 提高一些;如果性能吃紧,可以改小,比如 2000。
\ No newline at end of file
...@@ -91,6 +91,8 @@ class CurvePanel(QtWidgets.QWidget): ...@@ -91,6 +91,8 @@ class CurvePanel(QtWidgets.QWidget):
backClicked = QtCore.Signal() # 返回按钮点击信号 backClicked = QtCore.Signal() # 返回按钮点击信号
missionFolderChanged = QtCore.Signal(str) # 任务文件夹选择变化信号(传递完整路径) missionFolderChanged = QtCore.Signal(str) # 任务文件夹选择变化信号(传递完整路径)
refreshMissionListRequested = QtCore.Signal() # 刷新任务列表请求信号 refreshMissionListRequested = QtCore.Signal() # 刷新任务列表请求信号
# 时间轴范围变化信号:(starttime, endtime) - Unix时间戳
timeAxisRangeChanged = QtCore.Signal(float, float)
def __init__(self, parent=None): def __init__(self, parent=None):
super(CurvePanel, self).__init__(parent) super(CurvePanel, self).__init__(parent)
...@@ -564,7 +566,7 @@ class CurvePanel(QtWidgets.QWidget): ...@@ -564,7 +566,7 @@ class CurvePanel(QtWidgets.QWidget):
# 创建曲线UI对象(圆润样式) # 创建曲线UI对象(圆润样式)
pen = pg.mkPen( pen = pg.mkPen(
color=color, color=color,
width=3, width=1,
style=Qt.SolidLine, style=Qt.SolidLine,
capStyle=Qt.RoundCap, # 圆形端点 capStyle=Qt.RoundCap, # 圆形端点
joinStyle=Qt.RoundJoin # 圆角连接 joinStyle=Qt.RoundJoin # 圆角连接
...@@ -912,6 +914,13 @@ class CurvePanel(QtWidgets.QWidget): ...@@ -912,6 +914,13 @@ class CurvePanel(QtWidgets.QWidget):
print(f"🔄 [时间轴范围] starttime={x_min:.3f} ({start_str}) -> endtime={x_max:.3f} ({end_str})") print(f"🔄 [时间轴范围] starttime={x_min:.3f} ({start_str}) -> endtime={x_max:.3f} ({end_str})")
# 发送时间轴范围变化信号,通知Handler根据新的范围重新采样
try:
self.timeAxisRangeChanged.emit(float(self.starttime), float(self.endtime))
except Exception:
# 静默处理,避免因为范围无效导致崩溃
pass
# 确保Y轴范围在0-23之间 # 确保Y轴范围在0-23之间
if y_min < 0 or y_max > 23: if y_min < 0 or y_max > 23:
# 调整到合法范围 # 调整到合法范围
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment