ArcSoft4.0_Python_人脸识别跟踪+最优抓拍 一、项目说明 二、环境准备 三、接口、类型映射 四、流程设计 五、主流程代码实现 六、运行测试 项目代码

网上关于使用Python调用虹软sdk的资料还不是很多,至于调用ArcSoft4.0 SDK几乎没有,本人也是东拼西凑、以及根据自己的学习、领悟,终于将自己当前想做的给硬干出来了。

首先来说下,本次要做的到底是个啥。我这边有一个RTSP的视频流,我这边希望使用虹软能将经过摄像头下的所有人的人脸|+背景图片给抓取并存储本地。同时能够实时的在本地窗口看到人脸抓拍识别的全过程。并且能够根据FaceID进行高质量去重,只留下质量最好的一组。这样可以大量降低所抓拍到的重复人脸图片。

准备开干~!

二、环境准备

  • 开发环境:PyCharm 2018.3.5 (Professional Edition)
  • Python版本:Python 3.7.3
  • 依赖的Python三方库:cv2(python版的opencv,关于这块的安装,网上非常多,就不啰嗦了,该库主要用于视频帧处理、图片处理、显示等)
  • ArcSoft SDK:V4.0 增强版 windows x64 c++ (目前还是比较新的)

从以上列表来,准备的东西特别简单,Python还是比较方便的,只要用熟练的话。

三、接口、类型映射

本节主要将虹软sdk4.0中的类型、接口进行映射成python版本,根据我们本次的任务需要,仅仅映射以下几个接口即可满足:

  1. ASFOnlineActivation【在线激活】
  2. ASFInitEngine【引擎初始化】
  1. ASFDetectFaces【人脸检测】
  2. ASFProcess【可选】
  1. ASFGetAge【可选】
  2. ASFGetGender【可选】
  1. ASFImageQualityDetect【质量检测】

然后根据以上接口,将所需要依赖的类型、常量等进行映射转换。

3.1 类型映射

先进行类型的映射,因为接口映射时需要依赖这些类型。

文件:asf_struct.py

from ctypes import *

# 人脸框
class MRECT(Structure):
   _fields_ = [
       (u'left', c_int32),
       (u'top', c_int32),
       (u'right', c_int32),
       (u'bottom', c_int32)
   ]

# 人脸信息
class ASFFaceDataInfo(Structure):
   _fields_ = [
       ('data', c_void_p),
       ('dataSize', c_int32)
   ]

# 单人脸信息  人脸框 人脸角度 人脸信息
class ASFSingleFaceInfo(Structure):
   _fields_ = [
       ('faceRect', MRECT),
       ('faceOrient', c_int32),
       ('faceDataInfo',ASFFaceDataInfo)
   ]

# 多人脸信息
class ASFMultiFaceInfo(Structure):
   _fields_ = [
       (u'faceRect', POINTER(MRECT)),
       (u'faceOrient', POINTER(c_int32)),
       (u'faceNum', c_int32),
       (u'faceID', POINTER(c_int32)),
       (u'wearGlasses',POINTER(c_float)),
       (u'leftEyeClosed', POINTER(c_int32)),
       (u'rightEyeClosed', POINTER(c_int32)),
       (u'faceShelter', POINTER(c_int32)),
       (u'faceDataInfoList',POINTER(ASFFaceDataInfo))
    ]

# 年龄(多人脸)
class ASFAgeInfo(Structure):
   _fields_ = [
       (u'ageArray', c_void_p),
       (u'num', c_int32)
   ]

# 性别(多人)
class ASFGenderInfo(Structure):
   _fields_ = [
       (u'genderArray', c_void_p),
       (u'num', c_int32)
   ]

哈哈很简单吧。如果你后期想用其他能力的话,自己仿照着添加类型即可。

3.2 常量定义

虹软SDK中一些后期用到的常量定义。

文件:asf_common.py

from ctypes import *
from enum import Enum

face_dll = CDLL("libarcsoft_face.dll")
face_engine_dll = CDLL("libarcsoft_face_engine.dll")

#====================常量类型定义====================
ASF_DETECT_MODE_VIDEO    = 0x00000000        #   视频流检测模式
ASF_DETECT_MODE_IMAGE    = 0xFFFFFFFF        #   图片检测模式

ASF_NONE				 = 0x00000000        #   无属性
ASF_FACE_DETECT			 = 0x00000001        #   此处detect可以是tracking或者detection两个引擎之一,具体的选择由detect mode 确定
ASF_FACERECOGNITION		 = 0x00000004        #   人脸特征
ASF_AGE					 = 0x00000008        #   年龄
ASF_GENDER				 = 0x00000010        #   性别
ASF_FACE3DANGLE			 = 0x00000020        #   3D角度
ASF_FACELANDMARK		 = 0x00000040        #   额头区域检测
ASF_LIVENESS			 = 0x00000080        #   RGB活体
ASF_IMAGEQUALITY		 = 0x00000200        #   图像质量检测
ASF_IR_LIVENESS			 = 0x00000400        #   IR活体
ASF_FACESHELTER			 = 0x00000800        #   人脸遮挡
ASF_MASKDETECT			 = 0x00001000        #   口罩检测
ASF_UPDATE_FACEDATA		 = 0x00002000        #   人脸信息

ASVL_PAF_RGB24_B8G8R8    = 0x201             #   图片格式

#检测时人脸角度的优先级--枚举类型
class ArcSoftFaceOrientPriority(Enum):
    ASF_OP_0_ONLY = 0x1,                     #   常规预览下正方向
    ASF_OP_90_ONLY = 0x2,                    #   基于0°逆时针旋转90°的方向
    ASF_OP_270_ONLY = 0x3,                   #   基于0°逆时针旋转270°的方向
    ASF_OP_180_ONLY = 0x4,                   #   基于0°旋转180°的方向(逆时针、顺时针效果一样)
    ASF_OP_0_HIGHER_EXT = 0x5,               #   全角度

#==================================================

用python调用过dll的你,不用我做过多的介绍了吧。

3.3 接口映射

以上将所有后期要用到的类型、常量进行定义后,接下来将需要用到的接口进行python映射。

文件:asf_func.py

from asf_struct import *
from ctypes import *
import asf_common

#====================Api接口映射定义====================
#主要定义函数的入参与返回类型

#在线激活Api
online_activate = asf_common.face_engine_dll.ASFOnlineActivation
online_activate.restype = c_int32
online_activate.argtypes = (c_char_p, c_char_p, c_char_p)

#引擎初始化Api
init_engine = asf_common.face_engine_dll.ASFInitEngine
init_engine.restype = c_int32
init_engine.argtypes = (c_long, c_int32, c_int32, c_int32, POINTER(c_void_p))

#人脸检测Api
detect_face = asf_common.face_engine_dll.ASFDetectFaces
detect_face.restype = c_int32
detect_face.argtypes = (c_void_p, c_int32, c_int32, c_int32, POINTER(c_ubyte), POINTER(ASFMultiFaceInfo))

#人脸扩展信息识别Api(例如年龄、性别、口罩等等)
process = asf_common.face_engine_dll.ASFProcess
process.restype = c_int32
process.argtypes = (c_void_p, c_int32, c_int32, c_int32, POINTER(c_ubyte), POINTER(ASFMultiFaceInfo), c_int32)

#获取年龄Api
get_age = asf_common.face_engine_dll.ASFGetAge
get_age.restype = c_int32
get_age.argtypes = (c_void_p, POINTER(ASFAgeInfo))

#获取性别Api
get_gender = asf_common.face_engine_dll.ASFGetGender
get_gender.restype = c_int32
get_gender.argtypes = (c_void_p, POINTER(ASFGenderInfo))

#人脸质量检测Api
image_quality_detect = asf_common.face_engine_dll.ASFImageQualityDetect
image_quality_detect.restype = c_int32
image_quality_detect.argtypes = (c_void_p, c_int32, c_int32, c_int32, POINTER(c_ubyte), POINTER(ASFSingleFaceInfo), c_int32,POINTER(c_float), c_int32)
#=======================================================

四、流程设计

根据我们的任务要求,设计三个处理线程,如下:

ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码

视频抽帧线程职责:循环从rtsp流中获取一帧一帧图片放入队列中(为了保证本地磁盘IO的性能,仅仅将帧图片数据放到内存队列中)

人脸检测线程职责:循环获取一帧图片,对图片进行人脸检测、质量检测、画框标记展示,并将高质量(>0.5)得分的图片帧相关信息写入人脸列表容器

去重排序线程职责:循环遍历人脸列表容器中信息,将超过2秒没有更新时间的FaceID对应的列表进行质量得分排序,取最高的,进行大图、小图(人脸图片)落地,小图是根据人脸框坐标信息进行扣取得来。

