Commit 917fbb65 by yhb

1

parent 38c68ebe
......@@ -386,19 +386,44 @@ class DataCollectionChannelHandler:
def _parseRTSP(self, rtsp_url):
"""解析RTSP URL(复用实时监测管理的方法)"""
try:
import re
# RTSP URL格式: rtsp://username:password@ip:port/path
pattern = r'rtsp://([^:]+):([^@]+)@([^:/]+)'
match = re.match(pattern, rtsp_url)
if match:
username = match.group(1)
password = match.group(2)
ip = match.group(3)
return username, password, ip
else:
# 支持密码中包含@符号的情况
if not rtsp_url.startswith('rtsp://'):
return None, None, None
url_part = rtsp_url[7:] # 去掉 "rtsp://"
# 找到最后一个 @ 符号(IP地址前的那个)
last_at_index = url_part.rfind('@')
if last_at_index == -1:
return None, None, None
credentials_part = url_part[:last_at_index]
host_part = url_part[last_at_index + 1:]
# 分离用户名和密码(第一个:分隔)
first_colon = credentials_part.find(':')
if first_colon == -1:
return None, None, None
username = credentials_part[:first_colon]
password = credentials_part[first_colon + 1:]
# URL解码密码
from urllib.parse import unquote
password = unquote(password)
# 提取IP(到:或/为止)
ip_end = len(host_part)
colon_idx = host_part.find(':')
slash_idx = host_part.find('/')
if colon_idx != -1:
ip_end = min(ip_end, colon_idx)
if slash_idx != -1:
ip_end = min(ip_end, slash_idx)
ip = host_part[:ip_end]
return username, password, ip
except Exception as e:
return None, None, None
......
......@@ -30,7 +30,7 @@ class HKcapture:
提供类似cv2.VideoCapture的接口
"""
def __init__(self, source, username=None, password=None, port=8000, channel=1, fps=25):
def __init__(self, source, username=None, password=None, port=8000, channel=1, fps=25, debug=False):
"""
初始化视频捕获对象
......@@ -41,6 +41,7 @@ class HKcapture:
port: 设备端口,默认8000 (int,仅海康威视使用)
channel: 通道号,默认1 (int,仅海康威视使用)
fps: 期望帧率,默认25 (int,用于配置通道和OpenCV)
debug: 是否输出调试信息,默认False
"""
self.source = source
self.username = username
......@@ -48,6 +49,7 @@ class HKcapture:
self.port = port
self.channel = channel
self.target_fps = fps # 保存期望帧率
self.debug = debug # 调试开关
# 检测是否为海康威视设备
self.is_hikvision = self._detect_hikvision()
......@@ -95,6 +97,9 @@ class HKcapture:
# 如果source包含admin:cei345678字段,则认为是海康威视设备
if self.source and 'admin:cei345678' in self.source:
return True
# 如果source包含admin:123456aA@字段,则认为是海康威视设备
if self.source and 'admin:123456aA@' in self.source:
return True
# 如果source是IP地址格式且提供了用户名密码,则认为是海康威视
if self.username and self.password:
......@@ -108,6 +113,9 @@ class HKcapture:
# 如果RTSP URL包含admin:cei345678,认为是海康威视
if 'admin:cei345678' in self.source:
return True
# 如果RTSP URL包含admin:123456aA@,认为是海康威视
if 'admin:123456aA@' in self.source:
return True
# 否则认为是其他RTSP通道
return False
......@@ -125,26 +133,120 @@ class HKcapture:
return self.source
# 如果source是RTSP URL,提取IP
# 支持密码中包含@符号的情况
if self.source.startswith('rtsp://'):
pattern = r'rtsp://(?:[^:]+:[^@]+@)?([^:/]+)'
match = re.match(pattern, self.source)
if match:
return match.group(1)
try:
# 去掉 rtsp:// 前缀
url_part = self.source[7:]
if self.debug:
print(f"[HKcapture调试] _extract_ip: url_part={url_part}")
# 找到最后一个 @ 符号(IP地址前的那个)
last_at_index = url_part.rfind('@')
if self.debug:
print(f"[HKcapture调试] _extract_ip: last_at_index={last_at_index}")
if last_at_index != -1:
# @ 后面是 ip:port/path
host_part = url_part[last_at_index + 1:]
else:
# 没有认证信息,直接是 ip:port/path
host_part = url_part
if self.debug:
print(f"[HKcapture调试] _extract_ip: host_part={host_part}")
# 提取IP(到:或/为止)
ip_end = len(host_part)
colon_idx = host_part.find(':')
slash_idx = host_part.find('/')
if colon_idx != -1:
ip_end = min(ip_end, colon_idx)
if slash_idx != -1:
ip_end = min(ip_end, slash_idx)
extracted_ip = host_part[:ip_end]
if self.debug:
print(f"[HKcapture调试] _extract_ip: 提取的IP={extracted_ip}")
return extracted_ip
except Exception as e:
if self.debug:
print(f"[HKcapture调试] _extract_ip: 异常={e}")
pass
return None
def _extract_credentials_from_source(self):
"""从source中提取用户名和密码"""
"""从source中提取用户名和密码
支持密码中包含特殊字符(如@)的情况
URL格式: rtsp://username:password@ip:port/path
"""
if self.debug:
print(f"[HKcapture调试] _extract_credentials_from_source 被调用")
print(f"[HKcapture调试] 原始source: {self.source}")
if not self.source or not self.source.startswith('rtsp://'):
if self.debug:
print(f"[HKcapture调试] source为空或不是rtsp://开头,返回None")
return None, None
try:
# 去掉 rtsp:// 前缀
url_part = self.source[7:] # 去掉 "rtsp://"
if self.debug:
print(f"[HKcapture调试] 去掉rtsp://后: {url_part}")
# 解析RTSP URL: rtsp://username:password@ip:port/path
pattern = r'rtsp://([^:]+):([^@]+)@'
match = re.match(pattern, self.source)
if match:
return match.group(1), match.group(2)
# 找到最后一个 @ 符号(IP地址前的那个)
# 这样可以正确处理密码中包含 @ 的情况
last_at_index = url_part.rfind('@')
if self.debug:
print(f"[HKcapture调试] 最后一个@的位置: {last_at_index}")
return None, None
if last_at_index == -1:
if self.debug:
print(f"[HKcapture调试] 未找到@符号,返回None")
return None, None
# 提取用户名:密码部分
credentials_part = url_part[:last_at_index]
host_part = url_part[last_at_index + 1:]
if self.debug:
print(f"[HKcapture调试] 凭证部分: {credentials_part}")
print(f"[HKcapture调试] 主机部分: {host_part}")
# 找到第一个 : 分隔用户名和密码
first_colon_index = credentials_part.find(':')
if self.debug:
print(f"[HKcapture调试] 第一个:的位置: {first_colon_index}")
if first_colon_index == -1:
if self.debug:
print(f"[HKcapture调试] 凭证部分未找到:,返回None")
return None, None
username = credentials_part[:first_colon_index]
password = credentials_part[first_colon_index + 1:]
if self.debug:
print(f"[HKcapture调试] 解析出用户名: {username}")
print(f"[HKcapture调试] 解析出密码(解码前): {password}")
# URL解码密码(处理 %40 等编码的特殊字符)
from urllib.parse import unquote
password = unquote(password)
if self.debug:
print(f"[HKcapture调试] 解析出密码(解码后): {password}")
return username, password
except Exception as e:
if self.debug:
print(f"[HKcapture调试] 解析异常: {e}")
# 回退到旧的正则解析方式
pattern = r'rtsp://([^:]+):([^@]+)@'
match = re.match(pattern, self.source)
if match:
return match.group(1), match.group(2)
return None, None
def _init_hikvision_sdk(self):
"""初始化海康威视SDK库(线程安全)"""
......@@ -253,27 +355,59 @@ class HKcapture:
def _login_device(self):
"""登录海康威视设备(已在_open_hikvision中持有锁)"""
if self.debug:
print(f"[HKcapture调试] _login_device 被调用")
print(f"[HKcapture调试] self.source: {self.source}")
print(f"[HKcapture调试] self.username: {self.username}")
print(f"[HKcapture调试] self.password: {self.password}")
# 从source中提取IP地址
ip_address = self._extract_ip_from_source()
if self.debug:
print(f"[HKcapture调试] 提取的IP地址: {ip_address}")
if not ip_address:
if self.debug:
print(f"[HKcapture调试] IP地址为空,登录失败")
return False
# 从source中提取用户名和密码(如果RTSP URL中包含)
username, password = self._extract_credentials_from_source()
if self.debug:
print(f"[HKcapture调试] 从URL提取的用户名: {username}")
print(f"[HKcapture调试] 从URL提取的密码: {password}")
# 使用提供的用户名密码,或者从URL中提取的
login_username = self.username or username
login_password = self.password or password
if self.debug:
print(f"[HKcapture调试] 最终登录用户名: {login_username}")
print(f"[HKcapture调试] 最终登录密码: {login_password}")
if not login_username or not login_password:
if self.debug:
print(f"[HKcapture调试] 用户名或密码为空,登录失败")
return False
if self.debug:
print(f"[HKcapture调试] 准备登录设备: IP={ip_address}, Port={self.port}, User={login_username}, Pass={login_password}")
# 编码字节数据
ip_bytes = ip_address.encode('utf-8')
user_bytes = login_username.encode('utf-8')
pass_bytes = login_password.encode('utf-8')
if self.debug:
print(f"[HKcapture调试] IP字节: {ip_bytes}, 长度={len(ip_bytes)}")
print(f"[HKcapture调试] 用户名字节: {user_bytes}, 长度={len(user_bytes)}")
print(f"[HKcapture调试] 密码字节: {pass_bytes}, 长度={len(pass_bytes)}")
print(f"[HKcapture调试] 密码字符列表: {[c for c in login_password]}")
print(f"[HKcapture调试] 密码ASCII码: {[ord(c) for c in login_password]}")
struLoginInfo = NET_DVR_USER_LOGIN_INFO()
struLoginInfo.bUseAsynLogin = 0
struLoginInfo.sDeviceAddress = ip_address.encode('utf-8')
struLoginInfo.sDeviceAddress = ip_bytes
struLoginInfo.wPort = self.port
struLoginInfo.sUserName = login_username.encode('utf-8')
struLoginInfo.sPassword = login_password.encode('utf-8')
struLoginInfo.sUserName = user_bytes
struLoginInfo.sPassword = pass_bytes
struLoginInfo.byLoginMode = 0
struDeviceInfoV40 = NET_DVR_DEVICEINFO_V40()
......@@ -284,8 +418,41 @@ class HKcapture:
)
if self.iUserID < 0:
error_code = self.hikSDK.NET_DVR_GetLastError()
# 海康威视SDK错误码解释
error_messages = {
1: "用户名或密码错误",
2: "权限不足",
3: "SDK未初始化",
4: "通道号错误",
5: "连接设备失败",
6: "向设备发送失败",
7: "从设备接收数据失败",
8: "等待超时",
9: "参数错误",
10: "设备不支持",
11: "设备资源不足",
12: "设备离线",
17: "设备不存在",
28: "密码错误",
29: "用户名不存在",
47: "用户被锁定",
153: "设备不在线",
}
error_msg = error_messages.get(error_code, f"未知错误")
if self.debug:
print(f"[HKcapture调试] 登录失败! iUserID={self.iUserID}, 错误码={error_code}, 错误信息={error_msg}")
print(f"[HKcapture调试] ===== 登录参数详情 =====")
print(f"[HKcapture调试] sDeviceAddress: {struLoginInfo.sDeviceAddress}")
print(f"[HKcapture调试] wPort: {struLoginInfo.wPort}")
print(f"[HKcapture调试] sUserName: {struLoginInfo.sUserName}")
print(f"[HKcapture调试] sPassword: {struLoginInfo.sPassword}")
print(f"[HKcapture调试] byLoginMode: {struLoginInfo.byLoginMode}")
print(f"[HKcapture调试] ========================")
return False
else:
if self.debug:
print(f"[HKcapture调试] 登录成功! iUserID={self.iUserID}")
return True
def start_capture(self):
......
......@@ -79,11 +79,14 @@ class AmplifyWindowHandler:
self.min_zoom = 1.0
self.max_zoom = 30.0 # 支持更大倍数变焦
self.zoom_step = 0.1
self.zoom_center_x = 960 # 变焦中心X坐标
self.zoom_center_y = 540 # 变焦中心Y坐标
self.zoom_center_x = 0 # 变焦中心X坐标(将根据图片尺寸自适应设置)
self.zoom_center_y = 0 # 变焦中心Y坐标(将根据图片尺寸自适应设置)
self._zoom_center_initialized = False # 标记放大中心是否已初始化
# 当前帧缓存
self._current_frame = None
self._frame_width = 0 # 帧宽度
self._frame_height = 0 # 帧高度
# 连接信号
self._connectSignals()
......@@ -270,6 +273,10 @@ class AmplifyWindowHandler:
# 缓存当前帧
self._current_frame = frame.copy()
# 获取帧尺寸并自适应设置放大中心
height, width = frame.shape[:2]
self._updateZoomCenter(width, height)
# 物理变焦模式下,变焦由硬件完成,直接返回帧
return frame
......@@ -277,6 +284,49 @@ class AmplifyWindowHandler:
print(f"[物理变焦] 帧处理失败: {e}")
return frame
def _updateZoomCenter(self, width, height):
"""
根据图片尺寸自适应更新放大中心
Args:
width: 图片宽度
height: 图片高度
"""
# 检查尺寸是否变化
if width != self._frame_width or height != self._frame_height:
self._frame_width = width
self._frame_height = height
# 设置放大中心为图片中心
self.zoom_center_x = width // 2
self.zoom_center_y = height // 2
self._zoom_center_initialized = True
def setZoomCenter(self, x, y):
"""
手动设置放大中心点
Args:
x: 中心点X坐标
y: 中心点Y坐标
"""
# 确保坐标在有效范围内
if self._frame_width > 0 and self._frame_height > 0:
self.zoom_center_x = max(0, min(x, self._frame_width))
self.zoom_center_y = max(0, min(y, self._frame_height))
else:
self.zoom_center_x = x
self.zoom_center_y = y
self._zoom_center_initialized = True
def getZoomCenter(self):
"""
获取当前放大中心点
Returns:
tuple: (x, y) 放大中心坐标
"""
return (self.zoom_center_x, self.zoom_center_y)
def onMouseClicked(self, click_x, click_y):
"""处理鼠标点击事件 - 物理变焦中心点设置"""
try:
......
......@@ -774,7 +774,8 @@ class ChannelPanelHandler:
return
# 解析RTSP URL获取连接信息
username, password, ip = self._parseRTSP(rtsp_url)
username, password, ip, port = self._parseRTSP(rtsp_url)
# 创建捕获对象
if username and password and ip:
......@@ -783,7 +784,7 @@ class ChannelPanelHandler:
source=ip,
username=username,
password=password,
port=8000,
port=port,
channel=1,
fps=fps # 传入配置的帧率
)
......@@ -861,26 +862,78 @@ class ChannelPanelHandler:
# 检查是否为纯IP
ip_pattern = r'^(\d{1,3}\.){3}\d{1,3}$'
if re.match(ip_pattern, rtsp_url):
return None, None, rtsp_url
return None, None, rtsp_url, 8000
# 检查是否为USB设备(如 "0")
if rtsp_url.isdigit():
return None, None, None
# 解析完整RTSP URL
pattern = r'rtsp://(?:([^:]+):([^@]+)@)?([^:/]+)'
match = re.match(pattern, rtsp_url)
return None, None, None, 8000
if match:
username = match.group(1)
password = match.group(2)
ip = match.group(3)
return username, password, ip
# 解析完整RTSP URL(支持密码中包含@符号)
if rtsp_url.startswith('rtsp://'):
url_part = rtsp_url[7:] # 去掉 "rtsp://"
# 找到最后一个 @ 符号(IP地址前的那个)
last_at_index = url_part.rfind('@')
if last_at_index != -1:
# 有认证信息
credentials_part = url_part[:last_at_index]
host_part = url_part[last_at_index + 1:]
# 分离用户名和密码(第一个:分隔)
first_colon = credentials_part.find(':')
if first_colon != -1:
username = credentials_part[:first_colon]
password = credentials_part[first_colon + 1:]
# URL解码密码
from urllib.parse import unquote
password = unquote(password)
else:
username = credentials_part
password = None
else:
# 没有认证信息
username = None
password = None
host_part = url_part
# 提取IP和端口
colon_idx = host_part.find(':')
slash_idx = host_part.find('/')
if colon_idx != -1:
ip = host_part[:colon_idx]
# 提取端口
if slash_idx != -1:
port_str = host_part[colon_idx + 1:slash_idx]
else:
port_str = host_part[colon_idx + 1:]
try:
port = int(port_str)
except ValueError:
port = 8000
else:
if slash_idx != -1:
ip = host_part[:slash_idx]
else:
ip = host_part
port = 8000
return username, password, ip, port
return None, None, None
return None, None, None, 8000
except Exception as e:
return None, None, None
return None, None, None, 8000
def _updateChannelConnectedUI(self, channel_id):
"""更新通道连接UI状态(在主线程中执行)"""
......
......@@ -132,17 +132,17 @@ def main():
# 相机配置(最多支持4个)
cameras = [
{
'rtsp_url': 'rtsp://admin:cei345678@192.168.2.126:8000/stream1',
'rtsp_url': 'rtsp://admin:123456aA@@192.168.8.225:8000/stream1',
'save_path': r'D:\record\通道1',
'id': 1
},
{
'rtsp_url': 'rtsp://admin:cei345678@192.168.2.27:8000/stream1',
'rtsp_url': 'rtsp://admin:123456aA@@192.168.8.215:8000/stream1',
'save_path': r'D:\record\通道2',
'id': 2
},
{
'rtsp_url': 'rtsp://admin:cei345678@192.168.2.121:8000/stream1',
'rtsp_url': 'rtsp://admin:123456aA@@192.168.8.228:8000/stream1',
'save_path': r'D:\record\通道3',
'id': 3
},
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment