sample_code/face-recognition-with-opencv/renlian_ros_优化后 copy.py
2025-03-14 12:04:48 +08:00

226 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 本地保存禁用名单
import rospy
import rospkg
import cv2
import numpy as np
import os
import time
import threading
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import Image
from harmony_box.srv import FaceRecognition, FaceRecognitionResponse
class FaceRecognitionNode:
def __init__(self):
# 初始化ROS节点
rospy.init_node('face_recognition_node')
# 获取ROS包路径
self.pkg_path = rospkg.RosPack().get_path('harmony_box')
print("ROS包路径:", self.pkg_path)
# 加载预训练的人脸检测模型
self.face_cascade = cv2.CascadeClassifier(self.pkg_path + '/config/haarcascade_frontalface_default.xml')
# 加载预训练的人脸识别模型OpenFace
self.face_recognizer = cv2.dnn.readNetFromTorch(self.pkg_path + '/config/openface.nn4.small2.v1.t7')
# 启用CUDA加速如果可用
# self.face_recognizer.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
# self.face_recognizer.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
# 加载已知特征和名字
self.known_features, self.known_names = self.load_features()
# 创建CvBridge对象
self.bridge = CvBridge()
# 订阅/usb_cam/image_raw话题
self.image_sub = rospy.Subscriber('/usb_cam/image_raw', Image, self.image_callback)
self.image_pub = rospy.Publisher('/face_recognition_image', Image, queue_size=10)
# 创建服务
self.save_service = rospy.Service('/save_face_feature', FaceRecognition, self.handle_save_face_feature)
self.toggle_person_service = rospy.Service('/face_management', FaceRecognition, self.handle_toggle_person_recognition)
# 初始化帧率计算
self.frame_count = 0
self.start_time = time.time()
# 用于保存当前帧中的人脸特征
self.current_face_features = None
# 禁用的人脸名单(通过名字标识)
self.disabled_persons = set()
# 加载本地保存的禁用名单
self.load_disabled_persons()
# 每2帧处理一次
self.process_every_n_frames = 2
self.frame_counter = 0
def detect_and_extract_features(self, frame):
"""检测人脸并提取特征"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 检测人脸
faces = self.face_cascade.detectMultiScale(gray, scaleFactor=1.2, minNeighbors=5, minSize=(50, 50))
# 提取人脸特征
face_features = []
for (x, y, w, h) in faces:
face = frame[y:y+h, x:x+w]
face_blob = cv2.dnn.blobFromImage(face, 1.0 / 255, (96, 96), (0, 0, 0), swapRB=True, crop=False)
self.face_recognizer.setInput(face_blob)
features = self.face_recognizer.forward()
face_features.append(features)
return face_features, faces
def compare_features(self, feature1, feature2, threshold=0.6):
"""比较两个特征向量"""
distance = np.linalg.norm(feature1 - feature2)
return distance < threshold
def recognize_face(self, frame):
"""识别人脸"""
face_features, faces = self.detect_and_extract_features(frame)
recognized_names = []
for feature in face_features:
matched = False
for known_feature, name in zip(self.known_features, self.known_names):
if name in self.disabled_persons:
continue # 跳过被禁用的人
if self.compare_features(feature, known_feature):
recognized_names.append(name)
matched = True
break
if not matched:
recognized_names.append("Unknown")
return recognized_names, faces
def save_features(self, features, names, file_path='known_features.npy'):
"""保存特征到文件"""
data = {'features': features, 'names': names}
np.save(file_path, data, allow_pickle=True)
rospy.loginfo(f"特征已保存到 {file_path}")
def load_features(self, file_path='known_features.npy'):
"""从文件加载特征"""
if os.path.exists(file_path):
data = np.load(file_path, allow_pickle=True).item()
return data['features'], data['names']
else:
return [], []
def save_disabled_persons(self, file_path='disabled_persons.txt'):
"""保存禁用的人脸名单到文件"""
with open(file_path, 'w') as f:
for name in self.disabled_persons:
f.write(name + '\n')
rospy.loginfo(f"禁用名单已保存到 {file_path}")
def load_disabled_persons(self, file_path='disabled_persons.txt'):
"""从文件加载禁用的人脸名单"""
if os.path.exists(file_path):
with open(file_path, 'r') as f:
self.disabled_persons = set(line.strip() for line in f.readlines())
rospy.loginfo(f"{file_path} 加载禁用名单: {self.disabled_persons}")
else:
rospy.loginfo(f"未找到禁用名单文件 {file_path},使用空名单")
def handle_save_face_feature(self, req):
"""服务回调函数:保存人脸特征"""
if self.current_face_features is not None:
name = req.name
self.known_features.append(self.current_face_features)
self.known_names.append(name)
self.save_features(self.known_features, self.known_names)
rospy.loginfo(f"已保存 {name} 的特征")
return FaceRecognitionResponse(True)
else:
rospy.logwarn("未检测到人脸特征,无法保存")
return FaceRecognitionResponse(False)
def handle_toggle_person_recognition(self, req):
"""服务回调函数:启用或禁用某个人的识别"""
name = req.name
enable = req.enable
if enable:
if name in self.disabled_persons:
self.disabled_persons.remove(name)
rospy.loginfo(f"{name} 的识别已启用")
else:
rospy.loginfo(f"{name} 的识别已经是启用状态")
else:
if name not in self.disabled_persons:
self.disabled_persons.add(name)
rospy.loginfo(f"{name} 的识别已禁用")
else:
rospy.loginfo(f"{name} 的识别已经是禁用状态")
# 保存禁用名单到本地文件
self.save_disabled_persons()
return FaceRecognitionResponse(True)
def image_callback(self, msg):
"""图像回调函数"""
self.frame_counter += 1
if self.frame_counter % self.process_every_n_frames != 0:
return
try:
frame = self.bridge.imgmsg_to_cv2(msg, "bgr8")
except Exception as e:
rospy.logerr(f"转换图像失败: {e}")
return
# 使用多线程处理人脸检测和识别
threading.Thread(target=self.process_frame, args=(frame,)).start()
def process_frame(self, frame):
"""处理帧图像"""
recognized_names, faces = self.recognize_face(frame)
# 保存当前帧中的人脸特征(仅保存第一张人脸)
if len(faces) > 0:
self.current_face_features, _ = self.detect_and_extract_features(frame)
if self.current_face_features:
self.current_face_features = self.current_face_features[0] # 只保存第一张人脸的特征
else:
self.current_face_features = None
# 在图像上标注结果
for (x, y, w, h), name in zip(faces, recognized_names):
color = (255, 0, 0) if name != "Unknown" else (0, 0, 255) # 已知人脸蓝色,未知人脸红色
cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2)
cv2.putText(frame, name, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
# 计算帧率
self.frame_count += 1
elapsed_time = time.time() - self.start_time
fps = self.frame_count / elapsed_time
# 在图像上显示帧率
cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
# 发布处理后的图像
image_msg = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
self.image_pub.publish(image_msg)
if __name__ == "__main__":
try:
node = FaceRecognitionNode()
rospy.spin()
except rospy.ROSInterruptException:
pass