配合下每个线程对应的流程图来辅助大家了解下:

ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码

五、主流程代码实现

根据以上的设计,基本上已经确定了我们的代码思路,直接开干。(不怕你看不懂,因为注释真的很详细)

文件:asf_main.py

from queue import Queue
import threading
import asf_func
import asf_struct
from ctypes import *
import asf_common
import cv2
import time
import uuid
import os

#线程锁,主要用来处理多线程之间同步
lock = threading.Lock()
#本地视频帧队列中size大小,表示本地最多缓冲多少数量的视频帧
frames_q_size = 50
#本地视频帧队列
frames_q = Queue(maxsize=frames_q_size)
#远程抓拍rtsp流地址
rtsp_url="rtsp://admin:123123@10.1.13.22:554/h264/ch1/sub/av_stream"
#待处理的高质量抓拍人脸帧图片,key为FaceId
zp_list = {}
#不同FaceId当前最新的抓取时间,主要用来计算是否开始获取最优抓拍用
zp_time_list = {}
#大于多少阈值的人脸算作合格的人脸
storage_face_threshold = 0.5
#app_id
app_id = b"xxxxxxxxxxxxxx"
#sdk_key
sdk_key = b"xxxxxxxxxxxxxx"
#激活码
active_key = b"xxxx-xxxx-xxxx-xxxx"

#注意,以上几个数据使用字节数据

#获取到的抓拍图片,存储的目录
if not os.path.exists('IMAGE'):
    os.makedirs('IMAGE')

#调用在线激活Api进行在线激活
ret = asf_func.online_activate(app_id, sdk_key, active_key)
if ret == 0 or ret == 90114:
    print("激活成功:",ret)
else:
    print("激活失败:", ret)

#Video模式的引擎功能
video_mask = asf_common.ASF_FACE_DETECT | asf_common.ASF_IMAGEQUALITY

#Video引擎空指针,接收初始化后生成的引擎对象
video_engine = c_void_p()

#Video引擎初始化
video_ret = asf_func.init_engine(asf_common.ASF_DETECT_MODE_VIDEO, asf_common.ArcSoftFaceOrientPriority.ASF_OP_0_ONLY.value[0], 3, video_mask, byref(video_engine))

if video_ret == 0:
    print("视频模式引擎 初始化成功")
else:
    print("视频模式引擎 初始化失败:", video_ret)
    exit()

#生成不带“-”的uuid字符串
def get_uuid():
    return str(uuid.uuid1()).replace("-","")

#获取指定FaceId最优中最优的抓拍结果,并存储至本地
def storage_best_zq(face_id):
    global zp_list
    list = zp_list[face_id]
    if len(list) == 0:
        #虽然有faceid,但是不一定都会写到该列表里,为了保证内存消耗最小,只会将达到质量的图片数据写入到列表中,这里进行空列表判断
        return
    #根据质量得分进行排序(升序),0得分最小,-1得分最大
    new_list = sorted(list, key=lambda e: e[0])

    #file_uid = get_uuid()
    file_uid = str(int(time.time()))

    #生成大小图的文件名,格式:faceid编码--------uuid---------big/small.jpg  ,很容易理解
    big_file_path = "IMAGE/%s_"%(new_list[-1][6])+str(file_uid)+"_big.jpg"
    small_file_path = "IMAGE/%s_"%(new_list[-1][6]) + str(file_uid) + "_small.jpg"

    #将列表中最后一个元素中的第二个元素(图片数据)通过cv2写入到本地文件中
    cv2.imwrite(big_file_path, new_list[-1][1])

    #当前大图中,其中人脸位置信息
    left = new_list[-1][2]
    top = new_list[-1][3]
    right = new_list[-1][4]
    bottom = new_list[-1][5]

    #根据位置信息,在大图数据里面进行切割
    small_img = new_list[-1][1][top:bottom,left:right]

    #将切割后的人脸图片数据写入到本地小图文件中
    cv2.imwrite(small_file_path, small_img)
    print("存储本地抓拍图片---OK")

