Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
O
Oil_Level_Recognition_System
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
Administrator
Oil_Level_Recognition_System
Commits
e4b58283
Commit
e4b58283
authored
Nov 30, 2025
by
Yuhaibo
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1
parent
086a9666
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
836 additions
and
10 deletions
+836
-10
channel_config.yaml
database/config/channel_config.yaml
+10
-10
detection.py
handlers/videopage/detection.py
+826
-0
No files found.
database/config/channel_config.yaml
View file @
e4b58283
...
@@ -17,23 +17,23 @@ channels:
...
@@ -17,23 +17,23 @@ channels:
name
:
'
4'
name
:
'
4'
channel2
:
channel2
:
general
:
general
:
task_id
:
'
1
23
'
task_id
:
'
1'
task_name
:
'
2
1
'
task_name
:
'
2
22
'
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1
23_21
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1
_222
channel3
:
channel3
:
general
:
general
:
task_id
:
'
1
23
'
task_id
:
'
1'
task_name
:
'
2
1
'
task_name
:
'
2
22
'
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1
23_21
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1
_222
channel4
:
channel4
:
general
:
general
:
task_id
:
'
1'
task_id
:
'
1'
task_name
:
'
1
'
task_name
:
'
222
'
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1_
1
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1_
222
channel1
:
channel1
:
general
:
general
:
task_id
:
'
1'
task_id
:
'
1'
task_name
:
'
1
'
task_name
:
'
222
'
area_count
:
0
area_count
:
0
safe_low
:
2.0mm
safe_low
:
2.0mm
safe_high
:
10.0mm
safe_high
:
10.0mm
...
@@ -41,7 +41,7 @@ channel1:
...
@@ -41,7 +41,7 @@ channel1:
video_format
:
AVI
video_format
:
AVI
push_address
:
'
'
push_address
:
'
'
video_path
:
'
'
video_path
:
'
'
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1_
1
save_liquid_data_path
:
d:\restructure\liquid_level_line_detection_system\database\mission_result\1_
222
areas
:
areas
:
area_1
:
通道1_区域1
area_1
:
通道1_区域1
area_heights
:
area_heights
:
...
...
handlers/videopage/detection.py
0 → 100644
View file @
e4b58283
# -*- coding: utf-8 -*-
"""
液位检测引擎 - 完整版
提供简洁的检测接口:输入标注数据和帧,输出液位高度数据
"""
import
cv2
import
numpy
as
np
from
pathlib
import
Path
# 导入动态路径获取函数
from
database.config
import
get_temp_models_dir
# ==================== 辅助函数 ====================
def
get_class_color
(
class_name
):
"""为不同类别分配不同的颜色"""
color_map
=
{
'liquid'
:
(
0
,
255
,
0
),
# 绿色 - 液体
'foam'
:
(
255
,
0
,
0
),
# 蓝色 - 泡沫
'air'
:
(
0
,
0
,
255
),
# 红色 - 空气
}
return
color_map
.
get
(
class_name
,
(
128
,
128
,
128
))
# 默认灰色
def
calculate_foam_boundary_lines
(
mask
):
"""计算foam mask的顶部和底部边界线"""
if
np
.
sum
(
mask
)
==
0
:
return
None
,
None
y_coords
=
np
.
where
(
mask
)[
0
]
if
len
(
y_coords
)
==
0
:
return
None
,
None
top_y
=
np
.
min
(
y_coords
)
bottom_y
=
np
.
max
(
y_coords
)
# 计算顶部边界线的平均位置
top_region_height
=
max
(
1
,
int
((
bottom_y
-
top_y
)
*
0.1
))
top_region_mask
=
mask
[
top_y
:
top_y
+
top_region_height
,
:]
if
np
.
sum
(
top_region_mask
)
>
0
:
top_y_coords
=
np
.
where
(
top_region_mask
)[
0
]
+
top_y
top_line_y
=
np
.
mean
(
top_y_coords
)
else
:
top_line_y
=
top_y
# 计算底部边界线的平均位置
bottom_region_height
=
max
(
1
,
int
((
bottom_y
-
top_y
)
*
0.1
))
bottom_region_mask
=
mask
[
bottom_y
-
bottom_region_height
:
bottom_y
+
1
,
:]
if
np
.
sum
(
bottom_region_mask
)
>
0
:
bottom_y_coords
=
np
.
where
(
bottom_region_mask
)[
0
]
+
(
bottom_y
-
bottom_region_height
)
bottom_line_y
=
np
.
mean
(
bottom_y_coords
)
else
:
bottom_line_y
=
bottom_y
return
top_line_y
,
bottom_line_y
def
analyze_multiple_foams
(
foam_masks
,
container_pixel_height
):
"""分析多个foam,找到可能的液位边界"""
if
len
(
foam_masks
)
<
2
:
return
None
foam_boundaries
=
[]
# 计算每个foam的边界信息
for
i
,
mask
in
enumerate
(
foam_masks
):
top_y
,
bottom_y
=
calculate_foam_boundary_lines
(
mask
)
if
top_y
is
not
None
and
bottom_y
is
not
None
:
center_y
=
(
top_y
+
bottom_y
)
/
2
foam_boundaries
.
append
({
'index'
:
i
,
'top_y'
:
top_y
,
'bottom_y'
:
bottom_y
,
'center_y'
:
center_y
})
if
len
(
foam_boundaries
)
<
2
:
return
None
# 按垂直位置排序
foam_boundaries
.
sort
(
key
=
lambda
x
:
x
[
'center_y'
])
error_threshold_px
=
container_pixel_height
*
0.1
# 检查相邻foam之间的边界
for
i
in
range
(
len
(
foam_boundaries
)
-
1
):
upper_foam
=
foam_boundaries
[
i
]
lower_foam
=
foam_boundaries
[
i
+
1
]
upper_bottom
=
upper_foam
[
'bottom_y'
]
lower_top
=
lower_foam
[
'top_y'
]
boundary_distance
=
abs
(
upper_bottom
-
lower_top
)
if
boundary_distance
<=
error_threshold_px
:
liquid_level_y
=
(
upper_bottom
+
lower_top
)
/
2
return
liquid_level_y
return
None
def
stable_median
(
data
,
max_std
=
1.0
):
"""稳健地计算中位数"""
if
len
(
data
)
==
0
:
return
0
data
=
np
.
array
(
data
)
q1
,
q3
=
np
.
percentile
(
data
,
[
25
,
75
])
iqr
=
q3
-
q1
lower
,
upper
=
q1
-
1.5
*
iqr
,
q3
+
1.5
*
iqr
data
=
data
[(
data
>=
lower
)
&
(
data
<=
upper
)]
if
len
(
data
)
>=
2
and
np
.
std
(
data
)
>
max_std
:
median_val
=
np
.
median
(
data
)
data
=
data
[
np
.
abs
(
data
-
median_val
)
<=
max_std
]
return
float
(
np
.
median
(
data
))
if
len
(
data
)
>
0
else
0
# ==================== 主检测引擎 ====================
class
LiquidDetectionEngine
:
"""
液位检测引擎
输入:
1. 标注数据(boxes, fixed_bottoms, fixed_tops, actual_heights)
2. 视频帧
输出:
液位高度数据字典
"""
def
__init__
(
self
,
model_path
=
None
,
device
=
'cuda'
,
batch_size
=
4
):
"""
初始化检测引擎(支持GPU批处理加速)
Args:
model_path: YOLO模型路径(.pt 或 .dat 文件)
device: 计算设备 ('cuda', 'cpu', '0', '1' 等)
batch_size: 批处理大小(1-8,推荐4)
"""
self
.
model
=
None
self
.
model_path
=
model_path
self
.
device
=
self
.
_validate_device
(
device
)
self
.
batch_size
=
batch_size
# 标注数据
self
.
targets
=
[]
# [(cx, cy, size), ...]
self
.
fixed_container_bottoms
=
[]
# 容器底部y坐标列表
self
.
fixed_container_tops
=
[]
# 容器顶部y坐标列表
self
.
actual_heights
=
[]
# 实际容器高度列表(毫米)
# 卡尔曼滤波器
self
.
kalman_filters
=
[]
# 检测状态
self
.
recent_observations
=
[]
self
.
no_liquid_count
=
[]
self
.
last_liquid_heights
=
[]
self
.
frame_counters
=
[]
self
.
consecutive_rejects
=
[]
self
.
last_observations
=
[]
# 滤波参数
self
.
smooth_window
=
5
self
.
error_percentage
=
30
# 误差百分比阈值
# 🔥 延迟加载模型 - 不在构造函数中加载,避免程序启动时自动下载 yolo11n.pt
# 模型将在实际需要时通过显式调用 load_model() 加载
# if model_path:
# self.load_model(model_path)
def
_validate_device
(
self
,
device
):
"""验证并选择可用的设备"""
try
:
import
torch
if
device
in
[
'cuda'
,
'0'
]
or
device
.
startswith
(
'cuda:'
):
if
torch
.
cuda
.
is_available
():
return
'cuda'
if
device
in
[
'cuda'
,
'0'
]
else
device
else
:
return
'cpu'
return
device
except
Exception
:
return
'cpu'
def
load_model
(
self
,
model_path
):
"""
加载YOLO模型
Args:
model_path: 模型文件路径(支持 .pt 和 .dat 格式)
Returns:
bool: 加载是否成功
"""
try
:
import
os
# 检查模型文件是否存在
if
not
os
.
path
.
exists
(
model_path
):
print
(
f
"❌ [检测引擎] 模型文件不存在: {model_path}"
)
return
False
# 如果是 .dat 文件,先解码
if
model_path
.
endswith
(
'.dat'
):
decoded_path
=
self
.
_decode_dat_model
(
model_path
)
if
decoded_path
is
None
:
print
(
f
"❌ [检测引擎] .dat 文件解码失败: {model_path}"
)
return
False
model_path
=
decoded_path
# 延迟导入 ultralytics,避免在模块加载时触发下载
from
ultralytics
import
YOLO
# 验证模型文件完整性
if
not
self
.
_validate_model_file
(
model_path
):
print
(
f
"❌ [检测引擎] 模型文件验证失败: {model_path}"
)
return
False
# 🔥 验证模型文件存在后,设置离线模式防止自动下载其他模型
if
not
os
.
path
.
exists
(
model_path
):
print
(
f
"❌ [检测引擎] 模型文件不存在: {model_path}"
)
return
False
os
.
environ
[
'YOLO_VERBOSE'
]
=
'False'
# 禁用详细输出
os
.
environ
[
'YOLO_OFFLINE'
]
=
'1'
# 离线模式
os
.
environ
[
'ULTRALYTICS_OFFLINE'
]
=
'True'
# 离线模式
print
(
f
"🔄 [检测引擎] 正在加载模型: {model_path}"
)
# 加载模型到GPU
self
.
model
=
YOLO
(
model_path
)
self
.
model
.
to
(
self
.
device
)
# 移动模型到指定设备
self
.
model_path
=
model_path
print
(
f
"✅ [检测引擎] 模型加载成功: {os.path.basename(model_path)}"
)
return
True
except
Exception
as
e
:
print
(
f
"❌ [检测引擎] 模型加载失败: {e}"
)
return
False
def
_validate_model_file
(
self
,
model_path
):
"""
验证模型文件的完整性
Args:
model_path: 模型文件路径
Returns:
bool: 文件是否有效
"""
try
:
import
os
# 检查文件大小(模型文件不应该太小)
file_size
=
os
.
path
.
getsize
(
model_path
)
if
file_size
<
1024
:
# 小于1KB的文件可能无效
print
(
f
"⚠️ [检测引擎] 模型文件过小: {file_size} bytes"
)
return
False
# 检查文件扩展名
if
not
(
model_path
.
endswith
(
'.pt'
)
or
model_path
.
endswith
(
'.pth'
)):
print
(
f
"⚠️ [检测引擎] 不支持的模型格式: {model_path}"
)
return
False
# 尝试读取文件头部,验证是否为有效的PyTorch模型
try
:
with
open
(
model_path
,
'rb'
)
as
f
:
header
=
f
.
read
(
8
)
# PyTorch模型文件通常以特定的魔数开头
if
len
(
header
)
<
8
:
print
(
f
"⚠️ [检测引擎] 模型文件头部不完整"
)
return
False
except
Exception
as
e
:
print
(
f
"⚠️ [检测引擎] 无法读取模型文件: {e}"
)
return
False
print
(
f
"✅ [检测引擎] 模型文件验证通过: {os.path.basename(model_path)} ({file_size} bytes)"
)
return
True
except
Exception
as
e
:
print
(
f
"❌ [检测引擎] 模型文件验证异常: {e}"
)
return
False
def
_decode_dat_model
(
self
,
dat_path
):
"""
解码 .dat 格式的模型文件(独立实现,不依赖外部模块)
.dat 文件格式:
- SIGNATURE (14 bytes): b'LDS_MODEL_FILE'
- VERSION (4 bytes): uint32, 当前为 1
- FILENAME_LEN (4 bytes): uint32
- FILENAME (FILENAME_LEN bytes): utf-8 编码的原始文件名
- DATA_LEN (8 bytes): uint64
- ENCRYPTED_DATA (DATA_LEN bytes): 加密的模型数据
Args:
dat_path: .dat 文件路径
Returns:
str: 解码后的 .pt 文件路径,失败返回 None
"""
try
:
import
struct
import
hashlib
# 解密参数(与 liquid4/core/model_loader.py 保持一致)
SIGNATURE
=
b
'LDS_MODEL_FILE'
VERSION
=
1
ENCRYPTION_KEY
=
"liquid_detection_system_2024"
# 生成密钥哈希
key_hash
=
hashlib
.
sha256
(
ENCRYPTION_KEY
.
encode
(
'utf-8'
))
.
digest
()
# 读取并解析 .dat 文件
with
open
(
dat_path
,
'rb'
)
as
f
:
# 1. 读取并验证签名
signature
=
f
.
read
(
len
(
SIGNATURE
))
if
signature
!=
SIGNATURE
:
return
None
# 2. 读取并验证版本
version
=
struct
.
unpack
(
'<I'
,
f
.
read
(
4
))[
0
]
if
version
!=
VERSION
:
return
None
# 3. 读取原始文件名
filename_len
=
struct
.
unpack
(
'<I'
,
f
.
read
(
4
))[
0
]
original_filename
=
f
.
read
(
filename_len
)
.
decode
(
'utf-8'
)
# 4. 读取加密数据长度
data_len
=
struct
.
unpack
(
'<Q'
,
f
.
read
(
8
))[
0
]
# 5. 读取加密数据
encrypted_data
=
f
.
read
(
data_len
)
# XOR 解密(与 liquid4 算法一致)
decrypted_data
=
bytearray
()
key_len
=
len
(
key_hash
)
for
i
,
byte
in
enumerate
(
encrypted_data
):
decrypted_data
.
append
(
byte
^
key_hash
[
i
%
key_len
])
decrypted_data
=
bytes
(
decrypted_data
)
# 保存到临时目录(使用完整路径的hash作为文件名,避免冲突)
# 使用动态路径获取临时模型目录
temp_dir
=
Path
(
get_temp_models_dir
())
temp_dir
.
mkdir
(
exist_ok
=
True
)
# 使用模型文件的完整路径生成唯一文件名
path_hash
=
hashlib
.
md5
(
str
(
dat_path
)
.
encode
())
.
hexdigest
()[:
8
]
temp_model_path
=
temp_dir
/
f
"temp_{Path(dat_path).stem}_{path_hash}.pt"
with
open
(
temp_model_path
,
'wb'
)
as
f
:
f
.
write
(
decrypted_data
)
return
str
(
temp_model_path
)
except
Exception
as
e
:
return
None
def
_parse_targets
(
self
,
boxes
):
"""解析boxes为targets格式
Args:
boxes: 检测框列表 [[x1, y1, x2, y2], ...] 或 [[cx, cy, size], ...]
Returns:
list: targets列表 [(cx, cy, size), ...]
"""
targets
=
[]
for
box
in
boxes
:
if
len
(
box
)
==
3
:
# 已经是 (cx, cy, size) 格式
targets
.
append
(
tuple
(
box
))
elif
len
(
box
)
>=
4
:
# 是 (x1, y1, x2, y2) 格式,转换为 (cx, cy, size)
x1
,
y1
,
x2
,
y2
=
box
[:
4
]
cx
=
int
((
x1
+
x2
)
/
2
)
cy
=
int
((
y1
+
y2
)
/
2
)
size
=
max
(
abs
(
x2
-
x1
),
abs
(
y2
-
y1
))
targets
.
append
((
cx
,
cy
,
size
))
return
targets
def
configure
(
self
,
boxes
,
fixed_bottoms
,
fixed_tops
,
actual_heights
):
"""
配置标注数据
Args:
boxes: 检测框列表 [[x1, y1, x2, y2], ...] 或 [[cx, cy, size], ...]
fixed_bottoms: 容器底部点列表 [y1, y2, ...]
fixed_tops: 容器顶部点列表 [y1, y2, ...]
actual_heights: 实际容器高度列表 [h1, h2, ...] (单位:毫米)
"""
try
:
# 转换boxes为targets格式 [(cx, cy, size), ...]
self
.
targets
=
self
.
_parse_targets
(
boxes
)
self
.
fixed_container_bottoms
=
list
(
fixed_bottoms
)
self
.
fixed_container_tops
=
list
(
fixed_tops
)
self
.
actual_heights
=
[
float
(
h
)
for
h
in
actual_heights
]
# 初始化状态列表
num_targets
=
len
(
self
.
targets
)
self
.
recent_observations
=
[[]
for
_
in
range
(
num_targets
)]
self
.
no_liquid_count
=
[
0
]
*
num_targets
self
.
last_liquid_heights
=
[
None
]
*
num_targets
self
.
frame_counters
=
[
0
]
*
num_targets
self
.
consecutive_rejects
=
[
0
]
*
num_targets
self
.
last_observations
=
[
None
]
*
num_targets
# 初始化卡尔曼滤波器
self
.
_init_kalman_filters
(
num_targets
)
except
Exception
:
pass
def
_init_kalman_filters
(
self
,
num_targets
):
"""初始化卡尔曼滤波器列表"""
self
.
kalman_filters
=
[]
for
i
in
range
(
num_targets
):
kf
=
cv2
.
KalmanFilter
(
2
,
1
)
kf
.
measurementMatrix
=
np
.
array
([[
1
,
0
]],
np
.
float32
)
kf
.
transitionMatrix
=
np
.
array
([[
1
,
0.9
],
[
0
,
0.9
]],
np
.
float32
)
kf
.
processNoiseCov
=
np
.
diag
([
1e-4
,
1e-3
])
.
astype
(
np
.
float32
)
kf
.
measurementNoiseCov
=
np
.
array
([[
10
]],
dtype
=
np
.
float32
)
# 初始状态:假设容器高度的50%
init_height
=
self
.
actual_heights
[
i
]
*
0.5
if
i
<
len
(
self
.
actual_heights
)
else
5.0
kf
.
statePost
=
np
.
array
([[
init_height
],
[
0
]],
dtype
=
np
.
float32
)
self
.
kalman_filters
.
append
(
kf
)
def
detect
(
self
,
frame
,
annotation_config
=
None
):
"""
检测帧中的液位高度
Args:
frame: 输入的视频帧 (numpy.ndarray)
annotation_config: 可选的标注配置字典,包含 boxes, fixed_bottoms, fixed_tops, actual_heights
如果提供,将使用此配置而不是实例配置(用于多通道共享模型场景)
Returns:
dict: 检测结果
{
'liquid_line_positions': {
0: {'y': y坐标, 'height_mm': 高度毫米, 'height_px': 高度像素},
1: {...},
...
},
'success': bool # 检测是否成功
}
"""
if
self
.
model
is
None
:
return
{
'liquid_line_positions'
:
{},
'success'
:
False
}
# 使用动态配置或实例配置
if
annotation_config
:
targets
=
self
.
_parse_targets
(
annotation_config
.
get
(
'boxes'
,
[]))
fixed_bottoms
=
annotation_config
.
get
(
'fixed_bottoms'
,
[])
fixed_tops
=
annotation_config
.
get
(
'fixed_tops'
,
[])
actual_heights
=
annotation_config
.
get
(
'actual_heights'
,
[])
else
:
targets
=
self
.
targets
fixed_bottoms
=
self
.
fixed_container_bottoms
fixed_tops
=
self
.
fixed_container_tops
actual_heights
=
self
.
actual_heights
if
not
targets
:
return
{
'liquid_line_positions'
:
{},
'success'
:
False
}
try
:
h
,
w
=
frame
.
shape
[:
2
]
liquid_line_positions
=
{}
for
idx
,
(
center_x
,
center_y
,
crop_size
)
in
enumerate
(
targets
):
# 裁剪检测区域
half_size
=
crop_size
//
2
top
=
max
(
center_y
-
half_size
,
0
)
bottom
=
min
(
center_y
+
half_size
,
h
)
left
=
max
(
center_x
-
half_size
,
0
)
right
=
min
(
center_x
+
half_size
,
w
)
cropped
=
frame
[
top
:
bottom
,
left
:
right
]
if
cropped
.
size
==
0
:
continue
# 执行检测(传入top坐标和配置用于坐标转换)
liquid_height_mm
=
self
.
_detect_single_target
(
cropped
,
idx
,
top
,
fixed_bottoms
[
idx
]
if
idx
<
len
(
fixed_bottoms
)
else
None
,
fixed_tops
[
idx
]
if
idx
<
len
(
fixed_tops
)
else
None
,
actual_heights
[
idx
]
if
idx
<
len
(
actual_heights
)
else
20.0
)
# 如果没有检测到液位,使用高度0
if
liquid_height_mm
is
None
:
liquid_height_mm
=
0.0
# 计算液位线位置
# 注意:container_bottom_y 和 container_top_y 已经是原图中的绝对坐标
container_bottom_y
=
fixed_bottoms
[
idx
]
if
idx
<
len
(
fixed_bottoms
)
else
0
container_top_y
=
fixed_tops
[
idx
]
if
idx
<
len
(
fixed_tops
)
else
0
container_height_mm
=
actual_heights
[
idx
]
if
idx
<
len
(
actual_heights
)
else
20.0
container_pixel_height
=
container_bottom_y
-
container_top_y
pixel_per_mm
=
container_pixel_height
/
container_height_mm
height_px
=
int
(
liquid_height_mm
*
pixel_per_mm
)
# 液位线在原图中的绝对位置(container_bottom_y 已经是绝对坐标)
liquid_line_y_absolute
=
container_bottom_y
-
height_px
# 统一使用mm单位输出
liquid_line_positions
[
idx
]
=
{
'y'
:
liquid_line_y_absolute
,
'height_mm'
:
liquid_height_mm
,
# 毫米单位
'height_px'
:
height_px
,
'left'
:
left
,
'right'
:
right
}
# # 调试信息:输出数据
# print(f"\n [检测输出-目标{idx}] 液位线位置数据:")
# print(f" - y坐标: {liquid_line_y_absolute}px")
# print(f" - height_mm: {liquid_height_mm:.2f}mm ️ 注意单位")
# print(f" - height_px: {height_px}px")
# # 调试信息:最终输出
# print(f"\n [检测输出] 最终结果:")
# for idx, pos in liquid_line_positions.items():
# print(f" 目标{idx}: height_mm={pos['height_mm']:.2f}mm (键名是'height_mm')")
return
{
'liquid_line_positions'
:
liquid_line_positions
,
'success'
:
len
(
liquid_line_positions
)
>
0
}
except
Exception
:
return
{
'liquid_line_positions'
:
{},
'success'
:
False
}
def
_detect_single_target
(
self
,
cropped
,
idx
,
crop_top_y
,
container_bottom
=
None
,
container_top
=
None
,
container_height_mm
=
20.0
):
"""
检测单个目标的液位高度
Args:
cropped: 裁剪后的图像
idx: 目标索引
crop_top_y: 裁剪区域在原图中的top坐标(用于坐标转换)
container_bottom: 容器底部y坐标(可选,如果为None则使用实例配置)
container_top: 容器顶部y坐标(可选,如果为None则使用实例配置)
container_height_mm: 容器实际高度(毫米)
Returns:
float: 液位高度(毫米),失败返回 None
"""
try
:
# 获取容器信息(原图绝对坐标)
if
container_bottom
is
not
None
and
container_top
is
not
None
:
container_bottom_offset
=
container_bottom
container_top_offset
=
container_top
else
:
container_bottom_offset
=
self
.
fixed_container_bottoms
[
idx
]
container_top_offset
=
self
.
fixed_container_tops
[
idx
]
container_height_mm
=
self
.
actual_heights
[
idx
]
container_pixel_height
=
container_bottom_offset
-
container_top_offset
pixel_per_mm
=
container_pixel_height
/
container_height_mm
# # 调试信息:容器参数
# print(f"\n [检测-目标{idx}] 容器参数:")
# print(f" - 底部y: {container_bottom_offset}px")
# print(f" - 顶部y: {container_top_offset}px")
# print(f" - 容器像素高度: {container_pixel_height}px")
# print(f" - 容器实际高度: {container_height_mm}mm")
# print(f" - 像素/毫米比例: {pixel_per_mm:.3f}px/mm")
# 执行YOLO推理(使用GPU + 批处理)
mission_results
=
self
.
model
.
predict
(
source
=
cropped
,
imgsz
=
640
,
conf
=
0.5
,
iou
=
0.5
,
device
=
self
.
device
,
# 强制使用GPU
batch
=
self
.
batch_size
,
# 启用批处理
save
=
False
,
verbose
=
False
,
half
=
True
if
self
.
device
!=
'cpu'
else
False
,
# GPU使用FP16加速
stream
=
False
# 批处理模式
)
mission_result
=
mission_results
[
0
]
# # 调试信息:YOLO推理结果
# print(f"[检测-目标{idx}] YOLO推理结果:")
# print(f" - mission_result.masks: {mission_result.masks is not None}")
# if mission_result.masks is not None:
# print(f" - masks数量: {len(mission_result.masks.data)}")
# else:
# print(f" - ⚠️ 未检测到任何mask!")
liquid_height
=
None
# 处理检测结果
if
mission_result
.
masks
is
not
None
:
masks
=
mission_result
.
masks
.
data
.
cpu
()
.
numpy
()
>
0.5
classes
=
mission_result
.
boxes
.
cls
.
cpu
()
.
numpy
()
.
astype
(
int
)
confidences
=
mission_result
.
boxes
.
conf
.
cpu
()
.
numpy
()
else
:
return
None
# 收集所有mask信息
all_masks_info
=
[]
for
i
in
range
(
len
(
masks
)):
class_name
=
self
.
model
.
names
[
classes
[
i
]]
conf
=
confidences
[
i
]
if
confidences
[
i
]
>=
0.5
:
resized_mask
=
cv2
.
resize
(
masks
[
i
]
.
astype
(
np
.
uint8
),
(
cropped
.
shape
[
1
],
cropped
.
shape
[
0
])
)
>
0.5
all_masks_info
.
append
((
resized_mask
,
class_name
,
confidences
[
i
]))
if
len
(
all_masks_info
)
==
0
:
return
None
# ️ 关键修复:将原图坐标转换为裁剪图像坐标
# container_bottom_offset 是原图绝对坐标,需要转换为裁剪图像中的相对坐标
container_bottom_in_crop
=
container_bottom_offset
-
crop_top_y
container_top_in_crop
=
container_top_offset
-
crop_top_y
# print(f" [坐标转换-目标{idx}]:")
# print(f" - 裁剪区域top: {crop_top_y}px (原图坐标)")
# print(f" - 原图容器底部: {container_bottom_offset}px → 裁剪图像中: {container_bottom_in_crop}px")
# print(f" - 原图容器顶部: {container_top_offset}px → 裁剪图像中: {container_top_in_crop}px")
# print(f" - 裁剪图像中容器高度: {container_bottom_in_crop - container_top_in_crop}px(应等于{container_pixel_height}px)")
# 分析mask获取液位高度(使用裁剪图像坐标)
liquid_height
=
self
.
_enhanced_liquid_detection
(
all_masks_info
,
container_bottom_in_crop
,
# 使用裁剪图像坐标
container_pixel_height
,
container_height_mm
,
idx
)
return
liquid_height
except
Exception
as
e
:
print
(
f
"[检测-目标{idx}] ❌ 检测异常: {e}"
)
return
None
def
_enhanced_liquid_detection
(
self
,
all_masks_info
,
container_bottom
,
container_pixel_height
,
container_height_mm
,
idx
):
"""
增强的液位检测,结合连续帧逻辑和foam分析
Args:
all_masks_info: mask信息列表 [(mask, class_name, confidence), ...]
container_bottom: 容器底部y坐标
container_pixel_height: 容器像素高度
container_height_mm: 容器实际高度(毫米)
idx: 目标索引
Returns:
float: 液位高度(毫米),失败返回 None
"""
pixel_per_mm
=
container_pixel_height
/
container_height_mm
# 分离不同类别的mask
liquid_masks
=
[]
foam_masks
=
[]
air_masks
=
[]
for
mask
,
class_name
,
confidence
in
all_masks_info
:
if
class_name
==
'liquid'
:
liquid_masks
.
append
(
mask
)
elif
class_name
==
'foam'
:
foam_masks
.
append
(
mask
)
elif
class_name
==
'air'
:
air_masks
.
append
(
mask
)
# 方法1:直接liquid检测(优先)
if
liquid_masks
:
# 找到最上层的液体mask
topmost_y
=
float
(
'inf'
)
for
i
,
mask
in
enumerate
(
liquid_masks
):
y_indices
=
np
.
where
(
mask
)[
0
]
if
len
(
y_indices
)
>
0
:
mask_top_y
=
np
.
min
(
y_indices
)
# print(f" - liquid mask {i+1}: 顶部y={mask_top_y}px")
if
mask_top_y
<
topmost_y
:
topmost_y
=
mask_top_y
if
topmost_y
!=
float
(
'inf'
):
liquid_height_px
=
container_bottom
-
topmost_y
liquid_height_mm
=
liquid_height_px
/
pixel_per_mm
return
max
(
0
,
min
(
liquid_height_mm
,
container_height_mm
))
# 方法2:foam边界分析(备选)- 连续3帧未检测到liquid时启用
if
self
.
no_liquid_count
[
idx
]
>=
3
:
if
len
(
foam_masks
)
>=
2
:
# 多个foam,寻找液位边界
liquid_y
=
analyze_multiple_foams
(
foam_masks
,
container_pixel_height
)
if
liquid_y
is
not
None
:
liquid_height_px
=
container_bottom
-
liquid_y
liquid_height_mm
=
liquid_height_px
/
pixel_per_mm
return
max
(
0
,
min
(
liquid_height_mm
,
container_height_mm
))
elif
len
(
foam_masks
)
==
1
:
# 单个foam,使用下边界
foam_mask
=
foam_masks
[
0
]
top_y
,
bottom_y
=
calculate_foam_boundary_lines
(
foam_mask
)
if
bottom_y
is
not
None
:
liquid_height_px
=
container_bottom
-
bottom_y
liquid_height_mm
=
liquid_height_px
/
pixel_per_mm
return
max
(
0
,
min
(
liquid_height_mm
,
container_height_mm
))
elif
len
(
air_masks
)
==
1
:
# 单个air,使用下边界
air_mask
=
air_masks
[
0
]
y_coords
=
np
.
where
(
air_mask
)[
0
]
if
len
(
y_coords
)
>
0
:
bottom_y
=
np
.
max
(
y_coords
)
liquid_height_px
=
container_bottom
-
bottom_y
liquid_height_mm
=
liquid_height_px
/
pixel_per_mm
return
max
(
0
,
min
(
liquid_height_mm
,
container_height_mm
))
return
None
def
_apply_kalman_filter
(
self
,
observation
,
idx
,
container_height_mm
):
"""
应用卡尔曼滤波平滑液位高度
Args:
observation: 观测值(毫米)
idx: 目标索引
container_height_mm: 容器高度(毫米)
Returns:
float: 滤波后的高度(毫米)
"""
# 预测步骤
predicted
=
self
.
kalman_filters
[
idx
]
.
predict
()
predicted_height
=
predicted
[
0
][
0
]
# 计算预测误差(相对于容器高度的百分比)
prediction_error_percent
=
abs
(
observation
-
predicted_height
)
/
container_height_mm
*
100
# 检测是否是重复的观测值(保持的液位数据)
is_repeated_observation
=
(
self
.
last_observations
[
idx
]
is
not
None
and
observation
==
self
.
last_observations
[
idx
])
# 误差控制逻辑
if
prediction_error_percent
>
self
.
error_percentage
:
# 误差过大,增加拒绝计数
self
.
consecutive_rejects
[
idx
]
+=
1
# 检查是否连续6次拒绝
if
self
.
consecutive_rejects
[
idx
]
>=
6
:
# 连续6次误差过大,强制使用观测值更新
self
.
kalman_filters
[
idx
]
.
correct
(
np
.
array
([[
observation
]],
dtype
=
np
.
float32
))
final_height
=
self
.
kalman_filters
[
idx
]
.
statePost
[
0
][
0
]
self
.
consecutive_rejects
[
idx
]
=
0
# 重置计数器
else
:
# 使用预测值
final_height
=
predicted_height
else
:
# 误差可接受,正常更新
self
.
kalman_filters
[
idx
]
.
correct
(
np
.
array
([[
observation
]],
dtype
=
np
.
float32
))
final_height
=
self
.
kalman_filters
[
idx
]
.
statePost
[
0
][
0
]
self
.
consecutive_rejects
[
idx
]
=
0
# 重置计数器
# 更新上次观测值记录
self
.
last_observations
[
idx
]
=
observation
# 添加到滑动窗口
self
.
recent_observations
[
idx
]
.
append
(
final_height
)
if
len
(
self
.
recent_observations
[
idx
])
>
self
.
smooth_window
:
self
.
recent_observations
[
idx
]
.
pop
(
0
)
# 限制高度范围
final_height
=
max
(
0
,
min
(
final_height
,
container_height_mm
))
return
final_height
def
get_smooth_height
(
self
,
target_idx
):
"""获取平滑后的高度(中位数)"""
if
not
self
.
recent_observations
[
target_idx
]:
return
0
return
np
.
median
(
self
.
recent_observations
[
target_idx
])
def
reset_target
(
self
,
target_idx
):
"""重置指定目标的滤波器状态"""
if
target_idx
<
len
(
self
.
consecutive_rejects
):
self
.
consecutive_rejects
[
target_idx
]
=
0
if
target_idx
<
len
(
self
.
last_observations
):
self
.
last_observations
[
target_idx
]
=
None
if
target_idx
<
len
(
self
.
recent_observations
):
self
.
recent_observations
[
target_idx
]
=
[]
if
target_idx
<
len
(
self
.
no_liquid_count
):
self
.
no_liquid_count
[
target_idx
]
=
0
if
target_idx
<
len
(
self
.
frame_counters
):
self
.
frame_counters
[
target_idx
]
=
0
def
cleanup
(
self
):
"""清理资源"""
try
:
# 清理临时模型文件(使用动态路径)
temp_dir
=
Path
(
get_temp_models_dir
())
if
temp_dir
.
exists
():
for
temp_file
in
temp_dir
.
glob
(
"temp_*.pt"
):
try
:
temp_file
.
unlink
()
except
:
pass
except
:
pass
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment