Commit 0fcf27b8 by yhb

1

parent 66a7639e
# Python缓存文件
__pycache__/
# 忽略所有文件
/*
# 只上传指定的文件夹
!/handlers/
!/widgets/
!/rules/
!/hooks/
!/icons/
!/labelme/
!/utils/
# 只上传指定的文件
!/app.py
!/__main__.py
!/__init__.py
!/.gitignore
# 忽略这些文件夹中的特定内容
**/__pycache__/
*.py[cod]
*$py.class
*.so
# IDE文件
.vscode/
.idea/
*.swp
*.swo
# 日志文件
*.log
# 临时文件
*.tmp
*.temp
~*
# 系统文件
.DS_Store
Thumbs.db
desktop.ini
# Python虚拟环境
.env
.venv
env/
venv/
# 打包文件
dist/
build/
*.egg-info/
# 数据库文件(如果不需要追踪)
# *.db
# *.sqlite
# *.sqlite3
# 敏感配置文件
config/secrets.yaml
config/private.yaml
\ No newline at end of file
*.backup
*.backup_*
**/*.md
**/build/
**/SdkLog_Python/
**/*.csv
*_report.txt
......@@ -103,7 +103,7 @@ binaries = []
try:
binaries += collect_dynamic_libs('torch')
print("✓ 已收集 torch 二进制文件")
PyInstaller 1.spec
except Exception as e:
except Exception as e:
print(f"⚠ 收集 torch 二进制文件失败: {e}")
pass
......
......@@ -87,7 +87,6 @@ class ModelSignalHandler:
if hasattr(self, 'modelSetPage') and hasattr(self, 'trainingPage'):
if hasattr(self.modelSetPage, 'modelListChanged') and hasattr(self.trainingPage, 'connectModelListChangeSignal'):
self.trainingPage.connectModelListChangeSignal(self.modelSetPage)
print("[模型信号处理] 已连接模型列表变化信号到训练页面")
except Exception as e:
import traceback
......
from ast import main
123131441
fsafasa
main
\ No newline at end of file
......@@ -48,17 +48,27 @@ except ImportError:
class AmplifyWindowHandler:
"""全屏放大窗口业务逻辑处理器"""
def __init__(self, amplify_window, device_config=None):
# 来源常量
SOURCE_AMPLIFY = 'amplifysource' # 点击放大显示按钮进入
SOURCE_ANNOTATION = 'annotationsource' # 点击开始标注按钮进入
def __init__(self, amplify_window, device_config=None, source=None):
"""
初始化处理器
Args:
amplify_window: AmplifyWindow实例
device_config: 设备配置字典,包含IP、端口、用户名、密码等
source: 来源标记,可选值:
- 'amplifysource': 点击放大显示按钮进入
- 'annotationsource': 点击开始标注按钮进入
"""
self.amplify_window = amplify_window
self.channel_id = amplify_window._channel_id
# 来源标记(用于区分不同入口,显示不同控件)
self.source = source or self.SOURCE_AMPLIFY
# 物理变焦控制器
self.physical_zoom_controller = None
self.physical_zoom_enabled = False
......@@ -81,7 +91,6 @@ class AmplifyWindowHandler:
# 初始化物理变焦控制器
self._initPhysicalZoomController()
print(f"[AMPLIFY_HANDLER] 处理器已创建: {self.channel_id}, 物理变焦: {'启用' if self.physical_zoom_enabled else '不可用'}")
def _loadDeviceConfigFromFile(self):
"""从配置文件的RTSP地址解析设备配置"""
......@@ -521,9 +530,8 @@ class AmplifyWindowHandler:
if self.physical_zoom_controller:
self.physical_zoom_controller.disconnect_device()
self.physical_zoom_controller = None
print(f"[AMPLIFY_HANDLER] 通道{self.channel_id}: 物理变焦控制器已清理")
except Exception as e:
print(f"[AMPLIFY_HANDLER] 清理失败: {e}")
except Exception:
pass
def get_zoom_status(self):
"""
......@@ -545,6 +553,44 @@ class AmplifyWindowHandler:
status['physical_status'] = physical_status
return status
# ========== 来源判断辅助方法 ==========
def isAmplifySource(self):
    """True iff this handler was entered via the amplify-display button."""
    return self.SOURCE_AMPLIFY == self.source
def isAnnotationSource(self):
    """True iff this handler was entered via the start-annotation button."""
    return self.SOURCE_ANNOTATION == self.source
def getSource(self):
    """Return the current source tag ('amplifysource' or 'annotationsource')."""
    return self.source
def setSource(self, source):
    """Overwrite the source tag recording which entry point opened the window."""
    self.source = source
# ============================================================================
......
......@@ -1723,8 +1723,6 @@ class ChannelPanelHandler:
# 显示窗口
amplify_window.show()
print(f"[AMPLIFY] 全屏窗口已创建: {channel_id} - {channel_name}")
# 放大窗口现在通过 _updateVideoDisplayUI 同步更新,无需单独的同步线程
# if channel_id in self._channel_captures:
# self._startAmplifyFrameSync(channel_id)
......@@ -1742,8 +1740,6 @@ class ChannelPanelHandler:
channel_id: 通道ID
"""
try:
print(f"[AMPLIFY] 全屏窗口关闭: {channel_id}")
# 从映射中移除窗口和处理器
if channel_id in self._amplify_windows:
del self._amplify_windows[channel_id]
......@@ -1772,7 +1768,6 @@ class ChannelPanelHandler:
# 检查是否已经有同步线程
if hasattr(self, '_amplify_sync_flags'):
if channel_id in self._amplify_sync_flags and self._amplify_sync_flags[channel_id]:
print(f"[AMPLIFY] 帧同步已在进行中: {channel_id}")
return
# 初始化同步标志
......@@ -1789,8 +1784,6 @@ class ChannelPanelHandler:
)
sync_thread.start()
print(f"[AMPLIFY] 帧同步线程已启动: {channel_id}")
except Exception as e:
print(f" 启动全屏帧同步失败: {e}")
......@@ -1805,7 +1798,6 @@ class ChannelPanelHandler:
if hasattr(self, '_amplify_sync_flags'):
if channel_id in self._amplify_sync_flags:
self._amplify_sync_flags[channel_id] = False
print(f"[AMPLIFY] 帧同步已停止: {channel_id}")
except Exception as e:
print(f" 停止全屏帧同步失败: {e}")
......@@ -1817,8 +1809,6 @@ class ChannelPanelHandler:
Args:
channel_id: 通道ID
"""
print(f"[AMPLIFY] 帧同步循环启动: {channel_id}")
frame_count = 0
last_sync_time = time.time()
......@@ -1860,9 +1850,6 @@ class ChannelPanelHandler:
fps = 30 / (current_time - last_sync_time)
last_sync_time = current_time
# 每100帧打印一次统计
if frame_count % 100 == 0:
print(f"[AMPLIFY] {channel_id} 已同步 {frame_count} 帧")
else:
# 没有新帧,短暂等待
time.sleep(0.01)
......@@ -1870,8 +1857,6 @@ class ChannelPanelHandler:
except Exception as e:
print(f" {channel_id} 全屏帧同步错误: {e}")
time.sleep(0.1)
print(f"[AMPLIFY] 帧同步循环停止: {channel_id},总共同步 {frame_count} 帧")
def _updateAmplifyWindows(self, channel_id, frame):
"""
......@@ -1902,7 +1887,6 @@ class ChannelPanelHandler:
"""初始化配置文件监控器"""
try:
# 临时禁用配置文件监控器以解决 QWidget 创建顺序问题
print(f"[ConfigWatcher] 配置文件监控器已禁用(避免 QWidget 创建顺序问题)")
return
# 获取配置文件路径
......@@ -2031,14 +2015,12 @@ class ChannelPanelHandler:
"""
try:
if not 1 <= channel_num <= 4:
print(f"[updateMissionLabelByVar] 无效的通道编号: {channel_num},必须在1-4之间")
return
# 获取对应的任务标签变量
mission_var_name = f'channel{channel_num}mission'
if not hasattr(self, mission_var_name):
print(f"[updateMissionLabelByVar] 未找到变量: {mission_var_name}")
return
mission_label = getattr(self, mission_var_name)
......@@ -2053,9 +2035,6 @@ class ChannelPanelHandler:
if panel and hasattr(panel, '_positionTaskLabel'):
panel._positionTaskLabel()
print(f"✅ [updateMissionLabelByVar] 已更新 {mission_var_name}: {text}")
except Exception as e:
print(f"❌ [updateMissionLabelByVar] 更新任务标签失败: {e}")
import traceback
traceback.print_exc()
......@@ -841,9 +841,9 @@ class CurvePanelHandler:
start_time, end_time = result
start_str = datetime.datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S') if start_time else 'None'
end_str = datetime.datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S') if end_time else 'None'
print(f"🎯 [Handler获取时间轴范围] starttime={start_time} ({start_str}) -> endtime={end_time} ({end_str})")
return result
print(f"⚠️ [Handler获取时间轴范围] curve_panel不存在,返回(None, None)")
return (None, None)
# ========== 数据管理方法 ==========
......@@ -970,12 +970,8 @@ class CurvePanelHandler:
# 构建 mission_result 目录路径
mission_result_dir = os.path.join(data_root, 'database', 'mission_result')
print(f"🔍 [任务列表] 数据根目录: {data_root}")
print(f"🔍 [任务列表] 任务目录: {mission_result_dir}")
print(f"🔍 [任务列表] 目录是否存在: {os.path.exists(mission_result_dir)}")
if not os.path.exists(mission_result_dir):
print(f"❌ [任务列表] 目录不存在: {mission_result_dir}")
# 通知UI显示空列表
if self.curve_panel:
self.curve_panel.updateMissionFolderList([])
......
# CSV日志补丁代码
# 将以下代码整合到detect_debug.py的VideoThread类中
# ========== 在 detect_and_visualize 中添加检测信息收集 ==========
# ROI检测部分(替换第612-616行):
result = results[0]
# 记录检测结果
roi_info = {}
if result.masks is not None:
roi_info['mask_count'] = len(result.masks.data)
roi_info['classes'] = []
roi_info['confidences'] = []
for i in range(len(result.masks.data)):
class_id = int(result.boxes.cls[i].cpu().numpy())
conf = float(result.boxes.conf[i].cpu().numpy())
if conf >= 0.3:
class_name = self.engine.model.names[class_id]
roi_info['classes'].append(class_name)
roi_info['confidences'].append(conf)
self._draw_roi_masks(annotated_frame, overlay, result, x, y, w, h)
else:
roi_info['mask_count'] = 0
roi_info['classes'] = []
roi_info['confidences'] = []
detection_info[f'ROI{roi_idx+1}'] = roi_info
# 全图检测部分(替换第631-634行):
result = results[0]
# 记录检测结果
full_info = {}
if result.masks is not None:
full_info['mask_count'] = len(result.masks.data)
full_info['classes'] = []
full_info['confidences'] = []
for i in range(len(result.masks.data)):
class_id = int(result.boxes.cls[i].cpu().numpy())
conf = float(result.boxes.conf[i].cpu().numpy())
if conf >= 0.3:
class_name = self.engine.model.names[class_id]
full_info['classes'].append(class_name)
full_info['confidences'].append(conf)
self._draw_fullframe_masks(annotated_frame, overlay, result, frame.shape)
else:
full_info['mask_count'] = 0
full_info['classes'] = []
full_info['confidences'] = []
detection_info['FullFrame'] = full_info
# 返回部分(替换第640-644行):
return annotated_frame, detection_info
# ========== 在 run 方法中添加CSV日志功能 ==========
# 启动时初始化CSV(在第772行后添加):
self.running = True
self.fps_start_time = time.time()
self.frame_counter = 0
# 初始化CSV日志文件
try:
import csv
self.csv_file = open(self.csv_log_path, 'w', newline='', encoding='utf-8-sig')
self.csv_writer = csv.writer(self.csv_file)
# 写入表头
self.csv_writer.writerow(['帧数', '检测区域', 'Mask数量', '检测类别', '置信度'])
self.csv_file.flush()
print(f"✅ 已创建CSV日志文件: {self.csv_log_path}")
except Exception as e:
print(f"⚠️ 无法创建CSV日志文件: {e}")
self.csv_file = None
self.csv_writer = None
# 检测调用部分(替换第787-791行):
# 执行检测
annotated_frame, detection_info = self.detect_and_visualize(frame)
# 记录帧数
self.frame_counter += 1
# 写入CSV日志
if self.csv_writer:
try:
if detection_info:
for region_name, info in detection_info.items():
classes_str = ', '.join(info.get('classes', []))
conf_str = ', '.join([f"{c:.2f}" for c in info.get('confidences', [])])
self.csv_writer.writerow([
self.frame_counter,
region_name,
info.get('mask_count', 0),
classes_str if classes_str else '无',
conf_str if conf_str else '无'
])
else:
self.csv_writer.writerow([self.frame_counter, '全图', 0, '无', '无'])
# 每10帧刷新一次文件
if self.frame_counter % 10 == 0:
self.csv_file.flush()
except Exception as e:
print(f"⚠️ 写入CSV失败: {e}")
# 发送帧
self.frame_ready.emit(annotated_frame)
# 释放资源部分(在第807行cap.release()后添加):
# 关闭CSV文件
if self.csv_file:
try:
self.csv_file.close()
print(f"✅ 已保存CSV日志,共记录 {self.frame_counter} 帧")
except Exception as e:
print(f"⚠️ 关闭CSV文件失败: {e}")
......@@ -117,7 +117,6 @@ class GeneralSetPanelHandler:
widget.annotationEngineRequested.connect(self._handleAnnotationEngineRequest)
widget.frameLoadRequested.connect(self._handleFrameLoadRequest)
widget.annotationDataRequested.connect(self._handleAnnotationDataRequest)
widget.liveFrameRequested.connect(self._handleLiveFrameRequest)
def _handleRefreshModelList(self, model_widget=None):
"""处理刷新模型列表请求"""
......@@ -692,6 +691,9 @@ class GeneralSetPanelHandler:
# 2. 保存原始帧用于标注结果显示
self._annotation_source_frame = channel_frame.copy() if channel_frame is not None else None
# 2.5 🔥 调用自动标注检测器获取初始位置
self._applyAutoAnnotation(channel_frame)
# 3. 创建标注界面组件
annotation_widget = self.showAnnotationWidget(self.general_set_panel)
......@@ -1237,13 +1239,120 @@ class GeneralSetPanelHandler:
}
engine = SimpleAnnotationEngine()
pass
return engine
except Exception as e:
pass
return None
def _applyAutoAnnotation(self, frame):
    """Run the auto-annotation detector on *frame* and seed the annotation engine.

    Blocks up to 10 seconds waiting for this channel's detection model to
    appear in the global model pool, then reuses that model to detect
    regions and appends the resulting boxes / bottom points / top points
    to ``self.annotation_engine``. Any failure (no model, detection
    failure, exception) simply returns, leaving manual annotation mode.

    Args:
        frame: image array (has .shape/.dtype, presumably BGR ndarray —
            TODO confirm against caller); None aborts immediately.
    """
    import time
    print(f"\n{'='*60}")
    print(f"🔥 [自动标注] ===== 方法入口 =====")
    try:
        # Preconditions: need both a frame and an annotation engine.
        if frame is None or self.annotation_engine is None:
            print(f"⚠️ [自动标注] 前置条件不满足! frame={frame is not None}, engine={self.annotation_engine is not None}")
            return
        print(f"🔥 [自动标注] 输入图像: shape={frame.shape}, dtype={frame.dtype}")
        channel_id = self.general_set_panel.channel_id if self.general_set_panel else None
        print(f"🔥 [自动标注] 通道ID: {channel_id}")
        # Import lazily so module load does not require the detection stack.
        from handlers.videopage.auto_dot import AutoAnnotationDetector
        from handlers.videopage.thread_manager.threads.global_detection_thread import GlobalDetectionThread
        # Poll until the channel's model shows up in the global model pool.
        detection_engine = None
        max_wait_time = 10  # maximum wait (seconds)
        check_interval = 0.3  # polling interval (seconds)
        waited_time = 0
        print(f"🔥 [自动标注] 等待模型加载... (最大等待 {max_wait_time} 秒)")
        while waited_time < max_wait_time:
            # Try to fetch the model mapped to this channel.
            try:
                global_thread = GlobalDetectionThread.get_instance()
                if global_thread and global_thread.model_pool_manager:
                    model_pool = global_thread.model_pool_manager
                    model_id = model_pool.channel_model_mapping.get(channel_id)
                    if model_id and model_id in model_pool.model_pool:
                        detection_engine = model_pool.model_pool[model_id]
                        if detection_engine and hasattr(detection_engine, 'model') and detection_engine.model:
                            print(f"✅ [自动标注] 模型已就绪! (等待了 {waited_time:.1f} 秒)")
                            print(f"✅ [自动标注] 复用模型: {model_id}")
                            break
            except Exception as e:
                pass  # silently ignore and keep waiting
            # Not ready yet: sleep and retry.
            if waited_time == 0:
                print(f"⏳ [自动标注] 模型尚未加载,等待中...")
            time.sleep(check_interval)
            waited_time += check_interval
            # Emit a progress line roughly once every 2 seconds.
            if int(waited_time) > 0 and int(waited_time) % 2 == 0 and (waited_time - int(waited_time)) < check_interval:
                print(f"⏳ [自动标注] 已等待 {waited_time:.1f} 秒...")
        # Bail out if the model never became available.
        if detection_engine is None or not hasattr(detection_engine, 'model') or detection_engine.model is None:
            print(f"❌ [自动标注] 模型加载超时! (等待了 {waited_time:.1f} 秒)")
            print(f"💡 [自动标注] 提示: 请先启动检测后再进行自动标注")
            print(f"{'='*60}\n")
            return
        # Build the auto-annotation detector on top of the pooled model.
        print(f"🔥 [自动标注] 检测引擎类型: {type(detection_engine)}")
        print(f"🔥 [自动标注] 模型类型: {type(detection_engine.model)}")
        engine_device = getattr(detection_engine, 'device', 'cuda')
        auto_detector = AutoAnnotationDetector(model=detection_engine.model, device=engine_device)
        print(f"✅ [自动标注] 已复用全局模型,device={engine_device}")
        # Run detection.
        print(f"🔥 [自动标注] 开始执行检测...")
        result = auto_detector.detect(frame, conf_threshold=0.5, min_area=50)
        print(f"🔥 [自动标注] 检测结果: success={result.get('success')}")
        print(f"🔥 [自动标注] 检测结果keys: {result.keys()}")
        if 'error' in result:
            print(f"🔥 [自动标注] 错误信息: {result.get('error')}")
        if result.get('success'):
            print(f"🔥 [自动标注] masks数量: {len(result.get('masks', []))}")
            print(f"🔥 [自动标注] class_names: {result.get('class_names', [])}")
        if not result.get('success'):
            print(f"⚠️ [自动标注] 检测失败,使用手动标注模式")
            print(f"{'='*60}\n")
            return
        # Convert the raw detection result into the annotation system's format.
        data = auto_detector.get_system_format(result, padding=10)
        print(f"🔥 [自动标注] 检测到 {len(data['boxes'])} 个区域")
        print(f"🔥 [自动标注] boxes: {data['boxes']}")
        print(f"🔥 [自动标注] bottom_points: {data['bottom_points']}")
        print(f"🔥 [自动标注] top_points: {data['top_points']}")
        # Seed the annotation engine with every detected region.
        for i, (box, bottom, top) in enumerate(zip(data['boxes'], data['bottom_points'], data['top_points'])):
            self.annotation_engine.boxes.append(box)
            self.annotation_engine.bottom_points.append(bottom)
            self.annotation_engine.top_points.append(top)
            print(f" 区域{i+1}: box{box}, top{top}, bottom{bottom}")
        print(f"✅ [自动标注] 完成,已添加 {len(self.annotation_engine.boxes)} 个区域")
        print(f"{'='*60}\n")
    except Exception as e:
        print(f"❌ [自动标注] 异常: {e}")
        import traceback
        traceback.print_exc()
        print(f"{'='*60}\n")
def _handleAnnotationEngineRequest(self):
"""处理标注引擎请求"""
if self.annotation_engine and self.annotation_widget:
......@@ -1278,19 +1387,6 @@ class GeneralSetPanelHandler:
if self.annotation_widget:
self.annotation_widget.showAnnotationError(f"获取标注数据失败: {str(e)}")
def _handleLiveFrameRequest(self):
"""处理实时画面请求"""
try:
# 获取通道最新画面
if self.general_set_panel and self.general_set_panel.channel_id:
channel_frame = self.getLatestFrame(self.general_set_panel.channel_id)
# 更新标注界面的画面
if channel_frame is not None and self.annotation_widget:
self.annotation_widget.updateLiveFrame(channel_frame)
except Exception as e:
pass
def _initPhysicalZoomForAnnotation(self, annotation_widget):
"""为标注界面初始化物理变焦控制器"""
try:
......
......@@ -1386,7 +1386,6 @@ class MissionPanelHandler:
# 使用 updateMissionLabelByVar 方法更新标签
if hasattr(self, 'updateMissionLabelByVar'):
self.updateMissionLabelByVar(channel_num, task_folder_name)
print(f"✅ [多任务] 已更新 {channel_id} 的任务标签: {task_folder_name}")
# 删除状态更新逻辑,双击不改变任务状态
else:
......
"""
稳定器集成示例
展示如何在detect_debug.py中集成DetectionStabilizer
"""
from detection_stabilizer import DetectionStabilizer
# ===== 集成方式 =====
class VideoThreadWithStabilizer:
    """Example of wiring a DetectionStabilizer into VideoThread.

    Only the stabilizer-related pieces are shown; the elided original
    VideoThread code is marked with ``# ...`` comments, so this class is
    a documentation skeleton rather than runnable code.
    """
    def __init__(self):
        # ... original initialisation code ...
        # Attach the detection-result stabilizer.
        self.stabilizer = DetectionStabilizer(
            history_size=5,               # keep the 5 most recent frames
            pixel_change_threshold=0.20,  # pixel count may change at most 20%
            conf_switch_threshold=0.85,   # class switch requires high confidence
            area_tolerance=0.15           # total area may change at most 15%
        )
    def detect_and_visualize(self, frame):
        """Detection method extended with stabilizer post-processing."""
        detection_info = {}
        try:
            # ... YOLO detection code (unchanged) ...
            # ===== new: stabilizer processing =====
            # Convert the detections into the stabilizer's input format.
            stabilizer_input = self._convert_to_stabilizer_format(detection_info)
            # Apply the rule-based stabilisation.
            stable_result = self.stabilizer.process(stabilizer_input)
            # Convert back to the original format.
            detection_info = self._convert_from_stabilizer_format(stable_result)
            # ===== stabilizer processing ends =====
            # NOTE(review): annotated_frame is produced by the elided YOLO
            # code above; this skeleton does not define it.
            return annotated_frame, detection_info
        except Exception as e:
            print(f"检测异常: {e}")
            return frame, detection_info
    def _convert_to_stabilizer_format(self, detection_info):
        """Convert detection_info into the structure the stabilizer expects.

        Input:
            detection_info = {
                'ROI1': {
                    'mask_count': 2,
                    'classes': ['liquid', 'air'],
                    'confidences': [0.87, 0.34],
                    'pixel_counts': [2486, 2490]
                }
            }
        Output:
            {
                'masks': [
                    {'class': 'liquid', 'conf': 0.87, 'pixels': 2486, 'center_y': 50},
                    {'class': 'air', 'conf': 0.34, 'pixels': 2490, 'center_y': 30}
                ]
            }
        """
        masks = []
        for region_name, info in detection_info.items():
            classes = info.get('classes', [])
            confidences = info.get('confidences', [])
            pixel_counts = info.get('pixel_counts', [])
            for i in range(len(classes)):
                mask = {
                    'class': classes[i],
                    'conf': confidences[i],
                    'pixels': pixel_counts[i],
                    'center_y': 0,  # TODO: compute the centroid if spatial checks are needed
                    'region': region_name
                }
                masks.append(mask)
        return {'masks': masks}
    def _convert_from_stabilizer_format(self, stable_result):
        """Convert the stabilizer's output back into detection_info format."""
        detection_info = {}
        # Group masks by the region they came from.
        from collections import defaultdict
        region_masks = defaultdict(list)
        for mask in stable_result.get('masks', []):
            region = mask.get('region', 'FullFrame')
            region_masks[region].append(mask)
        # Rebuild one detection_info entry per region.
        for region, masks in region_masks.items():
            detection_info[region] = {
                'mask_count': len(masks),
                'classes': [m['class'] for m in masks],
                'confidences': [m['conf'] for m in masks],
                'pixel_counts': [m['pixels'] for m in masks]
            }
        return detection_info
# ===== 完整集成代码片段 =====
def integration_patch():
    """Return the code snippets to splice into detect_debug.py.

    Three snippets are produced, in application order:
      1) the addition to ``VideoThread.__init__``,
      2) the addition made just before ``detect_and_visualize`` returns,
      3) the ``_apply_stabilizer`` helper method.

    Returns:
        tuple[str, str, str]: (init snippet, detect snippet, helper snippet).
    """
    # --- 1. addition to VideoThread.__init__ ---
    patch_init = '''
    def __init__(self, parent=None):
        super().__init__(parent)
        # ... 原有代码 ...

        # 添加检测结果稳定器
        from handlers.videopage.detection_stabilizer import DetectionStabilizer
        self.stabilizer = DetectionStabilizer(
            history_size=5,
            pixel_change_threshold=0.20,
            conf_switch_threshold=0.85,
            area_tolerance=0.15
        )
        self.use_stabilizer = True  # 开关
    '''
    # --- 2. addition before detect_and_visualize returns ---
    patch_detect = '''
    def detect_and_visualize(self, frame):
        # ... 原有检测代码 ...

        # 混合掩码和原图
        annotated_frame = cv2.addWeighted(annotated_frame, 1 - self.mask_alpha,
                                          overlay, self.mask_alpha, 0)

        # ===== 新增:应用稳定器 =====
        if self.use_stabilizer and detection_info:
            detection_info = self._apply_stabilizer(detection_info)
        # ===== 稳定器结束 =====

        return annotated_frame, detection_info
    '''
    # --- 3. the stabilizer helper method ---
    patch_helper = '''
    def _apply_stabilizer(self, detection_info):
        """应用检测结果稳定器"""
        try:
            # 转换格式
            masks = []
            for region_name, info in detection_info.items():
                for i in range(len(info.get('classes', []))):
                    masks.append({
                        'class': info['classes'][i],
                        'conf': info['confidences'][i],
                        'pixels': info['pixel_counts'][i],
                        'center_y': 0,
                        'region': region_name
                    })

            # 稳定处理
            stable = self.stabilizer.process({'masks': masks})

            # 转换回原格式
            from collections import defaultdict
            result = defaultdict(lambda: {'mask_count': 0, 'classes': [], 'confidences': [], 'pixel_counts': []})
            for mask in stable.get('masks', []):
                region = mask.get('region', 'FullFrame')
                result[region]['classes'].append(mask['class'])
                result[region]['confidences'].append(mask['conf'])
                result[region]['pixel_counts'].append(mask['pixels'])
                result[region]['mask_count'] = len(result[region]['classes'])
            return dict(result)
        except Exception as e:
            print(f"稳定器异常: {e}")
            return detection_info
    '''
    return patch_init, patch_detect, patch_helper
# ===== 测试CSV数据 =====
def test_with_csv_data():
    """Replay real CSV-derived detection sequences through the stabilizer.

    Feeds hand-extracted per-frame mask lists (including two known
    class-flip cases) to a fresh DetectionStabilizer and prints the raw
    vs. stabilised class/confidence for each frame, plus summary stats.
    """
    stabilizer = DetectionStabilizer()
    # Actual data extracted from the CSV log.
    csv_data = [
        # Frames 16-18: class flips from air to liquid
        {'masks': [{'class': 'air', 'conf': 0.93, 'pixels': 2407, 'center_y': 30}]},
        {'masks': [{'class': 'liquid', 'conf': 0.87, 'pixels': 2486, 'center_y': 50}]},  # flip
        {'masks': [
            {'class': 'liquid', 'conf': 0.87, 'pixels': 2493, 'center_y': 50},
            {'class': 'air', 'conf': 0.34, 'pixels': 2490, 'center_y': 30}
        ]},
        # Frames 29-32 case
        {'masks': [{'class': 'air', 'conf': 0.84, 'pixels': 2509, 'center_y': 30}]},
        {'masks': [{'class': 'air', 'conf': 0.91, 'pixels': 2501, 'center_y': 30}]},
        {'masks': [
            {'class': 'air', 'conf': 0.78, 'pixels': 2493, 'center_y': 30},
            {'class': 'liquid', 'conf': 0.63, 'pixels': 2505, 'center_y': 50}
        ]},
        {'masks': [{'class': 'liquid', 'conf': 0.89, 'pixels': 2505, 'center_y': 50}]},  # flip
    ]
    print("\n" + "="*70)
    print("真实CSV数据测试")
    print("="*70)
    # Frame numbering starts at 16 to match the CSV rows the data came from.
    for i, data in enumerate(csv_data, 16):
        print(f"\n帧 {i}:")
        orig_str = [f"{m['class']}({m['conf']:.2f})" for m in data['masks']]
        print(f" 原始: {orig_str}")
        stable = stabilizer.process(data)
        stable_str = [f"{m['class']}({m['conf']:.2f})" for m in stable['masks']]
        print(f" 稳定: {stable_str}")
    print("\n" + "="*70)
    stats = stabilizer.get_stats()
    print(f"稳定率: {stats['stabilize_rate']}, 拒绝率: {stats['reject_rate']}")
if __name__ == "__main__":
    # Demo entry point: show the three integration snippets, then replay
    # the recorded CSV data through the stabilizer.
    print("=" * 70)
    print("检测结果稳定器 - 集成示例")
    print("=" * 70)
    # Show the integration code.
    init, detect, helper = integration_patch()
    print("\n[1] 在 VideoThread.__init__ 中添加:")
    print(init)
    print("\n[2] 在 detect_and_visualize 方法中添加:")
    print(detect)
    print("\n[3] 添加辅助方法:")
    print(helper)
    # Run the replay test.
    test_with_csv_data()
......@@ -194,32 +194,52 @@ class DisplayThread:
# 统一使用mm单位
height_mm = position_data.get('height_mm', 0)
valid = position_data.get('valid', True)
error_flag = position_data.get('error_flag', None) # 获取异常标记
# 只绘制有效的液位线
if not valid:
continue
# 绘制液位线(颜色根据数据新旧自动选择)
cv2.line(
display_frame,
(int(left), int(y_absolute)),
(int(right), int(y_absolute)),
line_color, # 新数据=红色,历史=黄色
2
)
# 根据error_flag选择绘制样式
if error_flag == 'detect_zero':
# YOLO未检测到mask:黄色虚线
draw_color = (0, 255, 255) # BGR: 黄色
line_type = cv2.LINE_AA
# 绘制虚线(多段短线模拟虚线)
dash_length = 10
gap_length = 5
x = int(left)
while x < int(right):
x_end = min(x + dash_length, int(right))
cv2.line(display_frame, (x, int(y_absolute)), (x_end, int(y_absolute)),
draw_color, 2, line_type)
x = x_end + gap_length
text_color = (0, 255, 255) # 黄色文字
elif error_flag == 'detect_low':
# 置信度低于0.3:黄色实线
draw_color = (0, 255, 255) # BGR: 黄色
cv2.line(display_frame, (int(left), int(y_absolute)), (int(right), int(y_absolute)),
draw_color, 2)
text_color = (0, 255, 255) # 黄色文字
else:
# 正常检测:使用原有颜色逻辑
draw_color = line_color
cv2.line(display_frame, (int(left), int(y_absolute)), (int(right), int(y_absolute)),
draw_color, 2)
# text_color已经在前面定义了
# 直接使用mm值并四舍五入
height_mm_int = int(np.round(height_mm, 0))
text = f"{height_mm_int}mm"
# 绘制高度文字(颜色与液位线匹配
# 绘制高度文字(颜色根据error_flag自动选择
cv2.putText(
display_frame,
text,
(int(left) + 5, int(y_absolute) - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.8,
text_color, # 新数据=绿色,历史=黄
text_color, # 已经根据error_flag设置了颜
2
)
......
# -*- coding: utf-8 -*-
"""
模型加载诊断脚本
用于测试检测线程的模型配置加载是否正常
"""
import os
import sys
import yaml
# 添加项目根目录到路径
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(current_dir))))
sys.path.insert(0, project_root)
from detection_thread import DetectionThread
def test_model_config_loading():
    """Smoke-test model and annotation config loading for channels 1-4.

    Calls DetectionThread._load_model_config and
    DetectionThread._load_annotation_config for each channel and prints
    what was found, including whether the file referenced by
    ``model_path`` actually exists on disk and its size.
    """
    print("=" * 80)
    print("模型配置加载测试")
    print("=" * 80)
    # Exercise each of the four channels in turn.
    for channel_id in ['channel1', 'channel2', 'channel3', 'channel4']:
        print(f"\n{'='*80}")
        print(f"测试 {channel_id}")
        print('='*80)
        # Load the per-channel model configuration.
        model_config = DetectionThread._load_model_config(channel_id)
        if model_config:
            print(f"[OK] [{channel_id}] 模型配置加载成功")
            print(f" 配置内容:")
            for key, value in model_config.items():
                if key == 'model_path':
                    print(f" - {key}: {value}")
                    # Check that the referenced model file exists.
                    if os.path.exists(value):
                        file_size = os.path.getsize(value) / (1024 * 1024)  # MB
                        print(f" [OK] 文件存在 ({file_size:.2f} MB)")
                    else:
                        print(f" [ERROR] 文件不存在!")
                else:
                    print(f" - {key}: {value}")
        else:
            print(f"[ERROR] [{channel_id}] 模型配置加载失败")
        # Load the per-channel annotation configuration as well.
        print(f"\n尝试加载 {channel_id} 的标注配置...")
        annotation_config = DetectionThread._load_annotation_config(channel_id)
        if annotation_config:
            print(f"[OK] [{channel_id}] 标注配置加载成功")
            boxes = annotation_config.get('boxes', [])
            print(f" - 检测区域数: {len(boxes)}")
            print(f" - 实际高度: {annotation_config.get('actual_heights', [])}")
        else:
            print(f"[ERROR] [{channel_id}] 标注配置加载失败")
def test_default_config_structure():
    """Inspect database/config/default_config.yaml and report its structure.

    Prints the per-channel ``channelN_model_path`` entries (resolving
    relative paths against the module-level ``project_root`` and checking
    the files exist), then the global ``model`` section and GPU-related
    top-level keys.
    """
    print("\n" + "=" * 80)
    print("检查 default_config.yaml 配置结构")
    print("=" * 80)
    config_file = os.path.join(project_root, 'database', 'config', 'default_config.yaml')
    if not os.path.exists(config_file):
        print(f" 配置文件不存在: {config_file}")
        return
    print(f" 配置文件存在: {config_file}\n")
    with open(config_file, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    # Per-channel model path entries.
    print("检查各通道模型路径配置:")
    for i in range(1, 5):
        channel_id = f'channel{i}'
        model_path_key = f'{channel_id}_model_path'
        if model_path_key in config:
            model_path = config[model_path_key]
            print(f" {model_path_key}: {model_path}")
            # Resolve relative paths against the project root, normalising
            # path separators for the current OS.
            if not os.path.isabs(model_path):
                model_path = model_path.replace('/', os.sep).replace('\\', os.sep)
                full_path = os.path.join(project_root, model_path)
                full_path = os.path.normpath(full_path)
            else:
                full_path = model_path
            # Check that the model file exists and report its size.
            if os.path.exists(full_path):
                file_size = os.path.getsize(full_path) / (1024 * 1024)  # MB
                print(f" 文件存在: {full_path} ({file_size:.2f} MB)")
            else:
                print(f" 文件不存在: {full_path}")
        else:
            print(f" {model_path_key}: 未配置")
    # Global model section.
    print("\n检查全局模型配置:")
    if 'model' in config:
        model_config = config['model']
        for key, value in model_config.items():
            print(f" - {key}: {value}")
    else:
        print(" 未找到全局 model 配置")
    # GPU-related top-level keys.
    print("\n检查GPU配置:")
    print(f" - gpu_enabled: {config.get('gpu_enabled', '未配置')}")
    print(f" - default_device: {config.get('default_device', '未配置')}")
    print(f" - batch_processing_enabled: {config.get('batch_processing_enabled', '未配置')}")
    print(f" - default_batch_size: {config.get('default_batch_size', '未配置')}")
if __name__ == '__main__':
    # Diagnostic entry point: check the YAML structure first, then the
    # DetectionThread loaders themselves.
    print(f"项目根目录: {project_root}\n")
    test_default_config_structure()
    print("\n")
    test_model_config_loading()
    print("\n" + "=" * 80)
    print("测试完成")
    print("=" * 80)
"""
检测引擎模块
"""
\ No newline at end of file
"""
卡尔曼滤波引擎模块
包含卡尔曼滤波相关的功能
"""
import cv2
import numpy as np
def stable_median(data, max_std=1.0):
    """Compute a robust median of *data*.

    Outliers beyond 1.5×IQR are dropped first; if the survivors are still
    spread wider than *max_std*, values farther than *max_std* from the
    median are dropped too. Returns 0 when no samples remain (or none
    were given).
    """
    if len(data) == 0:
        return 0
    values = np.array(data)
    # Discard classic IQR outliers.
    q1, q3 = np.percentile(values, [25, 75])
    spread = q3 - q1
    low_bound = q1 - 1.5 * spread
    high_bound = q3 + 1.5 * spread
    values = values[(values >= low_bound) & (values <= high_bound)]
    # If still noisy, keep only values close to the median.
    if len(values) >= 2 and np.std(values) > max_std:
        center = np.median(values)
        values = values[np.abs(values - center) <= max_std]
    return float(np.median(values)) if len(values) > 0 else 0
def init_kalman_filters_list(num_targets, init_means):
    """Create one 2-state / 1-measurement Kalman filter per target.

    Each filter's state is seeded with the corresponding entry of
    *init_means* and zero velocity; noise covariances are fixed constants.
    """
    filters = []
    for idx in range(num_targets):
        flt = cv2.KalmanFilter(2, 1)
        flt.measurementMatrix = np.array([[1, 0]], np.float32)
        flt.transitionMatrix = np.array([[1, 0.9], [0, 0.9]], np.float32)
        flt.processNoiseCov = np.diag([1e-4, 1e-3]).astype(np.float32)
        flt.measurementNoiseCov = np.array([[10]], dtype=np.float32)
        flt.statePost = np.array([[init_means[idx]], [0]], dtype=np.float32)
        filters.append(flt)
    return filters
class KalmanFilterEngine:
    """Kalman-filter engine tracking one scalar height per target.

    Wraps a list of cv2.KalmanFilter instances (one per target) with
    outlier gating, forced re-sync after repeated rejections, and a
    median-based smoothing window.
    """
    def __init__(self, num_targets=0):
        """Initialise per-target bookkeeping; filters are created in initialize()."""
        self.num_targets = num_targets
        # One cv2.KalmanFilter per target, filled by initialize().
        self.kalman_filters = []
        # Count of consecutive rejected observations per target.
        self.consecutive_rejects = [0] * num_targets
        # Sliding window of accepted heights per target (for get_smooth_height).
        self.recent_observations = [[] for _ in range(num_targets)]
        # Previous raw observation per target.
        self.last_observations = [None] * num_targets
        # Length of the median-smoothing window.
        self.smooth_window = 5
    def initialize(self, initial_heights):
        """Create the filters, seeding each with a robust median start height.

        Args:
            initial_heights: one sequence of height samples per target.
        Returns:
            list of the per-target starting heights used.
        """
        init_means = [stable_median(heights) for heights in initial_heights]
        self.kalman_filters = init_kalman_filters_list(self.num_targets, init_means)
        print(f"✅ 卡尔曼滤波器初始化完成,起始高度:{init_means}")
        return init_means
    def update(self, target_idx, observation, container_height_cm, error_percentage=30):
        """Advance one target's filter with a new height observation.

        Args:
            target_idx: index of the target to update.
            observation: raw measured height (cm).
            container_height_cm: reference height used to scale the error.
            error_percentage: max allowed prediction error (% of container).
        Returns:
            (final_height, predicted_height) tuple.
        Raises:
            RuntimeError: if filters are missing or target_idx is out of range.
        """
        if not self.kalman_filters or target_idx >= len(self.kalman_filters):
            raise RuntimeError("卡尔曼滤波器未初始化或目标索引超出范围")
        # Prediction step.
        predicted = self.kalman_filters[target_idx].predict()
        predicted_height = predicted[0][0]
        # Correction step with outlier gating.
        final_height, self.consecutive_rejects[target_idx] = self._update_kalman_filter(
            self.kalman_filters[target_idx],
            observation,
            predicted_height,
            container_height_cm,
            error_percentage,
            self.consecutive_rejects[target_idx],
            self.last_observations[target_idx]
        )
        # Remember this raw observation for the next call.
        self.last_observations[target_idx] = observation
        # Push the result into the sliding window.
        self.recent_observations[target_idx].append(final_height)
        if len(self.recent_observations[target_idx]) > self.smooth_window:
            self.recent_observations[target_idx].pop(0)
        return final_height, predicted_height
    def _update_kalman_filter(self, kalman_filter, observation, predicted_height, container_height_cm,
                              error_percentage=30, consecutive_rejects=0, last_observation=None):
        """Gate one observation against the prediction and correct the filter.

        Observations whose error exceeds *error_percentage* are rejected
        (prediction is used instead) — unless 6 rejections have accumulated,
        in which case the observation is force-accepted to re-sync.
        Returns (final_height, updated_consecutive_rejects).
        """
        # Prediction error as a percentage of the container height.
        prediction_error_percent = abs(observation - predicted_height) / container_height_cm * 100
        # Detect a repeated observation (held liquid-level value).
        # NOTE(review): computed but never used below — confirm intent.
        is_repeated_observation = (last_observation is not None and
                                   observation == last_observation)
        # Error gating logic.
        if prediction_error_percent > error_percentage:
            # Error too large: count the rejection.
            consecutive_rejects += 1
            # After 6 consecutive rejections, force-accept the observation.
            if consecutive_rejects >= 6:
                kalman_filter.correct(np.array([[observation]], dtype=np.float32))
                final_height = kalman_filter.statePost[0][0]
                consecutive_rejects = 0  # reset the counter
                print(f" 连续6次误差过大,强制使用观测值更新: {observation:.3f}cm → 滤波后: {final_height:.3f}cm")
            else:
                # Fall back to the prediction.
                final_height = predicted_height
                print(f" ❌ 误差 {prediction_error_percent:.1f}% > {error_percentage}%,使用预测值: {predicted_height:.3f}cm (连续拒绝: {consecutive_rejects}/6)")
        else:
            # Acceptable error: normal correction.
            kalman_filter.correct(np.array([[observation]], dtype=np.float32))
            final_height = kalman_filter.statePost[0][0]
            consecutive_rejects = 0  # reset the counter
            print(f" ✅ 误差 {prediction_error_percent:.1f}% <= {error_percentage}%,使用观测值: {observation:.3f}cm → 滤波后: {final_height:.3f}cm")
        return final_height, consecutive_rejects
    def get_smooth_height(self, target_idx):
        """Return the median of the target's sliding window (0 when empty)."""
        if not self.recent_observations[target_idx]:
            return 0
        return np.median(self.recent_observations[target_idx])
    def reset_target(self, target_idx):
        """Reset the per-target gating and smoothing state (filter itself kept)."""
        if target_idx < len(self.consecutive_rejects):
            self.consecutive_rejects[target_idx] = 0
        if target_idx < len(self.last_observations):
            self.last_observations[target_idx] = None
        if target_idx < len(self.recent_observations):
            self.recent_observations[target_idx] = []
        print(f" 重置目标{target_idx+1}的滤波器状态")
\ No newline at end of file
"""
Real Time Streaming Capture
用于处理RTSP/RTMP等实时流,避免花屏问题
"""
import threading
import cv2
class RTSCapture(cv2.VideoCapture):
    """Real-time streaming capture that always serves the newest frame.

    A background thread continuously drains the stream so stale frames
    never accumulate (avoiding tearing/corruption on RTSP/RTMP sources).
    Instances must be created through :meth:`RTSCapture.create`; do not
    instantiate this class directly, because ``create`` wires up the
    receiver thread the instance depends on.
    """
    # Class-level defaults. ``create`` shadows ``schemes`` with a
    # per-instance copy so custom schemes never leak between instances.
    _cur_frame = None          # newest decoded frame (set by the reader thread)
    _reading = False           # True while the reader thread should run
    schemes = ["rtsp://", "rtmp://"]  # URL prefixes treated as live streams

    @staticmethod
    def create(url, *schemes):
        """Instantiate and initialise an RTSCapture.

        rtscap = RTSCapture.create("rtsp://example.com/live/1")
        or
        rtscap = RTSCapture.create("http://example.com/live/1.m3u8", "http://")

        Args:
            url: stream URL (str) or local device index (int).
            *schemes: extra URL prefixes to treat as live streams.
        """
        rtscap = RTSCapture(url)
        rtscap.frame_receiver = threading.Thread(target=rtscap.recv_frame, daemon=True)
        # BUG FIX: the original used ``rtscap.schemes.extend(schemes)``,
        # mutating the shared class-level list so extra schemes leaked into
        # every instance and accumulated across create() calls. Bind a
        # per-instance copy instead.
        rtscap.schemes = list(RTSCapture.schemes) + list(schemes)
        if isinstance(url, str) and url.startswith(tuple(rtscap.schemes)):
            rtscap._reading = True
        elif isinstance(url, int):
            # Probably a local camera device index; no reader thread needed.
            pass
        return rtscap

    def isStarted(self):
        """Replacement for VideoCapture.isOpened() that also checks the reader thread."""
        ok = self.isOpened()
        if ok and self._reading:
            ok = self.frame_receiver.is_alive()
        return ok

    def recv_frame(self):
        """Reader-thread loop: keep only the most recently decoded frame."""
        while self._reading and self.isOpened():
            ok, frame = self.read()
            if not ok:
                break
            self._cur_frame = frame
        self._reading = False

    def read2(self):
        """Read the newest frame.

        Same ``(ok, frame)`` contract as VideoCapture.read(); the stored
        frame is consumed so each frame is returned at most once.
        """
        frame = self._cur_frame
        self._cur_frame = None
        return frame is not None, frame

    def start_read(self):
        """Start the reader thread and select the appropriate read function."""
        self.frame_receiver.start()
        self.read_latest_frame = self.read2 if self._reading else self.read

    def stop_read(self):
        """Stop the reader thread and wait for it to exit."""
        self._reading = False
        if self.frame_receiver.is_alive():
            self.frame_receiver.join()

    def release(self):
        """Release resources, stopping the reader thread first."""
        self.stop_read()
        super().release()
# Demo / manual test: show the live stream and quit on 'q'.
if __name__ == '__main__':
    import sys
    if len(sys.argv) < 2:
        print("Usage: python rtscapture.py <rtsp_url>")
        sys.exit(1)
    rtscap = RTSCapture.create(sys.argv[1])
    rtscap.start_read()  # start the reader thread; redirects read_latest_frame
    while rtscap.isStarted():
        ok, frame = rtscap.read_latest_frame()  # read_latest_frame() replaces read()
        if not ok:
            # No fresh frame yet: poll the keyboard, then retry.
            if cv2.waitKey(100) & 0xFF == ord('q'):
                break
            continue
        # Frame-processing code goes here.
        cv2.imshow("cam", frame)
        if cv2.waitKey(100) & 0xFF == ord('q'):
            break
    rtscap.stop_read()
    rtscap.release()
    cv2.destroyAllWindows()
\ No newline at end of file
模块化处理 class LiquidDetectionEngine 代码:新增子类 modeldetect,其输入为图像、输出为 YOLO 分割结果,相关代码归档到 class model 中。
def _validate_device
def load_model
def _validate_model_file
def _decode_dat_model
def _parse_targets
def configure
def _ensure_state_lists_size
def cleanup(self)
class AutoAnnotationDetector自动标注检测器集成到系统中。用户点击开始标注后,加载模型,然后依旧进入到全屏标注界面,不过当前画面根据自动标注
AutoAnnotationDetector 输出:
boxes: {x1, y1, x2, y2}
points: {top, bottom, top_x, bottom_x}
↓ 转换
SimpleAnnotationEngine 格式:
boxes: (cx, cy, size)
bottom_points: (x, y)
top_points: (x, y)
\ No newline at end of file
"""
相机姿态检测使用示例
展示如何在液位检测系统中集成相机姿态检测功能
"""
import cv2
import numpy as np
from utils.cameraposition import CameraPositionDetector, create_detector
def example_basic_usage():
    """Basic usage: set a reference frame, then check each video frame."""
    # 1. Build the detector with explicit thresholds.
    pose_detector = CameraPositionDetector(
        translation_threshold=3.0,   # >3 px translation counts as a change
        rotation_threshold=0.5,      # >0.5 deg rotation counts as a change
        scale_threshold=0.02,        # >2% scale change counts as a change
        inlier_ratio_threshold=0.5,  # <50% inliers counts as a change
        min_match_count=10,          # need at least 10 feature matches
        voting_frames=3,             # vote over a 3-frame window
        voting_ratio=0.7,            # 70% of frames must agree to trigger
    )

    # 2. Load the reference frame; the hole bbox (x1, y1, x2, y2) masks out
    # the liquid-level hole so only static background features are used.
    ref_frame = cv2.imread("path/to/reference_frame.jpg")
    hole_bbox = (100, 150, 300, 400)
    if not pose_detector.set_reference_frame(ref_frame, hole_bbox=hole_bbox):
        print("基准帧设置失败,请检查图像质量或特征点数量")
        return

    # 3. Stream frames and test each one against the reference.
    cap = cv2.VideoCapture("path/to/video.mp4")
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        result = pose_detector.detect_position_change(frame)
        if result['voted_changed']:
            # Camera moved: report details. A real app could raise an alarm
            # or pause detection here and ask the user to re-fix the camera.
            print("警告:相机姿态发生变化!")
            print(f" 平移: {result['translation']}")
            print(f" 旋转: {result['rotation']:.2f}°")
            print(f" 尺度: {result['scale']:.3f}")
            print(f" 原因: {result['error']}")
        else:
            print("相机姿态正常")
        cv2.imshow("Frame", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
def example_with_yolo_detection():
    """Feed a YOLO-detected hole bbox into the pose detector."""
    pose_detector = create_detector(
        translation_threshold=3.0,
        rotation_threshold=0.5,
        scale_threshold=0.02,
    )

    # Reference frame plus the liquid-hole box reported by YOLO (xyxy format).
    first_frame = cv2.imread("first_frame.jpg")
    hole_bbox = tuple([120, 180, 280, 420])  # x1, y1, x2, y2
    pose_detector.set_reference_frame(first_frame, hole_bbox=hole_bbox)

    cap = cv2.VideoCapture(0)  # camera index, or a video file path
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # Check the camera pose before running liquid-level detection.
        result = pose_detector.detect_position_change(frame)
        if result['voted_changed']:
            # Pose changed: overlay a warning and skip level detection.
            cv2.putText(
                frame,
                "Camera Position Changed! Please recalibrate",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.7,
                (0, 0, 255),
                2,
            )
        else:
            # Pose stable: the real liquid-level call would go here.
            # liquid_level = detect_liquid_level(frame)
            pass
        cv2.imshow("Video", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
def example_with_mask():
    """Supply a hand-built mask instead of a bbox for the reference frame."""
    pose_detector = CameraPositionDetector()

    reference = cv2.imread("reference.jpg")
    height, width = reference.shape[:2]

    # Custom mask (0 = liquid-hole region, 255 = static region); here the
    # hole is assumed circular, drawn as a filled circle in the centre.
    hole_mask = np.zeros((height, width), dtype=np.uint8)
    cv2.circle(hole_mask, (width // 2, height // 2), 100, 255, -1)

    # Register the reference frame using the mask.
    pose_detector.set_reference_frame(reference, hole_mask=hole_mask)
    # ...continue with detect_position_change() as in the other examples.
def example_visualization():
    """Debug helper: print pose metrics and show the feature matches."""
    pose_detector = CameraPositionDetector()

    # Register the reference frame with a fixed hole bbox.
    reference = cv2.imread("reference.jpg")
    pose_detector.set_reference_frame(reference, hole_bbox=(100, 100, 300, 300))

    # Run detection on a single test frame.
    test_frame = cv2.imread("test_frame.jpg")
    result = pose_detector.detect_position_change(test_frame)

    print(f"姿态变化: {result['changed']}")
    print(f"平移: {result['translation']}")
    print(f"旋转: {result['rotation']:.2f}°")
    print(f"尺度: {result['scale']:.3f}")
    print(f"内点比例: {result['inlier_ratio']:.2f}")

    # Draw the top feature matches for visual inspection.
    vis_img = pose_detector.visualize_matches(test_frame, max_matches=50)
    if vis_img is not None:
        cv2.imshow("Feature Matches", vis_img)
        cv2.waitKey(0)
        cv2.destroyAllWindows()
def example_integration_with_handler():
    """Pseudo-code showing how to wire the detector into an existing handler."""
    class VideoHandler:
        def __init__(self):
            # Pose detector with a 5-frame voting window to smooth jitter.
            self.position_detector = CameraPositionDetector(
                translation_threshold=3.0,
                rotation_threshold=0.5,
                voting_frames=5
            )
            # Becomes True once a reference frame has been captured.
            self.position_calibrated = False
        def on_calibrate_button_clicked(self, frame, hole_bbox):
            """Called when the user clicks the calibration button."""
            success = self.position_detector.set_reference_frame(
                frame,
                hole_bbox=hole_bbox
            )
            if success:
                self.position_calibrated = True
                print("相机姿态标定成功")
            else:
                print("标定失败:特征点不足")
        def process_frame(self, frame):
            """Process one frame; returns (result, status_message)."""
            if not self.position_calibrated:
                return None, "未标定"
            # Check the camera pose first.
            pos_result = self.position_detector.detect_position_change(frame)
            if pos_result['voted_changed']:
                # Camera moved -- surface a warning instead of a result.
                return None, f"相机姿态变化: {pos_result['error']}"
            # Pose stable: the real liquid-level detection would run here.
            # liquid_result = self.detect_liquid_level(frame)
            # return liquid_result, "正常"
            return None, "正常"
        def on_recalibrate_request(self, frame, hole_bbox):
            """Re-capture the reference frame (re-calibration)."""
            success = self.position_detector.update_reference_frame(
                frame,
                hole_bbox=hole_bbox
            )
            return success
def example_parameter_tuning():
    """Threshold presets for different resolutions and environments."""
    # High-resolution feeds (e.g. 1920x1080): allow more pixel translation
    # but demand better match quality and stricter voting.
    high_res_detector = CameraPositionDetector(
        translation_threshold=5.0,
        rotation_threshold=0.3,
        scale_threshold=0.015,
        inlier_ratio_threshold=0.6,
        min_match_count=20,
        voting_frames=5,
        voting_ratio=0.8,
    )
    # Low-resolution feeds (e.g. 640x480): fewer features, looser limits.
    low_res_detector = CameraPositionDetector(
        translation_threshold=2.0,
        rotation_threshold=0.8,
        scale_threshold=0.03,
        inlier_ratio_threshold=0.4,
        min_match_count=8,
        voting_frames=3,
        voting_ratio=0.6,
    )
    # Vibrating mounts: widen thresholds and lengthen the voting window
    # so short shakes do not trigger false alarms.
    vibration_detector = CameraPositionDetector(
        translation_threshold=5.0,
        rotation_threshold=1.0,
        scale_threshold=0.05,
        voting_frames=7,
        voting_ratio=0.8,
    )
if __name__ == "__main__":
    # Run the basic example:
    # example_basic_usage()
    # Or run one of the other examples:
    # example_with_yolo_detection()
    # example_visualization()
    print("请根据实际需求选择对应的示例函数运行")
......@@ -2290,8 +2290,6 @@ class TrainingPage(QtWidgets.QWidget):
if index >= 0:
self.test_model_combo.setCurrentIndex(index)
print(f"[模型同步] 已刷新模型列表,共 {len(models)} 个模型")
except Exception as e:
print(f"[错误] 刷新模型列表失败: {e}")
import traceback
......@@ -2307,7 +2305,6 @@ class TrainingPage(QtWidgets.QWidget):
if hasattr(modelset_page, 'modelListChanged'):
# 连接信号到刷新方法
modelset_page.modelListChanged.connect(self.refreshModelLists)
print("[模型同步] 已连接模型列表变化信号")
else:
print("[警告] ModelSetPage没有modelListChanged信号")
except Exception as e:
......
......@@ -518,6 +518,8 @@ class GeneralSetPanel(QtWidgets.QWidget):
def _onStartAnnotation(self):
"""开始标注按钮点击(发送信号给handler处理)"""
# 🔥 先启动全局检测线程(加载模型,弹出进度条)
self.detectionStartRequested.emit()
# 发送创建标注引擎请求信号给handler
self.createAnnotationEngineRequested.emit()
# 发送标注请求信号
......@@ -964,7 +966,6 @@ class AnnotationWidget(QtWidgets.QWidget):
annotationEngineRequested = QtCore.Signal() # 请求标注引擎
frameLoadRequested = QtCore.Signal() # 请求加载帧
annotationDataRequested = QtCore.Signal() # 请求标注数据
liveFrameRequested = QtCore.Signal() # 请求实时画面
def __init__(self, parent=None, annotation_engine=None):
super(AnnotationWidget, self).__init__(parent)
......@@ -993,10 +994,6 @@ class AnnotationWidget(QtWidgets.QWidget):
self.area_states = [] # 存储区域状态列表(默认、空、满)
self.channel_name = "" # 通道名称
# 实时画面预览相关
self.live_preview_enabled = False # 是否启用实时预览
self.live_timer = None # 实时画面更新定时器
# 物理变焦相关
self.physical_zoom_controller = None # 物理变焦控制器
self.physical_zoom_enabled = False # 是否启用物理变焦
......@@ -1007,6 +1004,9 @@ class AnnotationWidget(QtWidgets.QWidget):
self.zoom_center_x = 0 # 变焦中心X坐标
self.zoom_center_y = 0 # 变焦中心Y坐标
# 🔥 调试开关
self.debug = True
self._initUI()
self._connectSignals()
......@@ -1043,10 +1043,7 @@ class AnnotationWidget(QtWidgets.QWidget):
def _connectSignals(self):
"""连接信号"""
# 创建实时画面更新定时器
self.live_timer = QtCore.QTimer()
self.live_timer.timeout.connect(self._requestLiveFrame)
self.live_timer.setInterval(100) # 100ms更新一次,约10fps
pass # 保留方法结构,暂无额外信号需要连接
def _applyFullScreen(self):
"""应用全屏模式(延迟调用,确保控件已初始化)"""
......@@ -1064,25 +1061,6 @@ class AnnotationWidget(QtWidgets.QWidget):
"""设置通道名称(用于生成区域默认名称)"""
self.channel_name = channel_name
def enableLivePreview(self, enabled=True):
"""启用/禁用实时画面预览"""
self.live_preview_enabled = enabled
if enabled:
self.live_timer.start()
else:
self.live_timer.stop()
def _requestLiveFrame(self):
"""请求获取最新画面(通过信号通知handler)"""
if self.live_preview_enabled:
self.liveFrameRequested.emit()
def updateLiveFrame(self, frame):
"""更新实时画面(由handler调用)"""
if frame is not None and self.live_preview_enabled:
self.current_frame = frame.copy()
self._updateDisplay()
def setPhysicalZoomController(self, controller):
"""设置物理变焦控制器"""
self.physical_zoom_controller = controller
......@@ -1188,14 +1166,16 @@ class AnnotationWidget(QtWidgets.QWidget):
def _drawAnnotations(self, img):
"""绘制标注内容"""
print(f"[DEBUG _drawAnnotations] 方法被调用,annotation_engine={self.annotation_engine}")
if self.annotation_engine is None:
print(f"[DEBUG _drawAnnotations] annotation_engine为None,返回")
if self.debug:
print(f"[DEBUG _drawAnnotations] annotation_engine为None,返回")
return
print(f"[DEBUG _drawAnnotations] boxes数量: {len(self.annotation_engine.boxes) if hasattr(self.annotation_engine, 'boxes') else 0}")
print(f"[DEBUG _drawAnnotations] bottom_points数量: {len(self.annotation_engine.bottom_points) if hasattr(self.annotation_engine, 'bottom_points') else 0}")
print(f"[DEBUG _drawAnnotations] top_points数量: {len(self.annotation_engine.top_points) if hasattr(self.annotation_engine, 'top_points') else 0}")
if self.debug:
print(f"[DEBUG _drawAnnotations] boxes数量: {len(self.annotation_engine.boxes)}")
print(f"[DEBUG _drawAnnotations] bottom_points数量: {len(self.annotation_engine.bottom_points)}")
print(f"[DEBUG _drawAnnotations] top_points数量: {len(self.annotation_engine.top_points)}")
# 第一步:使用OpenCV绘制所有的框和点
# 绘制已完成的框
......@@ -1208,13 +1188,14 @@ class AnnotationWidget(QtWidgets.QWidget):
left = cx - half
right = cx + half
if self.debug:
print(f"[DEBUG _drawAnnotations] 绘制框{i}: cx={cx}, cy={cy}, size={size}, rect=({left},{top})->({right},{bottom})")
# 绘制检测框(黄色)
cv2.rectangle(img, (left, top), (right, bottom), (0, 255, 255), 3)
# 绘制底部点(绿色)- 绘制1px水平线条,长度与检测框宽度一致
print(f"[DEBUG _drawAnnotations] 底部点数量: {len(self.annotation_engine.bottom_points)}")
for i, pt in enumerate(self.annotation_engine.bottom_points):
print(f"[DEBUG _drawAnnotations] 绘制底部点 {i}: {pt}")
# 获取对应框的宽度作为线条长度
if i < len(self.annotation_engine.boxes):
_, _, size = self.annotation_engine.boxes[i]
......@@ -1225,13 +1206,14 @@ class AnnotationWidget(QtWidgets.QWidget):
x, y = pt
start_point = (x - half_length, y)
end_point = (x + half_length, y)
print(f"[DEBUG _drawAnnotations] 底部线条: start={start_point}, end={end_point}, color=(0,255,0), thickness=1, length={line_length}")
if self.debug:
print(f"[DEBUG _drawAnnotations] 绘制底部点{i}: pt={pt}, line=({start_point})->({end_point})")
cv2.line(img, start_point, end_point, (0, 255, 0), 1)
# 绘制顶部点(红色)- 绘制1px水平线条,长度与检测框宽度一致
print(f"[DEBUG _drawAnnotations] 顶部点数量: {len(self.annotation_engine.top_points)}")
for i, pt in enumerate(self.annotation_engine.top_points):
print(f"[DEBUG _drawAnnotations] 绘制顶部点 {i}: {pt}")
# 获取对应框的宽度作为线条长度
if i < len(self.annotation_engine.boxes):
_, _, size = self.annotation_engine.boxes[i]
......@@ -1242,7 +1224,10 @@ class AnnotationWidget(QtWidgets.QWidget):
x, y = pt
start_point = (x - half_length, y)
end_point = (x + half_length, y)
print(f"[DEBUG _drawAnnotations] 顶部线条: start={start_point}, end={end_point}, color=(0,0,255), thickness=1, length={line_length}")
if self.debug:
print(f"[DEBUG _drawAnnotations] 绘制顶部点{i}: pt={pt}, line=({start_point})->({end_point})")
cv2.line(img, start_point, end_point, (0, 0, 255), 1)
# 如果正在画框,显示临时框
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment