感受数据结构的魅力:记一次查找性能优化

前几周做过一次性能优化,如果对应用场景所碰到的问题进行抽象,实际上就是一次三维数组的查找性能优化。在反复尝试了好几种方案之后,总算找到了一种当前为止最高效的方案。

问题背景

一个函数需要对接收消息载荷当中的元素进行分组,之后将这些元素以组为单位进行发送。一个形象的比方:某快递公司为了提高货物快递效率,在全国各个城镇均开通了各中转站的直通线路。当某一车包裹到达了A地中转站,A地的中转站将立即对这批包裹进行分拣,之后可根据货物目的城镇分车进行转运。而这个问题的背景就如同“快递中转站对包裹的分拣、转运流程”,分开来看,这个流程大致包括如下三个部分:
  • 存储新到的包裹
  • 对新到的包裹进行分组
  • 发送这些分组

这是一个较为简单的处理过程,可以编写如下的伪码:

// 根据目的省份、城市、城镇创建的以group为单位的存储数据结构,这些
// group实际上对应各个直达的中转站
PacketGroupType g_PacketGroups[MAX_PROVINCE_NUM][MAX_CITY_NUM][MAX_TOWN_NUM];

// 对到来的包裹进行扫描分装到不同的group当中
void PacketsGrouping(void *msg)
{
    PacketsCarrierMsgType *pMsg = (PacketsCarrierMsgType *) GetPayloadFromMsy(msg);
    packetsNum = pMsg->number;

    for (int i = 0; i <packetsNum; i ++ )
    {
        PacketsType packet = pMsg->packets[i];

        int provinceNo = GetProvinceNo(&packet);
        int cityNo = GetCityNo(&packet);
        int townNo = GetTownNo(&packet);

        g_PacketGroups[provinceNo][cityNo][townNo].elemsNum++;
        AddPacketIntoGroup(packet, &g_PacketGroups[provinceNo][cityNo][townNo]);
    }
    ...
}

// 遍历group的存储结构,找到包裹数不为0的分组,发送
void PacketsSending(void *msg)
{
    ...

    for (int i = 0; i < MAX_PROVINCE_NUM; i ++)
    {
        for (int j = 0; j < MAX_CITY_NUM; j ++)        
        {
            for (int k = 0; k < MAX_TOWN_NUM; k ++) 
            {
                if (g_PacketGroups[i][j][k].elemsNum != 0)
                {
                    SendPacketGroup(&g_PacketGroups[i][j][k]);
                }
            }
        }
    }
    ...
}

使用标记位

上面的这种实现方案,不管当前到达的包裹数量有多少、最终被分为多少group,都是需要一个一个去遍历g_PacketGroups当中的每一个group,所以做了很多无意义的操作。可以开始第一轮简单的优化:使用标记位来提高发送时候的查找效率。详细点儿说,就是为包裹数非0的group设立标记位,并在标记位当中编码其他额外的信息,从而避免一些不必须的查找。如下:
// 设置新的标记位数组
unsigned char g_ElementInGroupFlag[MAX_PROVINCE_NUM][MAX_CITY_NUM][MAX_TOWN_NUM];

// 在第二维度的元素当中编码“剩余其他的一个维度有无需要发送的group”
// g_ElementInGroupFlag[0][0][0] 当中的高4位值为0时表示g_ElementInGroupFlag[0][0]对应的最后一维不存在元素,不需要再做遍历MAX_TOWN_NUM次;
// g_ElementInGroupFlag[0][1][0] 当中的高4位值为0时表示g_ElementInGroupFlag[0][1]对应的最后一维不存在元素,不需要再做遍历MAX_TOWN_NUM次;
...

// 在第一维度的元素当中编码“剩余其他两个维度有无需要发送的group”
// g_ElementInGroupFlag[1][0][0] 当中的高4位值为0时表示g_ElementInGroupFlag[0]对应的其他两维不存在元素,不需要再做遍历MAX_TOWN_NUM*MAX_CITY_NUM次;
// g_ElementInGroupFlag[2][0][0] 当中的高4位值为0时表示g_ElementInGroupFlag[1]对应的其他两维不存在元素,不需要再做遍历MAX_TOWN_NUM*MAX_CITY_NUM次;
// 有如此的标记位之后,在发送时进行的遍历效率会略有提高。
void PacketsGrouping(void *msg)
{
    PacketsCarrierMsgType *pMsg = (PacketsCarrierMsgType *) GetPayloadFromMsy(msg);
    packetsNum = pMsg->number;

    for (int i = 0; i <packetsNum; i ++ )
    {
        PacketsType packet = pMsg->packets[i];

        int provinceNo = GetProvinceNo(&packet);
        int cityNo = GetCityNo(&packet);
        int townNo = GetTownNo(&packet);

        g_ElementInGroupFlag[provinceNo][cityNo][townNo] = 1;
        g_ElementInGroupFlag[provinceNo][cityNo][0] |= 1 << 4;
        g_ElementInGroupFlag[provinceNo][0][0] |= 1 << 4;

        g_PacketGroups[provinceNo][cityNo][townNo].elemsNum++;
        AddPacketIntoGroup(packet, &g_PacketGroups[provinceNo][cityNo][townNo]);
    }
    ...
}

void PacketsSending(void *msg)
{
    ...

    for (int i = 0; i < MAX_PROVINCE_NUM; i ++)
    {
        if (g_ElementInGroupFlag[i][0][0] != 1)
        {
            continue;
        }

        for (int j = 0; j < MAX_CITY_NUM; j ++)        
        {        
            if (g_ElementInGroupFlag[provinceNo][j][0] != 1)
            {
                continue;
            }

            for (int k = 0; k < MAX_TOWN_NUM; k ++) 
            {
                if (g_ElementInGroupFlag[i][j][k] != 0)
                {
                    SendPacketGroup(&g_PacketGroups[provinceNo][cityNo][townNo]);
                }
            }
        }
    }
    ...
}

