Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
O
Oil_Level_Recognition_System
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
Oil_Level_Recognition_System
Commits
e6c7ade9
Commit
e6c7ade9
authored
Dec 04, 2025
by
yhb
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1
parent
a0cefa05
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
460 additions
and
7 deletions
+460
-7
detection.py
handlers/videopage/detection.py
+460
-7
No files found.
handlers/videopage/detection.py
View file @
e6c7ade9
...
@@ -168,6 +168,10 @@ class ModelDetect:
...
@@ -168,6 +168,10 @@ class ModelDetect:
self
.
frame_counters
=
[]
self
.
frame_counters
=
[]
self
.
consecutive_rejects
=
[]
self
.
consecutive_rejects
=
[]
self
.
last_observations
=
[]
self
.
last_observations
=
[]
# 🆕 强制重置条件检查相关
self
.
last_accepted_observations
=
[]
# 最后一次被接受的观测值(帧0)
self
.
rejected_observations_buffer
=
[]
# 被拒绝的观测值缓冲区(帧1-6)
def
_validate_device
(
self
,
device
):
def
_validate_device
(
self
,
device
):
"""验证并选择可用的设备"""
"""验证并选择可用的设备"""
...
@@ -453,6 +457,12 @@ class ModelDetect:
...
@@ -453,6 +457,12 @@ class ModelDetect:
kf
.
measurementNoiseCov
=
np
.
array
([[
10
]],
dtype
=
np
.
float32
)
kf
.
measurementNoiseCov
=
np
.
array
([[
10
]],
dtype
=
np
.
float32
)
kf
.
statePost
=
np
.
array
([[
5.0
],
[
0
]],
dtype
=
np
.
float32
)
# 默认初始高度5mm
kf
.
statePost
=
np
.
array
([[
5.0
],
[
0
]],
dtype
=
np
.
float32
)
# 默认初始高度5mm
self
.
kalman_filters
.
append
(
kf
)
self
.
kalman_filters
.
append
(
kf
)
# 🆕 检查并扩展强制重置相关列表
while
len
(
self
.
last_accepted_observations
)
<
num_targets
:
self
.
last_accepted_observations
.
append
(
None
)
while
len
(
self
.
rejected_observations_buffer
)
<
num_targets
:
self
.
rejected_observations_buffer
.
append
([])
def
predict
(
self
,
image
):
def
predict
(
self
,
image
):
"""
"""
...
@@ -987,6 +997,74 @@ class Logic(InitError):
...
@@ -987,6 +997,74 @@ class Logic(InitError):
self
.
full_threshold_ratio
=
0.9
# 满液阈值比例(容器高度的90%)
self
.
full_threshold_ratio
=
0.9
# 满液阈值比例(容器高度的90%)
self
.
full_count
=
[]
# 满液状态计数器
self
.
full_count
=
[]
# 满液状态计数器
self
.
full_confirm_frames
=
3
# 连续多少帧确认满液
self
.
full_confirm_frames
=
3
# 连续多少帧确认满液
# 🆕 卡尔曼滤波调试日志
self
.
_kalman_debug_enabled
=
True
self
.
_kalman_debug_path
=
"kalman_filter_debug.csv"
self
.
_kalman_frame_count
=
0
self
.
_init_kalman_debug_csv
()
def
_init_kalman_debug_csv
(
self
):
"""初始化卡尔曼滤波调试CSV日志文件"""
if
not
self
.
_kalman_debug_enabled
:
return
try
:
import
csv
with
open
(
self
.
_kalman_debug_path
,
'w'
,
newline
=
''
,
encoding
=
'utf-8'
)
as
f
:
writer
=
csv
.
writer
(
f
)
writer
.
writerow
([
'timestamp'
,
# 时间戳
'frame_count'
,
# 帧计数
'target_idx'
,
# 检测区域索引
'raw_observation'
,
# 原始观测值(mm)
'predicted_height'
,
# 卡尔曼预测值(mm)
'prediction_error_pct'
,
# 预测误差百分比
'filter_action'
,
# 滤波动作
'final_height'
,
# 滤波后高度(mm)
'smooth_height'
,
# 滑动窗口平滑后高度(mm)
'consecutive_rejects'
,
# 连续拒绝次数
'last_accepted'
,
# 最后接受的观测值(mm)
'rejected_buffer'
,
# 被拒绝的观测值缓冲区
'rejected_std'
,
# 被拒绝值的标准差
'rejected_mean'
,
# 被拒绝值的平均值
'reset_check_result'
,
# 重置检查结果
'is_full'
,
# 是否满液
'container_height_mm'
# 容器高度(mm)
])
print
(
f
"📝 [Logic] 卡尔曼滤波调试日志已初始化: {self._kalman_debug_path}"
)
except
Exception
as
e
:
print
(
f
"❌ [Logic] 初始化卡尔曼调试日志失败: {e}"
)
def
_log_kalman_debug
(
self
,
log_data
):
"""写入一条卡尔曼滤波调试日志"""
if
not
self
.
_kalman_debug_enabled
:
return
try
:
import
csv
from
datetime
import
datetime
with
open
(
self
.
_kalman_debug_path
,
'a'
,
newline
=
''
,
encoding
=
'utf-8'
)
as
f
:
writer
=
csv
.
writer
(
f
)
writer
.
writerow
([
datetime
.
now
()
.
strftime
(
'
%
Y-
%
m-
%
d
%
H:
%
M:
%
S.
%
f'
)[:
-
3
],
log_data
.
get
(
'frame_count'
,
0
),
log_data
.
get
(
'target_idx'
,
0
),
f
"{log_data.get('raw_observation', 0):.2f}"
,
f
"{log_data.get('predicted_height', 0):.2f}"
,
f
"{log_data.get('prediction_error_pct', 0):.1f}"
,
log_data
.
get
(
'filter_action'
,
''
),
f
"{log_data.get('final_height', 0):.2f}"
,
f
"{log_data.get('smooth_height', 0):.2f}"
,
log_data
.
get
(
'consecutive_rejects'
,
0
),
f
"{log_data.get('last_accepted', 0):.2f}"
if
log_data
.
get
(
'last_accepted'
)
is
not
None
else
'None'
,
log_data
.
get
(
'rejected_buffer'
,
''
),
f
"{log_data.get('rejected_std', 0):.2f}"
if
log_data
.
get
(
'rejected_std'
)
is
not
None
else
'N/A'
,
f
"{log_data.get('rejected_mean', 0):.2f}"
if
log_data
.
get
(
'rejected_mean'
)
is
not
None
else
'N/A'
,
log_data
.
get
(
'reset_check_result'
,
'N/A'
),
log_data
.
get
(
'is_full'
,
False
),
f
"{log_data.get('container_height_mm', 0):.2f}"
])
except
Exception
as
e
:
print
(
f
"❌ [Logic] 写入卡尔曼调试日志失败: {e}"
)
def
_init_kalman_filters
(
self
,
num_targets
):
def
_init_kalman_filters
(
self
,
num_targets
):
"""初始化卡尔曼滤波器列表"""
"""初始化卡尔曼滤波器列表"""
...
@@ -1150,6 +1228,9 @@ class Logic(InitError):
...
@@ -1150,6 +1228,9 @@ class Logic(InitError):
Returns:
Returns:
tuple: (平滑后的高度, 是否满液)
tuple: (平滑后的高度, 是否满液)
"""
"""
# 🆕 帧计数
self
.
_kalman_frame_count
+=
1
# 预测步骤
# 预测步骤
predicted
=
self
.
kalman_filters
[
idx
]
.
predict
()
predicted
=
self
.
kalman_filters
[
idx
]
.
predict
()
predicted_height
=
predicted
[
0
][
0
]
predicted_height
=
predicted
[
0
][
0
]
...
@@ -1162,20 +1243,45 @@ class Logic(InitError):
...
@@ -1162,20 +1243,45 @@ class Logic(InitError):
observation
==
self
.
last_observations
[
idx
])
observation
==
self
.
last_observations
[
idx
])
filter_action
=
""
filter_action
=
""
reset_check_result
=
"N/A"
rejected_std
=
None
rejected_mean
=
None
# 误差控制逻辑
# 误差控制逻辑
if
prediction_error_percent
>
self
.
error_percentage
:
if
prediction_error_percent
>
self
.
error_percentage
:
# 误差过大,增加拒绝计数
# 误差过大,增加拒绝计数
,并记录被拒绝的观测值
self
.
consecutive_rejects
[
idx
]
+=
1
self
.
consecutive_rejects
[
idx
]
+=
1
self
.
rejected_observations_buffer
[
idx
]
.
append
(
observation
)
# 检查是否连续6次拒绝
# 检查是否连续6次拒绝
if
self
.
consecutive_rejects
[
idx
]
>=
6
:
if
self
.
consecutive_rejects
[
idx
]
>=
6
:
# 连续6次误差过大,强制使用观测值更新(直接赋值,更激进)
# 🆕 强制重置前检查:判断是否为异常跳变
self
.
kalman_filters
[
idx
]
.
statePost
=
np
.
array
([[
observation
],
[
0
]],
dtype
=
np
.
float32
)
should_reset
=
self
.
_check_reset_condition
(
idx
,
container_height_mm
)
self
.
kalman_filters
[
idx
]
.
statePre
=
np
.
array
([[
observation
],
[
0
]],
dtype
=
np
.
float32
)
final_height
=
observation
# 计算被拒绝值的统计信息(用于日志)
self
.
consecutive_rejects
[
idx
]
=
0
# 重置计数器
if
len
(
self
.
rejected_observations_buffer
[
idx
])
>=
6
:
filter_action
=
f
"强制重置(连续6次拒绝,直接赋值{observation:.2f}mm)"
rejected_array
=
np
.
array
(
self
.
rejected_observations_buffer
[
idx
][
-
6
:])
rejected_std
=
np
.
std
(
rejected_array
)
rejected_mean
=
np
.
mean
(
rejected_array
)
if
should_reset
:
# 通过检查,执行强制重置
self
.
kalman_filters
[
idx
]
.
statePost
=
np
.
array
([[
observation
],
[
0
]],
dtype
=
np
.
float32
)
self
.
kalman_filters
[
idx
]
.
statePre
=
np
.
array
([[
observation
],
[
0
]],
dtype
=
np
.
float32
)
final_height
=
observation
# 更新最后接受的观测值
self
.
last_accepted_observations
[
idx
]
=
observation
filter_action
=
f
"强制重置(连续6次拒绝,检查通过,直接赋值{observation:.2f}mm)"
reset_check_result
=
"RESET_ALLOWED"
else
:
# 判定为异常跳变,不重置,继续使用预测值
final_height
=
predicted_height
filter_action
=
f
"拒绝重置(判定为异常跳变,继续使用预测值{predicted_height:.2f}mm)"
reset_check_result
=
"RESET_DENIED_JUMP"
# 重置计数器和缓冲区
self
.
consecutive_rejects
[
idx
]
=
0
self
.
rejected_observations_buffer
[
idx
]
=
[]
else
:
else
:
# 使用预测值
# 使用预测值
final_height
=
predicted_height
final_height
=
predicted_height
...
@@ -1184,7 +1290,10 @@ class Logic(InitError):
...
@@ -1184,7 +1290,10 @@ class Logic(InitError):
# 误差可接受,正常更新
# 误差可接受,正常更新
self
.
kalman_filters
[
idx
]
.
correct
(
np
.
array
([[
observation
]],
dtype
=
np
.
float32
))
self
.
kalman_filters
[
idx
]
.
correct
(
np
.
array
([[
observation
]],
dtype
=
np
.
float32
))
final_height
=
self
.
kalman_filters
[
idx
]
.
statePost
[
0
][
0
]
final_height
=
self
.
kalman_filters
[
idx
]
.
statePost
[
0
][
0
]
# 🆕 记录最后一次被接受的观测值(帧0)
self
.
last_accepted_observations
[
idx
]
=
observation
self
.
consecutive_rejects
[
idx
]
=
0
# 重置计数器
self
.
consecutive_rejects
[
idx
]
=
0
# 重置计数器
self
.
rejected_observations_buffer
[
idx
]
=
[]
# 清空拒绝缓冲区
filter_action
=
f
"正常更新(误差{prediction_error_percent:.1f}
%
<={self.error_percentage}
%
)"
filter_action
=
f
"正常更新(误差{prediction_error_percent:.1f}
%
<={self.error_percentage}
%
)"
# 更新上次观测值记录
# 更新上次观测值记录
...
@@ -1227,9 +1336,89 @@ class Logic(InitError):
...
@@ -1227,9 +1336,89 @@ class Logic(InitError):
full_status
=
"🌊满液"
if
is_full_confirmed
else
(
"接近满液"
if
is_full
else
"正常"
)
full_status
=
"🌊满液"
if
is_full_confirmed
else
(
"接近满液"
if
is_full
else
"正常"
)
print
(
f
" 💧 [目标{idx}] 满液判断: 阈值={full_threshold_mm:.2f}mm, 状态={full_status}, 计数={self.full_count[idx]}/{self.full_confirm_frames}"
)
print
(
f
" 💧 [目标{idx}] 满液判断: 阈值={full_threshold_mm:.2f}mm, 状态={full_status}, 计数={self.full_count[idx]}/{self.full_confirm_frames}"
)
# 🆕 写入卡尔曼滤波调试日志
self
.
_log_kalman_debug
({
'frame_count'
:
self
.
_kalman_frame_count
,
'target_idx'
:
idx
,
'raw_observation'
:
observation
,
'predicted_height'
:
predicted_height
,
'prediction_error_pct'
:
prediction_error_percent
,
'filter_action'
:
filter_action
,
'final_height'
:
final_height
,
'smooth_height'
:
smooth_height
,
'consecutive_rejects'
:
self
.
consecutive_rejects
[
idx
],
'last_accepted'
:
self
.
last_accepted_observations
[
idx
]
if
idx
<
len
(
self
.
last_accepted_observations
)
else
None
,
'rejected_buffer'
:
str
([
f
'{v:.2f}'
for
v
in
self
.
rejected_observations_buffer
[
idx
]])
if
idx
<
len
(
self
.
rejected_observations_buffer
)
else
'[]'
,
'rejected_std'
:
rejected_std
,
'rejected_mean'
:
rejected_mean
,
'reset_check_result'
:
reset_check_result
,
'is_full'
:
is_full_confirmed
,
'container_height_mm'
:
container_height_mm
})
# 🆕 返回滑动窗口平滑后的高度和满液状态
# 🆕 返回滑动窗口平滑后的高度和满液状态
return
smooth_height
,
is_full_confirmed
return
smooth_height
,
is_full_confirmed
def
_check_reset_condition
(
self
,
idx
,
container_height_mm
):
"""
检查是否应该执行强制重置
条件:
1. 被拒绝的6帧观测值方差小于容器高度的10
%
(说明这6帧数据稳定一致)
2. 这6帧的平均值与帧0(最后接受的观测值)误差超过30
%
如果同时满足条件1和2,说明是异常跳变,不应该重置
Args:
idx: 目标索引
container_height_mm: 容器高度(毫米)
Returns:
bool: True=应该重置, False=不应该重置(异常跳变)
"""
rejected_buffer
=
self
.
rejected_observations_buffer
[
idx
]
last_accepted
=
self
.
last_accepted_observations
[
idx
]
# 如果没有足够的数据,允许重置
if
len
(
rejected_buffer
)
<
6
or
last_accepted
is
None
:
if
self
.
debug
:
print
(
f
" 🔍 [目标{idx}] 重置检查: 数据不足,允许重置"
)
return
True
# 计算被拒绝的6帧观测值的方差和平均值
rejected_array
=
np
.
array
(
rejected_buffer
[
-
6
:])
# 取最后6个
rejected_std
=
np
.
std
(
rejected_array
)
rejected_mean
=
np
.
mean
(
rejected_array
)
# 条件1:方差小于容器高度的10%(数据稳定一致)
variance_threshold
=
container_height_mm
*
0.10
is_stable
=
rejected_std
<
variance_threshold
# 条件2:平均值与帧0的误差超过30%
error_from_last_accepted
=
abs
(
rejected_mean
-
last_accepted
)
error_threshold
=
container_height_mm
*
0.30
is_large_jump
=
error_from_last_accepted
>
error_threshold
if
self
.
debug
:
print
(
f
" 🔍 [目标{idx}] 重置检查:"
)
print
(
f
" 帧0观测值: {last_accepted:.2f}mm"
)
print
(
f
" 被拒绝6帧: {[f'{v:.2f}' for v in rejected_buffer[-6:]]}"
)
print
(
f
" 6帧标准差: {rejected_std:.2f}mm (阈值: {variance_threshold:.2f}mm)"
)
print
(
f
" 6帧平均值: {rejected_mean:.2f}mm"
)
print
(
f
" 与帧0误差: {error_from_last_accepted:.2f}mm (阈值: {error_threshold:.2f}mm)"
)
print
(
f
" 稳定一致: {is_stable}, 大幅跳变: {is_large_jump}"
)
# 如果6帧数据稳定一致 且 与帧0差距大 → 判定为异常跳变,不重置
if
is_stable
and
is_large_jump
:
if
self
.
debug
:
print
(
f
" ⚠️ [目标{idx}] 判定为异常跳变,拒绝重置"
)
return
False
# 其他情况允许重置
if
self
.
debug
:
print
(
f
" ✅ [目标{idx}] 允许重置"
)
return
True
def
get_smooth_height
(
self
,
target_idx
):
def
get_smooth_height
(
self
,
target_idx
):
"""获取平滑后的高度(中位数)"""
"""获取平滑后的高度(中位数)"""
if
not
self
.
recent_observations
[
target_idx
]:
if
not
self
.
recent_observations
[
target_idx
]:
...
@@ -1530,6 +1719,270 @@ class Logic(InitError):
...
@@ -1530,6 +1719,270 @@ class Logic(InitError):
return
None
,
False
,
'detect_zero'
return
None
,
False
,
'detect_zero'
# ==================== 空间后处理逻辑 ====================
class
SpaceLogic
:
"""
空间后处理逻辑类
功能:
根据现实物理条件制定约束规则,对多个检测区域的液位高度进行空间一致性校正
使用条件:
只有检测区域数量大于1时使用,只有一个检测区域则不使用
规则:
1. 下方检测区域分割结果有air或foam且此信息为真 → 上方检测区域液位高度一定为零
2. 上方检测区域分割结果有liquid或foam且此信息为真 → 下方检测区域液位高度一定为满(容器高度)
信息真假判断条件:
1. 分割结果的置信度(当前实现)
2. 后续会添加其他条件
返回码:
001: 下方检测区域只有air,上方液位设为0
002: 上方检测区域有liquid或foam,下方液位设为满
None: 未触发空间逻辑
"""
def
__init__
(
self
,
confidence_threshold
=
0.5
):
"""
初始化空间后处理逻辑类
Args:
confidence_threshold: 置信度阈值,高于此值认为信息为真
"""
self
.
confidence_threshold
=
confidence_threshold
self
.
debug
=
False
# 存储每个检测区域的分割结果信息
self
.
region_mask_info
=
{}
# {idx: {'classes': [], 'confidences': [], 'y_position': float}}
def
update_region_info
(
self
,
idx
,
all_masks_info
,
container_top_y
):
"""
更新检测区域的分割结果信息
Args:
idx: 检测区域索引
all_masks_info: mask信息列表 [(mask, class_name, confidence), ...]
container_top_y: 容器顶部y坐标(用于判断上下位置)
"""
classes
=
[]
confidences
=
[]
for
mask
,
class_name
,
conf
in
all_masks_info
:
classes
.
append
(
class_name
)
confidences
.
append
(
conf
)
self
.
region_mask_info
[
idx
]
=
{
'classes'
:
classes
,
'confidences'
:
confidences
,
'y_position'
:
container_top_y
,
# y坐标越小表示位置越上
'masks_info'
:
all_masks_info
}
if
self
.
debug
:
print
(
f
" 📍 [SpaceLogic] 更新区域{idx}信息: classes={classes}, confs={[f'{c:.2f}' for c in confidences]}, y={container_top_y}"
)
def
_has_class_with_confidence
(
self
,
idx
,
target_classes
):
"""
检查指定区域是否有目标类别且置信度达标
Args:
idx: 检测区域索引
target_classes: 目标类别列表 ['air', 'foam'] 或 ['liquid', 'foam']
Returns:
tuple: (是否存在, 最高置信度)
"""
if
idx
not
in
self
.
region_mask_info
:
return
False
,
0.0
info
=
self
.
region_mask_info
[
idx
]
max_conf
=
0.0
found
=
False
for
class_name
,
conf
in
zip
(
info
[
'classes'
],
info
[
'confidences'
]):
if
class_name
in
target_classes
:
if
conf
>=
self
.
confidence_threshold
:
found
=
True
max_conf
=
max
(
max_conf
,
conf
)
return
found
,
max_conf
def
_has_only_air
(
self
,
idx
):
"""
检查指定区域是否只有air(没有liquid和foam)
Args:
idx: 检测区域索引
Returns:
tuple: (是否只有air, air的置信度)
"""
if
idx
not
in
self
.
region_mask_info
:
return
False
,
0.0
info
=
self
.
region_mask_info
[
idx
]
has_air
=
False
air_conf
=
0.0
has_liquid_or_foam
=
False
for
class_name
,
conf
in
zip
(
info
[
'classes'
],
info
[
'confidences'
]):
if
class_name
==
'air'
and
conf
>=
self
.
confidence_threshold
:
has_air
=
True
air_conf
=
max
(
air_conf
,
conf
)
elif
class_name
in
[
'liquid'
,
'foam'
]
and
conf
>=
self
.
confidence_threshold
:
has_liquid_or_foam
=
True
return
has_air
and
not
has_liquid_or_foam
,
air_conf
def
apply_space_logic
(
self
,
liquid_heights
,
container_heights
,
fixed_tops
):
"""
应用空间后处理逻辑
Args:
liquid_heights: 液位高度字典 {idx: height_mm, ...}
container_heights: 容器高度字典 {idx: height_mm, ...}
fixed_tops: 容器顶部y坐标字典 {idx: y, ...}
Returns:
tuple: (修正后的液位高度字典, 触发的规则码列表)
规则码: '001'=下方air导致上方为0, '002'=上方liquid/foam导致下方为满
"""
# 检测区域数量必须大于1
num_regions
=
len
(
liquid_heights
)
if
num_regions
<=
1
:
if
self
.
debug
:
print
(
f
" ⏭️ [SpaceLogic] 检测区域数量={num_regions},跳过空间逻辑"
)
return
liquid_heights
,
[]
# 按y坐标排序,确定上下关系(y坐标越小越上)
sorted_indices
=
sorted
(
fixed_tops
.
keys
(),
key
=
lambda
x
:
fixed_tops
[
x
])
if
self
.
debug
:
print
(
f
" 🔍 [SpaceLogic] 检测区域排序(从上到下): {sorted_indices}"
)
for
idx
in
sorted_indices
:
print
(
f
" 区域{idx}: y={fixed_tops[idx]}, 液位={liquid_heights.get(idx, 'N/A')}mm"
)
corrected_heights
=
dict
(
liquid_heights
)
triggered_rules
=
[]
# 遍历相邻的检测区域对
for
i
in
range
(
len
(
sorted_indices
)
-
1
):
upper_idx
=
sorted_indices
[
i
]
# 上方区域
lower_idx
=
sorted_indices
[
i
+
1
]
# 下方区域
# ========== 规则1: 下方区域只有air → 上方液位为0 ==========
only_air
,
air_conf
=
self
.
_has_only_air
(
lower_idx
)
if
only_air
:
if
self
.
debug
:
print
(
f
" 📏 [SpaceLogic] 规则1触发: 下方区域{lower_idx}只有air(conf={air_conf:.2f}) → 上方区域{upper_idx}液位=0"
)
corrected_heights
[
upper_idx
]
=
0.0
triggered_rules
.
append
(
'001'
)
# ========== 规则2: 上方区域有liquid或foam → 下方液位为满 ==========
has_liquid_foam
,
lf_conf
=
self
.
_has_class_with_confidence
(
upper_idx
,
[
'liquid'
,
'foam'
])
if
has_liquid_foam
:
container_height
=
container_heights
.
get
(
lower_idx
,
0
)
if
container_height
>
0
:
if
self
.
debug
:
print
(
f
" 📏 [SpaceLogic] 规则2触发: 上方区域{upper_idx}有liquid/foam(conf={lf_conf:.2f}) → 下方区域{lower_idx}液位={container_height}mm(满)"
)
corrected_heights
[
lower_idx
]
=
container_height
triggered_rules
.
append
(
'002'
)
if
self
.
debug
and
not
triggered_rules
:
print
(
f
" ✅ [SpaceLogic] 未触发任何空间逻辑规则"
)
return
corrected_heights
,
triggered_rules
def
process
(
self
,
detection_results
,
container_heights
,
fixed_tops
):
"""
处理检测结果,应用空间后处理逻辑
这是主要的调用接口,整合了信息更新和逻辑应用
Args:
detection_results: 检测结果字典
{
'liquid_line_positions': {
idx: {'height_mm': float, 'y': int, ...},
...
},
'success': bool
}
container_heights: 容器高度列表或字典 [h1, h2, ...] 或 {idx: h, ...}
fixed_tops: 容器顶部y坐标列表或字典 [y1, y2, ...] 或 {idx: y, ...}
Returns:
dict: 修正后的检测结果(与输入格式相同)
"""
if
not
detection_results
.
get
(
'success'
,
False
):
return
detection_results
liquid_positions
=
detection_results
.
get
(
'liquid_line_positions'
,
{})
# 检测区域数量必须大于1
if
len
(
liquid_positions
)
<=
1
:
return
detection_results
# 转换为字典格式
if
isinstance
(
container_heights
,
list
):
container_heights
=
{
i
:
h
for
i
,
h
in
enumerate
(
container_heights
)}
if
isinstance
(
fixed_tops
,
list
):
fixed_tops
=
{
i
:
y
for
i
,
y
in
enumerate
(
fixed_tops
)}
# 提取液位高度
liquid_heights
=
{}
for
idx
,
pos_info
in
liquid_positions
.
items
():
liquid_heights
[
idx
]
=
pos_info
.
get
(
'height_mm'
,
0
)
# 应用空间逻辑
corrected_heights
,
triggered_rules
=
self
.
apply_space_logic
(
liquid_heights
,
container_heights
,
fixed_tops
)
# 更新检测结果
corrected_results
=
dict
(
detection_results
)
corrected_positions
=
dict
(
liquid_positions
)
for
idx
,
new_height
in
corrected_heights
.
items
():
if
idx
in
corrected_positions
:
old_height
=
corrected_positions
[
idx
]
.
get
(
'height_mm'
,
0
)
if
new_height
!=
old_height
:
corrected_positions
[
idx
]
=
dict
(
corrected_positions
[
idx
])
corrected_positions
[
idx
][
'height_mm'
]
=
new_height
corrected_positions
[
idx
][
'space_logic_applied'
]
=
True
# 重新计算像素高度和y坐标
container_height
=
container_heights
.
get
(
idx
,
20.0
)
if
container_height
>
0
:
# 获取容器像素高度(从原始数据推算)
original_height_mm
=
old_height
original_height_px
=
corrected_positions
[
idx
]
.
get
(
'height_px'
,
0
)
if
original_height_mm
>
0
:
pixel_per_mm
=
original_height_px
/
original_height_mm
else
:
# 使用默认比例
pixel_per_mm
=
10.0
new_height_px
=
int
(
new_height
*
pixel_per_mm
)
corrected_positions
[
idx
][
'height_px'
]
=
new_height_px
# 更新y坐标
container_bottom_y
=
corrected_positions
[
idx
]
.
get
(
'y'
,
0
)
+
original_height_px
corrected_positions
[
idx
][
'y'
]
=
container_bottom_y
-
new_height_px
corrected_results
[
'liquid_line_positions'
]
=
corrected_positions
corrected_results
[
'space_logic_rules'
]
=
triggered_rules
return
corrected_results
def
reset
(
self
):
"""重置空间逻辑状态"""
self
.
region_mask_info
=
{}
# ==================== 兼容性别名 ====================
# ==================== 兼容性别名 ====================
# 保持向后兼容,外部代码可以继续使用 LiquidDetectionEngine
# 保持向后兼容,外部代码可以继续使用 LiquidDetectionEngine
LiquidDetectionEngine
=
Logic
LiquidDetectionEngine
=
Logic
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment