226 lines
8.4 KiB
Python
226 lines
8.4 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
# 本地保存禁用名单
|
||
|
||
import rospy
|
||
import rospkg
|
||
import cv2
|
||
import numpy as np
|
||
import os
|
||
import time
|
||
import threading
|
||
from cv_bridge import CvBridge, CvBridgeError
|
||
from sensor_msgs.msg import Image
|
||
from harmony_box.srv import FaceRecognition, FaceRecognitionResponse
|
||
|
||
class FaceRecognitionNode:
|
||
def __init__(self):
|
||
# 初始化ROS节点
|
||
rospy.init_node('face_recognition_node')
|
||
|
||
# 获取ROS包路径
|
||
self.pkg_path = rospkg.RosPack().get_path('harmony_box')
|
||
print("ROS包路径:", self.pkg_path)
|
||
|
||
# 加载预训练的人脸检测模型
|
||
self.face_cascade = cv2.CascadeClassifier(self.pkg_path + '/config/haarcascade_frontalface_default.xml')
|
||
|
||
# 加载预训练的人脸识别模型(OpenFace)
|
||
self.face_recognizer = cv2.dnn.readNetFromTorch(self.pkg_path + '/config/openface.nn4.small2.v1.t7')
|
||
|
||
# 启用CUDA加速(如果可用)
|
||
# self.face_recognizer.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
|
||
# self.face_recognizer.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
|
||
|
||
# 加载已知特征和名字
|
||
self.known_features, self.known_names = self.load_features()
|
||
|
||
# 创建CvBridge对象
|
||
self.bridge = CvBridge()
|
||
|
||
# 订阅/usb_cam/image_raw话题
|
||
self.image_sub = rospy.Subscriber('/usb_cam/image_raw', Image, self.image_callback)
|
||
self.image_pub = rospy.Publisher('/face_recognition_image', Image, queue_size=10)
|
||
|
||
# 创建服务
|
||
self.save_service = rospy.Service('/save_face_feature', FaceRecognition, self.handle_save_face_feature)
|
||
self.toggle_person_service = rospy.Service('/face_management', FaceRecognition, self.handle_toggle_person_recognition)
|
||
|
||
# 初始化帧率计算
|
||
self.frame_count = 0
|
||
self.start_time = time.time()
|
||
|
||
# 用于保存当前帧中的人脸特征
|
||
self.current_face_features = None
|
||
|
||
# 禁用的人脸名单(通过名字标识)
|
||
self.disabled_persons = set()
|
||
|
||
# 加载本地保存的禁用名单
|
||
self.load_disabled_persons()
|
||
|
||
# 每2帧处理一次
|
||
self.process_every_n_frames = 2
|
||
self.frame_counter = 0
|
||
|
||
def detect_and_extract_features(self, frame):
|
||
"""检测人脸并提取特征"""
|
||
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
|
||
|
||
# 检测人脸
|
||
faces = self.face_cascade.detectMultiScale(gray, scaleFactor=1.2, minNeighbors=5, minSize=(50, 50))
|
||
|
||
# 提取人脸特征
|
||
face_features = []
|
||
for (x, y, w, h) in faces:
|
||
face = frame[y:y+h, x:x+w]
|
||
face_blob = cv2.dnn.blobFromImage(face, 1.0 / 255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
|
||
self.face_recognizer.setInput(face_blob)
|
||
features = self.face_recognizer.forward()
|
||
face_features.append(features)
|
||
|
||
return face_features, faces
|
||
|
||
def compare_features(self, feature1, feature2, threshold=0.6):
|
||
"""比较两个特征向量"""
|
||
distance = np.linalg.norm(feature1 - feature2)
|
||
return distance < threshold
|
||
|
||
def recognize_face(self, frame):
|
||
"""识别人脸"""
|
||
face_features, faces = self.detect_and_extract_features(frame)
|
||
recognized_names = []
|
||
|
||
for feature in face_features:
|
||
matched = False
|
||
for known_feature, name in zip(self.known_features, self.known_names):
|
||
if name in self.disabled_persons:
|
||
continue # 跳过被禁用的人
|
||
if self.compare_features(feature, known_feature):
|
||
recognized_names.append(name)
|
||
matched = True
|
||
break
|
||
if not matched:
|
||
recognized_names.append("Unknown")
|
||
|
||
return recognized_names, faces
|
||
|
||
def save_features(self, features, names, file_path='known_features.npy'):
|
||
"""保存特征到文件"""
|
||
data = {'features': features, 'names': names}
|
||
np.save(file_path, data, allow_pickle=True)
|
||
rospy.loginfo(f"特征已保存到 {file_path}")
|
||
|
||
def load_features(self, file_path='known_features.npy'):
|
||
"""从文件加载特征"""
|
||
if os.path.exists(file_path):
|
||
data = np.load(file_path, allow_pickle=True).item()
|
||
return data['features'], data['names']
|
||
else:
|
||
return [], []
|
||
|
||
def save_disabled_persons(self, file_path='disabled_persons.txt'):
|
||
"""保存禁用的人脸名单到文件"""
|
||
with open(file_path, 'w') as f:
|
||
for name in self.disabled_persons:
|
||
f.write(name + '\n')
|
||
rospy.loginfo(f"禁用名单已保存到 {file_path}")
|
||
|
||
def load_disabled_persons(self, file_path='disabled_persons.txt'):
|
||
"""从文件加载禁用的人脸名单"""
|
||
if os.path.exists(file_path):
|
||
with open(file_path, 'r') as f:
|
||
self.disabled_persons = set(line.strip() for line in f.readlines())
|
||
rospy.loginfo(f"从 {file_path} 加载禁用名单: {self.disabled_persons}")
|
||
else:
|
||
rospy.loginfo(f"未找到禁用名单文件 {file_path},使用空名单")
|
||
|
||
def handle_save_face_feature(self, req):
|
||
"""服务回调函数:保存人脸特征"""
|
||
if self.current_face_features is not None:
|
||
name = req.name
|
||
self.known_features.append(self.current_face_features)
|
||
self.known_names.append(name)
|
||
self.save_features(self.known_features, self.known_names)
|
||
rospy.loginfo(f"已保存 {name} 的特征")
|
||
return FaceRecognitionResponse(True)
|
||
else:
|
||
rospy.logwarn("未检测到人脸特征,无法保存")
|
||
return FaceRecognitionResponse(False)
|
||
|
||
def handle_toggle_person_recognition(self, req):
|
||
"""服务回调函数:启用或禁用某个人的识别"""
|
||
name = req.name
|
||
enable = req.enable
|
||
|
||
if enable:
|
||
if name in self.disabled_persons:
|
||
self.disabled_persons.remove(name)
|
||
rospy.loginfo(f"{name} 的识别已启用")
|
||
else:
|
||
rospy.loginfo(f"{name} 的识别已经是启用状态")
|
||
else:
|
||
if name not in self.disabled_persons:
|
||
self.disabled_persons.add(name)
|
||
rospy.loginfo(f"{name} 的识别已禁用")
|
||
else:
|
||
rospy.loginfo(f"{name} 的识别已经是禁用状态")
|
||
|
||
# 保存禁用名单到本地文件
|
||
self.save_disabled_persons()
|
||
|
||
return FaceRecognitionResponse(True)
|
||
|
||
def image_callback(self, msg):
|
||
"""图像回调函数"""
|
||
self.frame_counter += 1
|
||
if self.frame_counter % self.process_every_n_frames != 0:
|
||
return
|
||
|
||
try:
|
||
frame = self.bridge.imgmsg_to_cv2(msg, "bgr8")
|
||
except Exception as e:
|
||
rospy.logerr(f"转换图像失败: {e}")
|
||
return
|
||
|
||
# 使用多线程处理人脸检测和识别
|
||
threading.Thread(target=self.process_frame, args=(frame,)).start()
|
||
|
||
def process_frame(self, frame):
|
||
"""处理帧图像"""
|
||
recognized_names, faces = self.recognize_face(frame)
|
||
|
||
# 保存当前帧中的人脸特征(仅保存第一张人脸)
|
||
if len(faces) > 0:
|
||
self.current_face_features, _ = self.detect_and_extract_features(frame)
|
||
if self.current_face_features:
|
||
self.current_face_features = self.current_face_features[0] # 只保存第一张人脸的特征
|
||
else:
|
||
self.current_face_features = None
|
||
|
||
# 在图像上标注结果
|
||
for (x, y, w, h), name in zip(faces, recognized_names):
|
||
color = (255, 0, 0) if name != "Unknown" else (0, 0, 255) # 已知人脸蓝色,未知人脸红色
|
||
cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
|
||
cv2.putText(frame, name, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
|
||
|
||
# 计算帧率
|
||
self.frame_count += 1
|
||
elapsed_time = time.time() - self.start_time
|
||
fps = self.frame_count / elapsed_time
|
||
|
||
# 在图像上显示帧率
|
||
cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
|
||
|
||
# 发布处理后的图像
|
||
image_msg = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
|
||
self.image_pub.publish(image_msg)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
try:
|
||
node = FaceRecognitionNode()
|
||
rospy.spin()
|
||
except rospy.ROSInterruptException:
|
||
pass |