#实时分析并获取同一FaceId最优抓拍,并存储本地
def gen_zq():
    global zp_list
    global zp_time_list
    while(True):
        ct = time.time()
        face_id_list = list(zp_time_list.keys())
        for k in face_id_list:
            #判断字典里指定faceid的最新更新时间距离当前时间是否超过2秒钟(大家根据需要自行设置),如果超过2秒钟,认为可以进行收割了,嘿嘿~
            if ct - zp_time_list[k]>2:
                storage_best_zq(k)
                del zp_list[k]
                del zp_time_list[k]
                #处理完最优抓拍落地之后,将字段里对应faceid的数据清理掉
        time.sleep(2)
        #等待2秒钟继续分析并收割,避免频繁分析导致CPU过高,大家根据需要自行设置

#获取视频帧处理线程
#q为将抓取的视频帧数据塞入到指定的队列中
def grab_frame(rtsp_url, q):
    #使用cv2的VideoCapture打开rtsp视频流
    cap = cv2.VideoCapture(rtsp_url)
    num = 0

    while (True):
        #print(cap.isOpened())
        cap_res = cap.read()
        num += 1
        #todo num不断累加需要考虑溢出的异常
        #每隔一帧存储一次
        if num % 2 == 0:
            #为了防止视频帧队列满了,丢弃最老的一帧数据,插入新帧,这样始终保持视频帧队列中保持连续的最新的视频流数据
            #也为了防止引擎处理过慢导致队列积压所做的优化处理
            if q.full():
                print("frames queue is full, will remove one.")
                q.get()
            #以上清理完一帧再存入,这样才能保证有限的队列不存在插入不进去导致OpenCV本地队列慢导致的 各种异常,以及延迟
            q.put(cap_res)
            print(cap.isOpened())