这样做效率是高了些,但是在是很难看的。特别是这里的if ... continue ... ,可以使用宏优化一下:

#define CONTINU_IF(EXPR)  
    if (EXPR) 
    {
        continue; 
    }

容器的方法

标记位的优化方法对于整体效率的确有所改善,但还是远远不够的。更进一步,可以考虑使用容器,将group按照作为容器的元素添加到容器当中,在发送包裹的时候直接遍历容器即可:
  • 在存储新到的包裹时,对于一个包裹需要做两个工作:首先将包裹分到group当中,其次将group存储到容器当中
  • 继续操作其他的包裹:将包裹分到group当中。如果group已经存在于容器当中,那么直接将包裹存储到group中;如果group在容器当中不存在,那么需要额外将该group也存储到容器。也就是“避免重复存储相同group到容器中”
  • 循环如上两步处理完所有包裹,之后直接遍历容器,将各个group发送出去
// 根据目的省份、城市、城镇创建的以group为单位的存储数据结构
PacketGroupType g_PacketGroups[MAX_PROVINCE_NUM][MAX_CITY_NUM][MAX_TOWN_NUM];
PacketGroupType g_PacketGroupContainer[MAX_PROVINCE_NUM * MAX_CITY_NUM * MAX_TOWN_NUM];

// 对到来的包裹进行扫描分装到不同的group当中
void PacketsGrouping(void *msg)
{
    PacketsCarrierMsgType *pMsg = (PacketsCarrierMsgType *) GetPayloadFromMsy(msg);
    packetsNum = pMsg->number;

    for (int i = 0; i <packetsNum; i ++ )
    {
        PacketsType packet = pMsg->packets[i];

        int provinceNo = GetProvinceNo(&packet);
        int cityNo = GetCityNo(&packet);
        int townNo = GetTownNo(&packet);

        g_PacketGroups[provinceNo][cityNo][townNo].elemsNum++;
        AddPacketIntoGroup(packet, g_PacketGroups[provinceNo][cityNo][townNo]);

        if (g_PacketGroups[provinceNo][cityNo][townNo] is not in g_PacketGroupContainer)
        {
             AddGroupPacketIntoContainer(g_PacketGroups[provinceNo][cityNo][townNo], g_PacketGroupContainer);
        }
    }
    ...
}

// 遍历group 容器,发送其中的group
void PacketsSending(void *msg)
{
    ...
           
    for (group in g_PacketGroupContainer)
    {
       SendPacketGroup(group);
    }
    ...
}

当前的代码执行在DSP的各个core上,对于内存的管理比较严格,无法直接使用STL容器默认的内存分配器,所以暂时就没有使用库当中的容器(考虑后续写一个符合当前情景的分配器)。这一个方案已经是比较理想的一种方案了,但是讨厌的“去重”操作实在是一个大的瑕疵,不好处理。

巧妙的链表技法

容器的方案已经相当OK,但还有一种更高效的做法!相对于上面容器的做法,这种做法有两个优势:
  • 其一,使用更少的内存资源
  • 其二,能够方便的搞定“去重”的功能
// 根据目的省份、城市、城镇创建的以group为单位的存储数据结构
PacketGroupType g_PacketGroups[MAX_PROVINCE_NUM][MAX_CITY_NUM][MAX_TOWN_NUM];
PacketGroupManager g_PacketGroupManager;

typedef strunct PacketGroupInfo{   
    int provinceNo;
    int cityNo;
    int townNo;

    PacketGroupInfo *next;
    boolean isInGroupManager;
}

typedef strunct{

    PacketGroupInfo *head;
    PacketGroupInfo groupsInfo[MAX_PROVINCE_NUM][MAX_CITY_NUM][MAX_TOWN_NUM];
}PacketGroupManager;

// 对到来的包裹进行扫描分装到不同的group当中
void PacketsGrouping(void *msg)
{
    PacketsCarrierMsgType *pMsg = (PacketsCarrierMsgType *) GetPayloadFromMsy(msg);
    packetsNum = pMsg->number;

    for (int i = 0; i <packetsNum; i ++ )
    {
        PacketsType packet = pMsg->packets[i];

        int provinceNo = GetProvinceNo(&packet);
        int cityNo = GetCityNo(&packet);
        int townNo = GetTownNo(&packet);

        g_PacketGroups[provinceNo][cityNo][townNo].elemsNum++;
        AddPacketIntoGroup(packet, g_PacketGroups[provinceNo][cityNo][townNo]);

        if (g_PacketGroupManager.groupsInfo[provinceNo][cityNo][townNo].isInGroupManager == 0)
        {
             AddGroupIntoManager(g_PacketGroups[provinceNo][cityNo][townNo], g_PacketGroupManager);
        }
    }
    ...
}

// 遍历group Manager当中的链表,发送其中的group
void PacketsSending(void *msg)
{
    ...
           
    for (PacketGroupInfo *pItem = g_PacketGroupManager.head; pItem != NULL; pItem = pItem->next)
    {
       SendPacketGroup(pItem );
    }
    ...
}

这种方案特殊性在于,它使用一个与原有group存储结构相对应的但是仅仅包含必须信息的轻量级管理结构,与此同时,使用静态链表的方式来维护满足条件的group信息,并没有额外执行节点的分配与释放操作。