#视频帧队列处理线程(从队列中获取每一帧,转换成图片,人脸检测+质量得分计算,筛选出高质量得分图片放入到zp_list队列中)
def detect_face(q):
    global zp_list,zp_time_list,storage_face_threshold
    while(True):
        q_value = q.get()
        #从视频帧队列中获取一帧数据
        old_img = q_value[1]
        #视频帧队列中每个元素为一个元祖,第二个元素为帧图片数据

        #sp = old_img.shape
        #img = cv2.resize(old_img, (sp[1] // 4 * 4, sp[0]))  # 四字节对齐
        #以上注释部分是为了在某些处理模式下需要对视频帧图片进行字节对齐操作,我这边的图片数据已经对齐过了,因此这里不执行也行

        img = old_img
        image_bytes = bytes(img)
        image_ubytes = cast(image_bytes, POINTER(c_ubyte))
        #以上将图片数据转换为虹软引擎能接受的类型,也就是字节数组指针(不懂的自己去补充下ctypes的相关知识)

        detect_faces = asf_struct.ASFMultiFaceInfo()
        #根据接口要求,人脸检测接口需要传入一个接受结果的结构体对象,因此这里实例化一个C结构体的对象

        ret = asf_func.detect_face(
            video_engine,
            img.shape[1],
            img.shape[0],
            asf_common.ASVL_PAF_RGB24_B8G8R8,
            image_ubytes,
            byref(detect_faces)
        )
        #按照人脸检测接口要求的格式类型,传入数据,detect_faces中会存储执行之后人脸识别结果信息

        t3 = time.time()
        if ret !=0:
            print("检测人脸失败:%s" % (ret))
            continue
            #这里返回不等于0不一定就说明有问题,不用管,通常来看基本上均为81927,属于正常现象
        if detect_faces.faceNum > 0:

            #如果检测到有多个人脸,循环对每一个人脸进行质量检测处理
            for i in range(detect_faces.faceNum):

                #质量检测接口需要传入一个单人脸信息的结构体数据,因此这里实例化一个
                single_face_info = asf_struct.ASFSingleFaceInfo()

                #将detect_faces多人脸结果中按照索引,将里面单个人脸新,拿出来,赋值给single_face_info单人脸结构体数据
                single_face_info.faceRect.left = detect_faces.faceRect[i].left
                single_face_info.faceRect.top = detect_faces.faceRect[i].top
                single_face_info.faceRect.right = detect_faces.faceRect[i].right
                single_face_info.faceRect.bottom = detect_faces.faceRect[i].bottom
                single_face_info.faceOrient = detect_faces.faceOrient[i]
                single_face_info.faceDataInfo = detect_faces.faceDataInfoList[i]

                #用来接收质量得分的变量,以下会使用引用传递的方式传给接口。
                confidenceLevel = c_float()

                #调用质量检测接口,获取单个人脸图片的质量得分
                ret = asf_func.image_quality_detect(
                    video_engine,
                    img.shape[1],
                    img.shape[0],
                    asf_common.ASVL_PAF_RGB24_B8G8R8,
                    image_ubytes,
                    byref(single_face_info),
                    0,
                    byref(confidenceLevel),
                    1
                )

                #准备指定FaceId的列表,如果不存在就初始化
                if detect_faces.faceID[i] not in zp_list:
                    zp_list[detect_faces.faceID[i]] = []

                #如果检测出来的得分大于设置的合格分数,就将其存储到指定FaceId的列表中,其余的全部过滤掉,因为低质量的分数,放进来也没用
                if confidenceLevel.value > storage_face_threshold:
                    print("发现高质量得分人脸:%s" % (confidenceLevel.value))
                    zp_list[detect_faces.faceID[i]].append((confidenceLevel.value, old_img,
                                                            detect_faces.faceRect[i].left,
                                                            detect_faces.faceRect[i].top,
                                                            detect_faces.faceRect[i].right,
                                                            detect_faces.faceRect[i].bottom,
                                                            detect_faces.faceID[i]))
                zp_time_list[detect_faces.faceID[i]] = time.time()
                #更新下指定faceId对应的最新时间戳,主要用来给gen_zq处理线程判断哪个faceid已经没有连续记录了,是否可以收割了。

                cv2.rectangle(img, (detect_faces.faceRect[i].left, detect_faces.faceRect[i].top), (detect_faces.faceRect[i].right, detect_faces.faceRect[i].bottom), (0, 0, 255), 1)
                cv2.putText(img, str("qa:%.2f"%(confidenceLevel.value)), (detect_faces.faceRect[i].left, detect_faces.faceRect[i].top-5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
                cv2.putText(img, "id:"+str(detect_faces.faceID[i]),(detect_faces.faceRect[i].left, detect_faces.faceRect[i].top - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5,(0, 0, 255), 1)
                #以上是根据人脸检测的结果以及置信度得分,进行画框,以及文字标注,qa为质量得分   id为faceid
        cv2.imshow("RealTimeDisplay", img)
        cv2.waitKey(1)

thread_grab_frame = threading.Thread(target=grab_frame, args=(rtsp_url, frames_q))
thread_grab_frame.start()
thread_detect_face = threading.Thread(target=detect_face, args=(frames_q,))
thread_detect_face.start()
thread_gen_zq = threading.Thread(target=gen_zq, args=())
thread_gen_zq.start()
thread_detect_face.join()

循环体内的代码稍微有点长,时间关系,没有单独进行封装,逻辑不是很复杂。

稍微有些瑕疵,等有空慢慢调整下。有个小小的缺陷。

thread_grab_frame = threading.Thread(target=grab_frame, args=(rtsp_url, frames_q))

thread_detect_face = threading.Thread(target=detect_face, args=(frames_q,))

thread_gen_zq = threading.Thread(target=gen_zq, args=())

大家可以根据以上三个线程入口函数,分别进行了解。

为了保证视频帧图片队列不会无限增加下去,这里设计了有限容量的队列,并在将图片帧插入队列时,若队列已满,将最旧的一帧抛弃,插入最新帧。

附上代码文件的整体结构:

ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码

六、运行测试

中间遇到的一些小问题、小坑均已经解决和抚平。

实时展示效果:

ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码

运行日志截图:

ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码

以上基本上都是同一个FaceId不同帧图片的质量得分,如果不去重的话,落地后会有大量的重复。

运行后的存储效果:

ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码

抓到了大量的人脸信息,为了保密,只能马赛克了。

运行性能:

本次设计基本上均在内存中处理,只有最后落地,用到了磁盘,看下运行时的CPU、内存占用情况。

ArcSoft4.0_Python_人脸识别跟踪+最优抓拍
一、项目说明
二、环境准备
三、接口、类型映射
四、流程设计
五、主流程代码实现
六、运行测试
项目代码

性能占用还是比较满意的。

我本机配置为i5 八代,视频流使用定码率16384Kbps,25帧每秒,1920*1080分辨率,H.264视频编码,如果降低清晰度分辨率的话,占用CPU会更低。

最后想说一句虹软牛X~!

项目代码

参考:https://gitee.com/codetracer/acrsoft4.0_-python.git

其中dll没有上传,请将dll的sdk放到项目根目录下即可。

了解更多人脸识别产品相关内容请到虹软视觉开放平台