每一部由高通芯片驱动的手机中,Hexagon数字信号处理器都会负责唤醒词检测、传感器数据处理、降噪功能以及蓝牙音频流传输等功能——而与此同时,手机的主体ARM中央处理器仍在运行Android操作系统。
在DSP上协调这些功能的操作系统是QuRT(高通实时操作系统)。这款基于POSIX标准的、以优先级机制为核心的实时操作系统,就是专门为高通的Hexagon数字信号处理器设计的。
本文是一本关于高通实时操作系统的实用指南。它从基础层面全面介绍了QuRT的各项功能:架构设计、线程创建方式、同步原语的使用、内存管理机制、中断处理流程、定时器功能、通过FastRPC实现的处理器间通信技术,以及完整的传感器数据融合处理流程。每一章内容都包含了实际可运行的代码示例,并对底层工作原理进行了详细解释。

目录

为什么QuRT如此重要

想象一下,在进行电话通话时,手机会同时执行以下操作:对麦克风采集的音频数据进行降噪处理,运行神经网络算法来检测唤醒词,每秒读取400次加速度计的数据,同时还负责管理蓝牙音频流传输功能。
所有这些操作都不是由手机的主体ARM中央处理器来完成的,而是由高通的Hexagon数字信号处理器来处理的;而协调这些操作的操作系统正是QuRT

QuRT(高通实时操作系统)是一种基于POSIX标准、采用优先级机制的抢占式实时操作系统,专为高通公司的Hexagon数字信号处理器设计而开发。与旨在实现高度灵活性的通用操作系统Linux不同,QuRT更像是一种精密工具,其核心功能就是实现精确到微秒级别的调度操作。

QuRT在系统中的位置

高通SoC内部的双处理器架构

该图展示了高通SoC内部的双处理器架构。左侧的ARM CPU负责运行Android或Linux系统,处理一般的应用程序逻辑;右侧的Hexagon DSP则用于运行QuRT,处理那些对延迟要求较高的任务,例如音频处理、传感器数据融合、机器学习推理以及计算任务的卸载。

这两颗处理器通过一个名为FastRPC的框架进行通信。您可以使用Hexagon SDK为DSP端编写代码,而QuRT则会作为操作系统在Hexagon处理器上执行这些代码。

配置您的开发环境

在开始编写任何QuRT代码之前,您需要相应的工具链以及模拟器或实物硬件。

先决条件

您需要使用Hexagon SDK(版本3.5+或4.x),这是高通官方提供的开发工具包,其中包含了Hexagon Tools编译工具链。

为了运行您的代码,您可以使用高通的开发板(例如Robotics RB5或SM8250 HDK),或者使用SDK内置的模拟器。安装了Ubuntu 18.04或20.04操作系统的Linux主机最为适合用于开发工作。

安装Hexagon SDK

# 从高通的开发门户下载Hexagon SDK
# https://developer.qualcomm.com/software/hexagon-dsp-sdk

# 解压并运行安装程序
chmod +x qualcomm_hexagon_sdk_4_x_x_x.bin
./qualcomm_hexagon_sdk_4_x_x_x.bin

# 设置环境变量
export HEXAGON_SDK_ROOT=~/Qualcomm/Hexagon_SDK/4.x.x.x
export HEXAGON_TOOLS_ROOT=~/Qualcomm/Hexagon_SDK/4.x.x.x/tools
source $HEXAGON_SDK_ROOT/setup_sdk_env.source

此命令会将SDK安装到您的个人目录中,并设置构建系统和模拟器所需的环境变量。`setup_sdk_env.source`脚本会配置您的shell环境,使其能够找到编译器、模拟器和相关库文件的路径。

验证您的配置

# 检查Hexagon编译器的版本
hexagon-clang --version

# 应该会看到类似这样的输出:
# Qualcomm Hexagon Clang version 8.x.xx

# 运行QuRT模拟器以确保其可以正常使用
$HEXAGON_SDK_ROOT/tools/HEXAGON_Tools/8.x.xx/Tools/bin/hexagon-sim \
    --simulated_returnval --cosim_file \
    $HEXAGON_SDK_ROOT/libs/common/qurt/computev66/sdksim_bin/osam.cfg \
    -- $HEXAGON_SDK_ROOT/libs/common/qurt/computev66/sdksim_bin/bootimg.pbn

第一个命令用于确认Hexagon Clang编译器已经安装完毕且可以正常使用;第二个命令则会启动QuRT模拟器。这个模拟器的功能类似于Android模拟器,它允许您在没有任何实物硬件的情况下测试QuRT程序。虽然模拟器上的运行结果与真实硬件上的效果会有所不同,但在开发过程中,它对于验证程序的正确性来说是非常有价值的。

项目结构

Hexagon SDK使用SCons作为其底层构建系统。各个项目都位于SDK的目录结构中,通过.min文件进行配置。这些文件是一种描述性的构建配置文件,SDK的SCons构建系统会解析这些文件的内容。

一个最基本的项目结构如下所示:

$HEXAGON_SDK_ROOT/examples/my_qurt_project/
├── src/
│   └── main.c              # 你的QuRT应用程序代码
├── inc/
│   └── my_module.h         # 头文件
├── hexagon.min              # 用于Hexagon DSP端的SCons构建配置文件
└── android.min              # 用于ARM端的SCons构建配置文件(如果使用FastRPC的话)

hexagon.min文件负责配置DSP端的构建过程,而android.min文件则在使用FastRPC进行跨处理器通信时处理ARM端的相关构建逻辑。这两份配置文件都会被SDK顶层的SConstruct文件读取,该文件的路径为$HEXAGON_SDK_ROOT/SConstruct。对于SDK目录结构中的项目来说,不需要单独编写MakefileSConscript文件。

使用SCons进行构建配置

一个最基本的hexagon.min构建文件示例如下:

# hexagon.min – 用于DSP端的SCons构建配置文件

BUILD_LIBS = libmy_qurt_app

# 源代码文件
libmy_qurt_app_C_SRCS = src/main.c

# QuRT操作系统相关库
libmy_qurt_app_libS = atomic rpcmem

# 编译器选项
libmy_qurt_app_hexAGON_CFLAGS = -O2 -Wall

# 需要链接的库
libmy_qurt_app_DLLS = libmy_qurt_app_skel

.min文件格式是Hexagon SDK的SCons构建系统所特有的。BUILD_LIBS用于指定目标库的名称;C_SRCS用于列出源代码文件;LIBS用于指定需要链接的库文件;HEXAGON_CFLAGS用于设置编译器选项;DLLS用于定义共享库的输出文件名,其中_skel后缀是FastRPC为DSP端实现指定的 convention。

在内部机制上,SDK的SConstruct会遍历整个项目目录结构,读取每一个.min文件,并将其配置内容转换为SCons可以理解的构建指令。在构建时通过传递V参数来指定目标架构、构建类型以及工具链版本。例如,V=hexagon_Release_dynamic_toolv84_v66表示:为Hexagon平台构建,采用发布模式,进行动态链接,并使用针对v66 DSP架构设计的v84工具链。

对于那些需要比.min格式提供更多的控制选项的项目来说,你可以编写独立的SConscript文件来进行构建配置:

# SConscript – 用于QuRT项目的独立SCons构建脚本

Import('env')

env = env.clone()

# 添加包含路径
env.Append(CPPPATH = ['inc'])

# 编译器选项
env.Append(CCFLAGS = [-O2, -Wall])

# 构建共享库
sources = ['src/main.c']
libs = ['atomic', 'rpcmem']

env.sharedLibrary(
    target = 'libmy_qurt_app_skel',
    source = sources,
    LIBS = libs
)

SConscript这种方法使你能够充分利用SCons的所有功能:条件编译、自定义构建步骤、依赖项扫描以及多种构建变体。调用Import('env')会加载由SDK的顶层SConstruct配置好的构建环境,该环境已经知道了Hexagon编译器的路径、QuRT头文件以及系统库的位置。env.Clone()会创建一个副本,这样你的修改就不会影响到代码树中的其他项目。

QuRT编程模型

QuRT编程的核心思想非常简单:

QuRT是一种基于优先级的抢占式实时操作系统。这意味着所有任务都是在线程中运行的(不存在裸机的主循环)。优先级较高的线程会立即抢占优先级较低的线程,而无需进行任何协商;相同优先级的线程则会采用轮询的方式运行。

调度器不会定期唤醒,只有当某些情况发生变化时,它才会开始执行任务。例如,某个线程被阻塞了、有信号被触发,或者某个高优先级的线程准备就绪了,调度器才会启动相应的处理程序。

优先级等级(0-255,数值越小表示优先级越高)

 000  ┃ ████ 中断处理程序(请勿修改此级别)
 001  ┃ ████ 关键系统任务
 ...  ┃
 064  ┃ ████ 高优先级的音频处理任务
 ...  ┃
 128  ┃ ████ 中等优先级的传感器数据融合任务
 ...  ┃
 192  ┃ ████ 低优先级的日志记录/报告任务
 ...  ┃
 255  ┃ ████ 空闲线程(QuRT内置的后台线程)

这个优先级等级表说明了QuRT的256个优先级等级通常是如何分配的。优先级0代表最高优先级,而255代表最低优先级。这与FreeRTOS的情况相反,在FreeRTOS中,数值越大表示优先级越高。

中断处理程序占据最高的优先级等级,系统任务排在其次,而用户线程则位于中间范围。优先级为255的空闲线程只有在没有其他线程可以运行的时候才会被启动。

创建你的第一个QuRT线程

最简单的QuRT程序会创建一个线程,该线程会打印一条消息然后退出。

/* main.c - 第一个QuRT程序 */

#include 
#include 
#include 

#define STACK_SIZE 4096

/* 线程栈必须为8字节对齐 */
static char thread_stack[STACK_SIZE] __attribute__((aligned(8)));

void my_thread_func(void *arg)
{
    int thread_id = (int)(uintptr_t)arg);

    printf("来自QuRT线程%d的问候!\n", thread_id);
    printf("我的线程ID是:%lu\n", qurt_thread_get_id());

    /* 线程必须明确地结束执行 */
    qurt_thread_exit(QURT_EOK);
}

int main(void)
{
    qurt_thread_t      thread_id;
    qurt_thread_attr_t attr;

    printf("主线程在QuRT上启动了!\n");

    /* 初始化线程属性 */
    qurt_thread_attr_init(&attr);

    /* 配置线程 */
    qurt_thread_attr_set_name(&attr, "my_first_thread");
    qurt_thread_attr_set_stack_addr(&attr, thread_stack);
    qurt_thread_attr_set_stack_size(&attr, STACK_SIZE);
    qurt_thread_attr_set_priority(&attr, 128);  /* 中等优先级 */

    /* 创建并启动线程 */
    int result = qurt_thread_create(&thread_id, &attr,
                                     my_thread_func,
                                     (void *)42);

    if (result != QURT_EOK) {
        printf("创建线程失败,错误代码为:%d\n", result);
        return -1;
    }

    printf("线程创建成功!ID为:%lu\n", thread_id);

    /* 等待线程完成执行 */
    int status;
    qurt_thread_join(thread_id, &status);

    printf("线程已完成执行,状态码为:%d\n", status);
    return 0;
}

这个程序演示了在QuRT中创建线程的四个步骤。首先,qurt_thread_attr_init()用于初始化线程属性结构体。其次,程序会为该线程配置一个调试名称(该名称会在崩溃日志中显示)、堆栈地址、堆栈大小以及优先级。第三,qurt_thread_create()会创建并立即启动这个线程,同时传递一个函数指针和一个参数。第四,qurt_thread_join()会阻塞调用它的线程,直到新创建的线程执行完qurt_thread_exit()函数。

有两点需要注意:QuRT不会为你分配堆栈内存,你必须提供一个静态分配的、长度为8字节且地址对齐的缓冲区。此外,每个线程在退出之前都必须调用qurt_thread_exit()函数;如果线程函数直接返回而不执行退出操作,其行为是不确定的。

线程创建流程

     qurt_thread_attr_init()
              │
              ▼
    ┌─────────────────────┐
    │  设置名称           │
    │  设置堆栈地址      │
    │  设置堆栈大小       │
    │  设置优先级         │
    └─────────────────────┘
              │
              ▼
     qurt_thread_create()
              │
              ▼
    线程开始运行 ──► my_thread_func()
              │                      │
              ▼                      ▼
     qurt_thread_join()       qurt_thread_exit()
     (等待线程结束)         (发送“已完成”信号)

这个流程展示了单个线程的生命周期。属性结构体充当配置对象,你可以通过它设置所有线程参数,然后将其传递给qurt_thread_create()函数。一旦线程被创建出来,它就会执行其入口函数。当入口函数调用qurt_thread_exit()时,线程就会终止,而那些在qurt_thread_join()中处于等待状态的线程也会被释放,并接收到退出状态码。

线程创建的内部机制

大多数教程都会省略对qurt_thread_create()内部实现细节的描述。了解这些内部原理有助于更清楚地理解调试过程以及优先级设计的逻辑。

内核在创建线程时执行哪些操作

当你调用qurt_thread_create()时,实际上是在向QuRT内核发起一个系统调用。内核会按顺序执行以下五个步骤:

  你的代码调用了qurt_thread_create()
         │
         ▼
  ┌──────────────────────────────────────────────────────────┐
  │  1. 验证参数的有效性                                         │
  │     • 堆栈指针是否非空且地址对齐?                         │
  │     • 堆栈大小是否大于或等于最小值(通常为2KB)?           │
  │     • 优先级是否在0到255的范围内?                          │
  │     • 入口函数指针是否非空?                            │
  │     • 如果有任何检查失败,则返回QURT_E_INVALID                  │
  ├──────────────────────────────────────────────────────────┤
  │  2. 为线程分配控制块TCB                                   │
  │     • QuRT会为线程分配一个内核级数据结构                │
  │     • 这个数据结构中包含:线程ID、优先级、状态、保存的寄存器值、信号屏蔽码、互斥锁等待队列等信息         │
  ├──────────────────────────────────────────────────────────┤
  │  3. 初始化堆栈帧                                         │
  │     • 内核会在你的堆栈内存顶部创建一个虚拟堆栈帧             │
  │     • 并将初始寄存器值写入其中:                         │
  │       ┌──────────────────────────────────────┐           │
  │       │ 堆栈顶地址            │           │
  │       │         ┌──────────────────────────────────┐│           │
  │       │         │ PC  = my_thread_func (入口函数地址)     ││           │
  │       │         │ SP  = stack_addr + stack_size    ││           │
  │       │         │ R0  = arg (传递的参数)      ││           │
  │       │         │ LR  = qurt_thread_exit          ││           │
  │       │         │ SR  = 默认状态寄存器        ││           │
  │       │         │ R1-R31 = 0                       ││           │
  │       │         └──────────────────────────────────┘│            │
  │       │         ... (其余堆栈空间保持不变)……              │           │
  │       │         堆栈底地址          │           │
  │       └──────────────────────────────────────┘           │
  ├──────────────────────────────────────────────────────────┤
  │  4. 将线程添加到就绪队列中                                 │
  │     • TCB会被加入调度器的就绪队列,并根据其优先级被安排执行顺序         │
  │     • 线程的状态会被设置为“READY”状态                 │
  ├──────────────────────────────────────────────────────────┤
  │  5. 触发重新调度机制                                     │
  │     • 调度器会检查:“这个新线程的优先级是否高于当前正在运行的线程?”         │
  │     • 如果是,那么调用线程会立即被抢占,新线程开始执行             │
  │     • 如果不是,新线程就会在就绪队列中等待,直到成为优先级最高的可执行线程         │
  └──────────────────────────────────────────────────────────┘
         │
         ▼
  qurt_thread_create()函数会返回给调用者
  (不过新创建的线程可能已经开始运行了!)

这种线程调度机制中最令人惊讶的地方在于第5步:如果新创建的线程优先级高于创建它的线程,那么新线程会在qurt_thread_create()返回调用者之前就开始执行了。也就是说,创建新线程的线程会在函数调用中途被抢占。这就是“抢占式调度”在实践中的真正含义——调度器并不会等待一个合适的时机,而是会立即根据优先级来决定程序的执行顺序。

堆栈帧是如何启动你的函数的

当调度器首次切换到一个新的线程时,它的操作方式与处理任何上下文切换时都是一样的:它会从线程控制块中恢复保存好的寄存器值,然后跳转到程序计数器的对应位置。

对于新创建的线程来说,这些寄存器值是在第3步时由内核人工设置的。程序计数器被设置为my_thread_func,因此处理器会直接跳转到你的函数;R0寄存器被设置为你的arg参数值,所以你的函数会将其作为第一个参数接收(这符合Hexagon调用规范);堆栈指针被设置为你函数的堆栈顶部,这样你的函数就拥有了可用的工作堆栈;而链接寄存器则被设置为qurt_thread_exit,因此如果你的函数正常返回(不过你其实不应该依赖这种行为),程序就会继续执行qurt_thread_exit

错觉:
──────────────
对你的线程函数来说,就好像有人用你传递的参数
“正常地”调用了它一样。

现实:
──────────────
调度器只是恢复了一组人工设置的寄存器值,
使得处理器误以为它是从某个函数调用中返回到了你的入口点。

这就好比你在一个从未去过的地方醒来,
但有人把一切都安排得如此巧妙,
以至于你根本意识不到自己其实并没有通过门进入那里的。

这张图对比了程序员在心理上所认为的正常函数调用过程,以及实际上在硬件层面上发生的情况——也就是通过恢复一些寄存器值来模拟函数调用的效果。线程函数本身是无法区分这两种情况的,而这恰恰就是内核想要达到的目的。内核成功地创造了一种无缝的错觉。

上下文切换过程详解

让我们来看一个具体的例子:线程A(优先级为128)创建了线程B(优先级为64,显然优先级更高)。以下时间线展示了每个步骤中发生的情况:

时间 ──────────────────────────────────────────────►

线程A (优先级 128)          内核/调度器         线程B (优先级 64)
────────────────           ────────────────           ────────────────
调用                      
qurt_thread_create()       
   │                       
   ├─► 系统调用 ──────►  验证参数
                            分配线程控制块
                            设置堆栈帧结构
                            将线程B加入就绪队列
                            
                            “B(优先级64)是否高于A(优先级128)? 是。”
                            
                            将线程A的寄存器值保存到其线程控制块中   ──┐
                            再从线程B的线程控制块中读取线程B的寄存器值   │
                                                   │
                            跳转到程序计数器的对应位置,开始执行my_thread_func(arg)   ◄─┘
                                                   │
                                                   │ 这段代码确实能够正常运行...
                                                   │ 最后会调用qurt_thread_exit()结束执行   │
                            线程B被从就绪队列中移除                     ◄───────
                            系统调用结束
                            
                            “下一个该谁?线程A。”
                            
                            重新读取线程A的寄存器值             │                        
                           跳转到线程A的程序计数器的对应位置       │◄──────────────────────
   │
   ├─► 再次调用qurt_thread_create()         │   
   │     并返回QURT_EOK表示调用成功     │   
   ▼ 程序继续执行……             ▼

从线程A的角度来看,qurt_thread_create()只是一个需要一段时间才能返回结果的函数调用。线程A根本不知道自己被暂停了,它也不清楚在这段暂停时间内线程B是否已经完成了执行。

调度器会使得被抢占的线程无法察觉到自己的被抢占行为。这就是抢占式调度的基本特性:各个线程无需进行任何协作,甚至不需要了解其他线程的存在。

线程控制块的内容

TCB是内核用于跟踪每个线程的内部数据结构。虽然我们无法直接访问它,但了解其内部结构有助于理解QuRT的许多运行机制:

/* TCB的结构示意图(简化版本,并非QuRT的实际源代码) */
struct qurt_tcb {
    /* 身份信息 */
    qurt_thread_t   thread_id;
    char            name[16];
    
    /* 调度相关信息 */
    uint8_t         base_priority;
    uint8_t         effective_priority; /* 由于优先级继承机制,实际值可能与base_priority不同 */
    uint8_t         state;             /* READY、RUNNING、BLOCKED、SUSPENDED等状态 */
    
    /* 保存的CPU上下文信息(在上下文切换时被填充) */
    uint32_t        saved regs[32];
    uint32_t        saved_pc;
    uint32_t        saved_sp;
    uint32_t        saved_sr;
    
    /* 栈相关信息(用于调试和检测栈溢出) */
    void           *stack_base;
    size_t          stack_size;
    
    /* 阻塞相关信息 */
    void           *wait_object;  /* 线程正在等待的对象,可能是互斥锁、信号或管道 */
    uint32_t        wait_mask;    /* 线程正在等待的特定信号位 */
    
    /* 链表指针 */
    struct qurt_tcb *next_ready;
    struct qurt tcb *next_waiting;
    
    /* 用于线程连接的功能相关信息 */
    int             exit_status;  /* 调用qurt_thread_exit()时传递的值 */
    qurt_thread_t   joiner;      /* 在调用qurt_thread_join()时等待的线程 */
};

TCB存储了调度器所需的所有信息:身份信息(线程ID和调试名称)、调度状态(基础优先级、有效优先级以及当前状态)、保存的CPU上下文信息(32个通用寄存器、程序计数器、堆栈指针和状态寄存器)、阻塞相关信息、用于表示线程处于就绪队列或等待队列中的链表指针,以及用于支持线程连接的功能相关字段。

当优先级继承机制处于激活状态时,effective_priority字段的值可能会与base_priority不同,这一点会在同步相关的章节中详细说明。

线程状态机

QuRT中的线程始终处于以下四种状态之一:

                    qurt_thread_create()
                           │
                           ▼
                    ┌──────────┐
          ┌─────────│  READY   │◄──────────────────────────┐
          │         └──────────┘                           │
          │              │ ▲                               │
          │  调度器     │ │ 被优先级更高的线程抢占        │
          │  选择这个线程    │ │                         │
          │              ▼ │                               │
          │         ┌──────────┐     等待信号/互斥锁/定时器事件       │
          │         │ RUNNING  │                     │
          │         └──────────┘     解除阻塞                    │
          │              │                                 │
          │  线程调用      │                                 │
          │  导致阻塞的API    │                                 │
          │  包括:mutex_lock、signal_wait、pipe.receive等   │
          │         ┌──────────┐                           │
          │         │ BLOCKED  │───────────────────────────┘
          │         └──────────┘
          │
          │  qurt_thread_exit()
          │         │
          │         ▼
          │    ┌──────────┐
          └───►│  DEAD    │
               └──────────┘
  • READY表示该线程可以运行,目前正在等待硬件线程槽。

  • RUNNING表示该线程当前正在硬件线程上执行(每个硬件线程槽一次只能有一个线程处于这种状态)。

  • BLOCKED表示该线程正在等待某个外部事件发生:例如互斥锁被释放、信号被触发,或者计时器到期。

  • DEAD表示该线程已经调用了qurt_thread_exit()函数。如果有其他线程调用了qurt_thread_join()来等待这个线程结束,那么调用qurt_thread_join()的线程将会收到该线程的退出状态。

硬件线程槽

Hexagon DSP是一种硬件多线程处理器,每个核心都配备了多个硬件线程槽(通常为2到4个)。这意味着QuRT可以在单个核心上真正地同时运行多个线程,而不仅仅是通过时间片轮询来模拟多线程效果。

┌─────────────────────────────────────────┐
│          Hexagon DSP核心               │
│                                         │
│  ┌───────────┐  ┌───────────┐           │
│  │ 硬件线程槽0    │  │ 硬件线程槽1    │  …………     │
│  │           │  │           │           │
│  │ 线程A      │  │ 线程B      │           │
│  │ (正在运行)   │  │ (正在运行)   │           │
│  └───────────┘  └───────────┘           │
│                                         │
│ 待执行线程队列:[C, D, E, F, ………]         │
│ 调度器会将优先级最高的READY状态线程分配到硬件线程槽中     │
└─────────────────────────────────────────┘

上图展示了一个拥有两个硬件线程槽的Hexagon核心。每个线程槽都可以独立且同时地执行一个线程。调度器会将优先级最高的待执行线程分配到这些硬件线程槽中。当软件线程的数量超过了硬件线程槽数量时,调度器会通过时间片轮询来安排低优先级的线程的执行。不过,优先级最高的线程会获得专用的硬件线程槽,从而能够完全避免上下文切换,从而实现更高的执行效率。

在典型的Hexagon v66处理器上,由于拥有4个硬件线程槽,因此优先级最高的4个线程每个都会拥有自己的执行流水线。只有当某个线程被阻塞,或者有更高优先级的线程唤醒并占用了某个硬件线程槽时,才会发生上下文切换。正因为如此,QuRT才能实现如此低的调度延迟。

线程的完整生命周期

以下代码展示了线程的完整生命周期,并对QuRT在每个阶段所执行的操作进行了说明:

static char stack[8192] __attribute__((aligned(8)));

void my_func(void *arg)
{
    /* 状态:RUNNING。栈空间是空的,R0寄存器中保存着参数arg的值。 */
    int val = *(int *)arg;

    qurt_mutex_lock(&somemutex);
    /* 如果某个互斥锁被其他线程占用,那么当前线程会进入BLOCKED状态,直到该锁被释放。 */

    shared_data = val;
    qurt_mutex_unlock(&some_mutex);

    qurt_thread_exit(QURT_EOK);
    /* 状态变为DEAD。如果有其他线程正在等待这个线程结束,那么它们将会被唤醒。 */
}

int main(void)
{
    qurt_thread_t tid;
    qurt_thread_attr_t attr;
    int my_arg = 42;

    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_stack_addr(&attr, stack);
    qurt_thread_attr_set_stack_size(&attr, sizeof(stack));
    qurt_thread_attr_set_priority(&attr, 100);

    qurt_thread_create(&tid, &attr, my_func, &my_arg);
    /* 如果my_func的优先级(100)高于main函数的优先级,那么main函数会被抢占。 */

    int status;
    qurt_thread_join(tid, &status);
    /* 会一直阻塞,直到my_func执行完毕;如果my_func已经结束,那么join操作会立即完成。 */

    return 0;
}

my_func开始运行时,内核已经设置好了相应的寄存器,因此arg中会包含指向my_arg的指针。此时该线程的状态为“RUNNING”。

当它调用qurt_mutex_lock()时,会有两种情况发生:如果互斥锁是空闲的,该线程就会获取它并继续执行;如果互斥锁已被其他线程占用,那么调用该函数的线程的状态会变为“BLOCKED”,其寄存器内容会被保存到对应的TCB中,然后调度器会选择下一个优先级较高的可用线程来执行。

当持有互斥锁的线程调用qurt_mutex_unlock()时,被阻塞的线程会恢复到“READY”状态,此时调度器会重新评估所有线程的优先级。

main函数所在的线程中,qurt_thread_create()可能在my_func执行完毕之前就返回了,也可能不立即返回。如果my_func的优先级高于main函数,调度器会立即抢占main函数的执行权,因此qurt_thread_create()可能要等到my_func执行完毕才会返回。而qurt_thread_join()要么会阻塞main函数,直到my_func退出,要么会在my_func已经退出后立即返回。

关于栈大小设置的一个重要注意事项是:如果你将STACK_SIZE设置得过小(比如只有256字节),而你的线程又调用了printf函数,那么就会发生“栈溢出”现象。QuRT并不会自动检测这种错误,因此程序会无声无息地崩溃,而且也很难诊断出问题所在。因此,为你的线程分配至少8192字节的栈空间是比较安全的;只有在进行了性能分析之后,再根据实际需要优化栈大小才是明智的做法。

在模拟器上构建和运行程序

Hexagon SDK提供了一个make封装层,该层会在内部调用SCons来构建项目。以下两条命令会产生相同的结果:

# 方式1:使用make封装层(内部调用SCons)
cd $HEXAGON_SDK_ROOT
make V=hexagon_Release_dynamic_toolv84_v66 \
     tree=my_qurt_project

# 方式2:直接调用SCons
cd $HEXAGON_SDK_ROOT
python tools/build/scons/scons.py \
    V=hexagon.Release_dynamic_toolv84_v66 \
    my_qurt_project

这两条命令都会使用v84工具链,在Hexagon v66架构上构建项目,并且都是以发布模式进行构建的。make封装层提供了便利性:它会解析V=tree=这些参数,然后将其传递给SCons。直接使用SCons的话,你可以使用更多额外的选项,比如--jobs=N用于并行构建,--verbose用于查看完整的编译命令输出。

# 在模拟器上运行程序
hexagon-sim --simulated_returnval \
    --cosim_file osam.cfg \
    -- bootimg.pbn \
    -- my_qurt_app.so

hexagon-sim命令会启动QuRT模拟器,并运行你编译好的应用程序。--simulated_returnval选项用于捕获main函数返回的值,而选项则指定了QuRT操作系统的配置文件路径。

多线程编程

在实际的QuRT应用程序中,通常会有多个线程同时运行。生产者-消费者模式是DSP编程中最常见的设计模式之一:其中一个线程从硬件设备中读取数据,另一个线程则对这些数据进行处理。

#include 
#include 

#define STACK_SIZE 8192
#define BUFFER_SIZE 16
#define NUM_ITEMS 100

/* 线程栈 */
static char producer_stack[STACK_SIZE] __attribute__((aligned(8)));
static char consumer_stack[STACK_SIZE] __attribute__(aligned(8)));

/* 共享缓冲区 */
static int buffer[BUFFER_SIZE];
static int head = 0;
static int tail = 0;
static int count = 0;

/* 同步原语 */
qurt_mutex_t buffermutex;
qurt_cond_t not_full;
qurt_cond_t not_empty;

void producer_thread(void *arg)
{
    for (int i = 0; i < NUM_ITEMS; i++) {
        qurt_mutex_lock(&buffer_mutex);

        /* 等待缓冲区中有空位 */
        while (count == BUFFER_SIZE) {
            qurtCOND_wait(&not_full, &buffermutex);
        }

        /* 生产一个数据项 */
        buffer[head] = i;
        head = (head + 1) % BUFFER_SIZE;
        count++;

        printf("[生产者] 已生成数据项 %d,当前缓冲区中剩余数据项数量:%d\n", i, count);

        /* 通知消费者数据已准备好 */
        qurt_cond_signal(&not_empty);
        qurt_mutex_unlock(&buffer_mutex);
    }

    qurt_thread_exit(QURT_EOK);
}

void consumer_thread(void *arg)
{
    for (int i = 0; i < NUM_ITEMS; i++) {
        qurt_mutex_lock(&buffermutex);

        /* 等待缓冲区中有数据 */
        while (count == 0) {
            qurtCOND_wait(&not_empty, &buffer_mutex);
        }

        /* 消耗一个数据项 */
        int item = buffer[tail];
        tail = (tail + 1) % BUFFER_SIZE;
        count--;

        printf("[消费者] 已获取数据项 %d,当前缓冲区中剩余数据项数量:%d\n", item, count);

        /* 通知生产者缓冲区中有空位了 */
        qurt_cond_signal(&not_full);
        qurt_mutex_unlock(&buffermutex);
    }

    qurt_thread_exit(QURT_EOK);
}

int main(void)
{
    qurt_thread_t producer, consumer;
    qurt_thread_attr_t attr;

    /* 在创建线程之前,先初始化同步原语 */
    qurt_mutex_init(&buffer_mutex);
    qurt_cond_init(&not_full);
    qurtCOND_init(&not_empty);

    /* 创建生产者线程(优先级较高) */
    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_name(&attr, "producer");
    qurt_thread_attr_set_stack_addr(&attr, producer_stack);
    qurt_thread_attr_set_stack_size(&attr, STACK_SIZE);
    qurt_thread_attr_set_priority(&attr, 100);
    qurt_thread_create(&producer, &attr, producer_thread, NULL);

    /* 创建消费者线程(优先级较低) */
    qurt_thread_attr_init(&attr);
    qurt_thread_attr_set_name(&attr, "consumer");
    qurt_thread_attr_set_stack_addr(&attr, consumer_stack);
    qurt_thread_attr_set_stack_size(&attr, STACK_SIZE);
    qurt_thread_attr_set_priority(&attr, 110);
    qurt_thread_create(&consumer, &attr, consumer_thread, NULL);

    /* 等待两个线程都完成执行 */
    int status;
    qurt_thread_join(producer, &status);
    qurt_thread_join(consumer, &status);

    /* 清理资源 */
    qurt_mutex_destroy(&buffermutex);
    qurt_conddestroy(&not_full);
    qurt_condDestroy(&not_empty);

    printf("所有操作已完成!共生成并消耗了 %d 个数据项。\n", NUM_ITEMS);
    return 0;
}

这段代码实现了一个经典的有限缓冲区生产者-消费者模式。共享缓冲区是一个由16个整数组成的循环数组,该数组受到互斥锁的保护。生产者将数据写入缓冲区,而消费者则从缓冲区中读取数据。

当缓冲区已满时,生产者会阻塞在not_full条件变量上;而当缓冲区为空时,消费者则会阻塞在not_empty条件变量上。每当其中一方修改了缓冲区的内容后,另一方都会立即发出信号通知对方。

生产者具有比消费者更高的优先级(100对比110),这是有原因的。在实际的数字信号处理场景中,生产者通常是从硬件设备(如麦克风、传感器)中获取数据的。如果生产者错过了某个硬件的数据采样,那么这些数据就会永远丢失;而消费者则可以随时之后再处理这些数据。这就是实时操作系统设计中的一个基本原则:绝不要让那些直接与硬件交互的线程处于等待状态。

同步原语

QuRT提供了五种主要的同步机制:互斥锁、条件变量、信号量、屏障和semaphore。

┌──────────────┬────────────────────────────────────────────────────┐
│ 同步原语      │ 使用场景                                        │
├──────────────┼────────────────────────────────────────────────────┤
│ 互斥锁        | 用于保护共享数据,防止并发访问          │
│ 条件变量     | “等待直到某个条件成立”(必须与互斥锁配合使用)| 
│ 信号量       | 一个线程向另一个线程发送通知              │
│ 屏障         | “所有线程都在此处等待,直到其他线程全部到达”   │
├──────────────┼────────────────────────────────────────────────────┤
│ semaphore     | 用于控制对有限资源池的访问                │
│              | (例如:10个线程共享4个DMA通道)         │
└──────────────┴────────────────────────────────────────────────────┘

上表总结了每种同步原语及其主要使用场景。互斥锁能够确保只有单个线程能够访问共享数据;条件变量可以让一个线程等待,直到某个特定条件成立,而且它们总是与互斥锁一起使用的;信号量可以实现线程之间的轻量级一对一通信;屏障则可以使得一组线程在同一个时刻停下来,等待其他线程完成操作;semaphore则用于控制对有限数量资源的访问。

互斥锁

互斥锁能够确保任何时候都只有一个线程能够访问临界区。QuRT中的互斥锁还支持通过qurt_mutex_try_lock()函数进行非阻塞式获取。

qurt_mutex_t mymutex;

void init_example(void)
{
    /* 使用前必须先初始化 */
    qurt_mutex_init(&my_mutex);
}

void critical_section_example(void)
{
    qurt_mutex_lock(&my_mutex);

    /* 任何时候都只有一个线程可以进入这个临界区 */
    shared_counter++;
    shared_buffer[index] = new_value;

    qurtmutex_unlock(&my_mutex);
}

/* 非阻塞版本 */
void try_lock_example(void)
{
    int result = qurt_mutex_try_lock(&my_mutex);

    if (result == QURT_EOK) {
        shared_counter++;
        qurt_mutexunlock(&my_mutex);
    } else {
        printf("忙,稍后再试\n");
    }
}

void cleanup_example(void)
{
    qurtmutex_destroy(&my_mutex);
}

qurt_mutex_lock()这个函数会阻塞调用它的线程,直到该互斥锁被释放并可供使用为止,然后才会获取它。而qurtmutex_try_lock()则会尝试获取该互斥锁,如果成功,它会立即返回QURT_EOK;如果互斥锁已被其他线程占用,就会返回一个错误代码。在使用完某个互斥锁后,务必调用qurt_mutex_destroy()来释放它。

QuRT中的互斥锁实现了优先级继承机制。当一个高优先级的线程正在等待一个被低优先级线程占用的互斥锁时,这个低优先级的线程会暂时提升为高优先级状态。这种设计可以有效防止优先级反转现象——这种错误曾经导致“火星探路者”号探测器在执行任务时不断重新启动。

QuRT会自动处理优先级继承机制,但你应该了解这一机制的存在,这样才能在调试过程中避免因意外的优先级变化而产生混淆。

信号

在QuRT中,信号是一种轻量级的通知机制。一个线程会等待特定的信号位被设置,而另一个线程或中断服务程序则会将这些信号位设置为“已触发”状态,从而唤醒该线程。

#include 

#define SIGNAL_DATAREADY 0x01
#define SIGNAL_STOP     0x02
#define SIGNAL_ERROR    0x04

qurt_signal_t my_signal;

void signal_init(void)
{
    qurt_signal_init(&my_signal);
}

/* 待机线程 */
void waiter_thread(void *arg)
{
    unsigned int receivedsignals;

    while (1) {
        /* 等待任意一个指定的信号 */
        receivedsignals = qurt_signal_wait(
            &my_signal,
            SIGNAL_DATAREADY | SIGNAL_STOP | SIGNAL_ERROR,
            QURTSignal_ATTR_WAIT_ANY
        );

        if (receivedsignals & SIGNAL_STOP) {
            printf("收到停止信号,正在退出。\n");
            break;
        }

        if (receivedsignals & SIGNAL>DataREADY) {
            printf("数据已准备就绪!正在处理中...\n");
            process_data();
            /* 处理完信号后,需要清除该信号位 */
            qurt_signal_clear(&mySignal, SIGNAL_DATAREADY);
        }

        if (receivedsignals & SIGNAL_ERROR) {
            printf("发生了错误!正在处理中...\n");
            handle_error();
            qurt_signal_clear(&mySignal, SIGNAL_ERROR);
        }
    }

    qurt_signal_destroy(&my_signal);
    qurt_thread_exit(QURT_EOK);
}

/* 发送信号线程(或中断服务程序) */
void sender_thread(void *arg)
{
    prepare_data();
    qurt_signal_set(&my_signal, SIGNAL_DATAREADY);

    /* 后来,发送停止信号 */
    qurt_signal_set(&mySignal, SIGNAL_STOP);

    qurt_thread_exit(QURT_EOK);
}

待机线程会使用一个包含它所关心的信号位的位掩码来调用qurt_signal_wait()函数。QURT.Signal_ATTR_WAIT_ANY表示只要有任何指定的信号位被设置,该线程就会被唤醒。发送信号线程则通过qurt_signal_set()函数来设置一个或多个信号位。在处理完某个信号后,待机线程必须调用qurt_signal_clear()来清除相应的信号位。如果你忘记清除某个信号位,下次再次调用qurtSignal_wait()时,系统会立即返回结果,导致该线程再次处理同一个事件。

在信号与条件变量之间进行选择,需要根据具体的使用场景来决定。对于无关线程之间的通知,或者从中断服务程序发出的通知来说,信号是更合适的选择,因为它们结构简单、占用系统资源较少。而当通知与特定的数据状态相关联时(比如缓冲区已满、队列为空),并且需要对数据进行检查时,条件变量则更为适用,因为使用条件变量可以确保在访问这些数据时能够正确地实现互斥保护。

屏障

屏障会阻止所有参与操作的线程继续执行,直到所有线程都到达指定的屏障点为止。当某个计算过程被分解成多个阶段,并且每个阶段的结果都依赖于前一个阶段的输出时,使用屏障就非常有用。

#define NUM_WORKERThreads  4

qurt_barrier_t syncBarrier;

void worker_thread(void *arg)
{
    int thread_num = (int)(uintptr_t)arg);

    /* 第一阶段:每个线程计算自己负责的部分 */
    printf("线程 %d:正在执行第一阶段计算...\n", thread_num);
    compute_partial_result(thread_num);

    /* 所有线程都会在这里等待,直到所有线程都完成第一阶段的计算 */
    qurt_barrier_wait(&syncBarrier);

    /* 第二阶段:所有部分的计算结果已经准备好,现在将它们合并 */
    printf("线程 %d:正在执行第二阶段计算...\n", thread_num);
    combine_results(thread_num);

    qurt_thread_exit(QURT_EOK);
}

int main(void)
{
    qurt_barrier_init(&syncBarrier, NUM_WORKERThreads);

    /* 创建工作线程 */
    for (int i = 0; i < NUM WORKER Threads; i++) {
        create_worker(i);
    }

    join_all_workers();

    qurt_barrier_destroy(&syncBarrier);
    return 0;
}

屏障在初始化时会指定参与操作的线程数量。当每个线程到达同步点时,都会调用qurt_barrier_wait()函数,这个函数会阻塞该线程,直到所有线程都到达屏障点为止。一旦最后一个线程调用了qurtBarrier_wait(),所有被阻塞的线程就会立即得到释放,然后继续执行第二阶段的操作。

信号量

信号量用于控制对N个相同资源的访问权限。与互斥锁不同(互斥锁实际上是一种特殊情况下的信号量,其N值等于1),信号量允许多达N个线程同时持有它。

#define MAX DMA_channels 4

qurt_sem_t dmaSemaphore;

void initdma_pool(void)
{
    /* 有4个DMA通道可用 */
    qurt_sem_init_val(&dmaSemaphore, MAX_DMA Channels);
}

void thread_needing_dma(void *arg)
{
    /* 获取一个DMA通道(如果所有4个通道都被占用,这个操作会阻塞) */
    qurt-sem_down(&dmaSemaphore);

    int channel = allocate dma_channel();
    performdma_transfer(channel);
    releasedma_channel(channel);

    /* 释放信号量 */
    qurt_sem_up(&dma Semaphore);

    qurt_thread_exit(QURT_EOK);
}

信号量的初始值被设置为4,这个数值与可用的DMA通道数量相等。每次调用qurt-sem_down()时,信号量的计数值会减少1;如果计数值为0,调用该函数的线程就会被阻塞。而每次调用qurt_sem_up()时,信号量的计数值会增加1,如果有等待的线程,其中的一个线程就会获得执行权限。这样的设计可以确保不会有超过4个线程同时使用DMA通道。

内存管理

DSP上的内存资源是有限的。典型的Hexagon DSP拥有256 KB到2 MB的紧密耦合内存,同时还可以访问DDR内存。QuRT提供了相应的工具,可以帮助用户有效地管理这些内存资源。

内存映射结构

┌───────────────────────────────────┐  高地址段
│         DDR内存(与ARM CPU共享)     │
│   - 大型数据缓冲区                 │
│   - 神经网络权重值              │
│   - 音频/视频帧数据            │
├───────────────────────────────────┤
│         QuRT虚拟内存               │
│   - 用户堆                    │
│   - 线程栈                    │
├───────────────────────────────────┤
│         L2缓存(紧密耦合内存模式)       │
│   - 频繁访问的数据缓冲区           │
│   - 查找表                   │
├───────────────────────────────────┤
│         QuRT内核                  │
│   - 调度器、中断处理程序        │
│   - 系统数据结构              │
└───────────────────────────────────┘  低地址段

该图展示了Hexagon DSP从低地址到高地址的内存布局。QuRT内核占据最低的地址范围,用户代码无法访问这些内存。在其上方,以紧密耦合内存模式配置的L2缓存为频繁访问的数据提供了快速的存储空间。虚拟内存区域则用于存放用户堆和线程栈。最上层的是DDR内存,它与ARM CPU共享,被用来存储大型数据缓冲区、机器学习模型的权重值以及媒体帧数据。虽然DDR的内存延迟略高,但其容量要大得多。

动态内存分配

#include 
#include 

void memory_examples(void)
{
    /* 标准的malloc/free函数可以正常使用(QuRT提供了堆内存管理功能) */
    int *data = (int *)malloc(1024 * sizeof(int));
    if (!data) {
        printf("malloc失败!堆内存不足。\n");
        return;
    }

    for (int i = 0; i < 1024; i++) {
        data[i] = i * 2;
    }

    free(data);
}

QuRT提供了标准的C语言堆内存管理机制,因此mallocfree函数可以正常使用。不过,malloc的执行时间具有不确定性,因为它可能需要搜索空闲内存列表、合并相邻的空闲内存区域等。因此,这种函数并不适合用于实时系统中那些对执行时间有严格要求的场景。建议将malloc用于初始化和释放资源操作,而不要用于每帧或每个样本的数据分配。

缓存管理

在Hexagon DSP上,当与ARM CPU共享内存时,进行显式的缓存管理是非常必要的。

#include 

void cache_management_example(void)
{
    void *buffer;
    size_t buffer_size = 4096;

    /* 分配连续的、符合缓存对齐要求的内存空间 */
    int result = qurt_mem_region_create(
        &buffer,
        buffer_size,
        qurt_mem_default_pool,
        QURT_MEM_REGION_SHARED
    );

    if (result != QURT_EOK) {
        printf("内存区域分配失败\n");
        return;
    }

    /* 在读取其他处理器(例如ARM CPU)写入的数据之前: */
    qurt_mem_cache_clean(buffer, buffer_size,
                          QURT_MEM_CACHE_INVALIDATE);

    /* 从缓冲区中读取数据... */

    /* 在向其他处理器写入数据之后: */
    fill_buffer_with_results(buffer, buffer_size);
    qurt_mem_cache_clean(buffer, buffer_size,
                          QURT_MEM_CACHE_FLUSH);
}

qurt_mem_region_create()这个函数会分配一块物理上连续的内存区域,这样的内存区域适合与其他处理器共享。标志QURT_MEM_REGION_SHARED用于标记该内存区域可供跨处理器使用。

对于共享内存来说,相关的缓存规则虽然简单,但却至关重要:

  1. 在读取数据之前,必须先使缓存失效,这样才能获取ARM CPU最新写入的数据,而不会看到过时的缓存内容。

  2. 在写入数据之后,必须立即刷新缓存,这样ARM CPU才能看到你所做的修改,而不会使用主内存中原有的数据。

如果忽略了这些操作,虽然你的代码在逻辑上是正确的,但最终可能会因为使用过时的数据而导致错误。

用于实现可预测内存分配的内存池

内存池能够以O(1)的时间复杂度完成内存分配操作,因此它们非常适合用于实时系统中那些需要快速获取内存资源的场景。

#include 

#define BLOCK_SIZE 256
#define NUMBlocks 32

/* 内存池中的内存是静态分配的,这样可以确保分配过程的确定性 */
static char pool_memory[BLOCK_SIZE * NUM Blocks] __attribute__((aligned(8)));
static qurt_mem_pool_t my_pool;

void pool_init(void)
{
    qurt_mem_pool_create(&my_pool, pool_memory,
                          BLOCK_SIZE * NUMBlocks,
                          BLOCK_SIZE);
}

void *pool_alloc(void)
{
    void *block = qurt_mem_pool_alloc(&my_pool);
    if (!block) {
        printf("内存池已用尽!\n");
    }
    return block;
}

void pool_free(void *block)
{
    qurt_mem_pool_free(&my_pool, block);
}

这段代码创建了一个由32个块组成的内存池,每个块的大小为256字节。由于内存池中的内存是静态分配的,因此运行时完全不需要依赖malloc函数。

qurt_mem_pool_alloc()qurt_mem_pool_free()这两个函数都能在常数时间内完成内存的分配与释放操作。如果内存池已经用尽,这两个函数会返回NULL,而不会导致程序阻塞或去其他地方查找内存。

这种确定的性能使得内存池成为处理音频数据、处理传感器传来的数据,以及任何需要在严格时间限制内完成的任务的理想选择。

定时器与计时功能

QuRT提供了基于硬件的定时器,能够实现精确的计时。这对于数字信号处理任务来说至关重要:例如,如果你正在以48 kHz的频率处理音频数据,那么你就需要每10.67毫秒就更新一次缓冲区,这一点绝对不能出错。

一次性定时器

#include 
#include 

qurt_timer_t my_timer;
qurt_signal_t timer_signal;

#define TIMER_EXPIRED_SIGNAL 0x01

void timer_example(void)
{
    qurt_signal_init(&timer_signal);

    qurt_timer_attr_t attr;
    qurt_timer_attr_init(&attr);

    /* 设置定时器时长为10毫秒 */
    qurt_timer_attr_set_duration(&attr,
        qurt_timer_convert_time_to_ticks(10000, /* 微秒 */
                                          QURT_TIME_USEC));

    /* 设置定时器触发时发送的信号 */
    qurt_timer_attr_set_signal(&attr, &timerSignal);
    qurt_timer_attr_set_signal_mask(&attr, TIMER_EXPIRED SIGNAL);

    /* 设置定时器为一次性触发类型 */
    qurt_timer_attr_set_type(&attr, QURT_TIMER_ONESHOT);

    /* 创建并启动定时器 */
    qurt_timer_create(&my_timer, &attr);

    /* 等待定时器到期 */
    qurt_signal_wait(&timerSignal,
                      TIMER_EXPIRED SIGNAL,
                      QURT_SIGNAL_ATTR_WAIT_ANY);

    printf("定时器到期了!10毫秒已经过去。\n");
    qurt_signal_clear(&timerSignal, TIMER_EXPIRED.Signal);

    /* 清理资源 */
    qurt_timer_delete(my_timer);
    qurt_signal_destroy(&timerSignal);
}

这会创建一个一次性定时器,该定时器会在10毫秒后触发。这个定时器的配置是通过一个属性结构来完成的,该结构指定了定时器的持续时间、需要通知的信号对象、要设置的信号位掩码以及定时器的类型(QURT_TIMER_ONESHOT)。当定时器到期时,它会设置指定的信号位,从而唤醒那些在qurt_signal_wait()中处于等待状态的线程。在处理完相关事件后,该线程会清除这个信号位,并释放定时器资源。

周期性定时器

void periodic_timer_thread(void *arg)
{
    qurt_timer_t periodic_timer;
    qurt_signal_t periodic_signal;
    qurt_timer_attr_t attr;

    qurt_signal_init(&periodic_signal);
    qurt_timer_attr_init(&attr);

    /* 每1毫秒触发一次 */
    qurt_timer_attr_set_duration(&attr,
        qurt_timer_convert_time_to_ticks(1000, QURT_TIME_USEC));
    qurt_timer_attr_set_signal(&attr, &periodic_signal);
    qurt_timer_attr_set_signal_mask(&attr, 0x01);
    qurt_timer_attr_set_type(&attr, QURT_TIMER_PERIODIC);

    qurt_timer_create(&periodic_timer, &attr);

    int iteration = 0;
    while (iteration < 1000) {
        qurt_signal_wait(&periodic_signal, 0x01,
                          QURT.Signal_ATTR_WAIT_ANY);
        qurt_signal_clear(&periodic_signal, 0x01);

        /* 每1毫秒执行一次 */
        process_audio_frame(iteration);
        iteration++;
    }

    qurt_timer_delete(periodic_timer);
    qurt_signal_destroy(&periodic_signal);
    qurt_thread_exit(QURT_EOK);
}

周期性定时器使用的是QURT_TIMER_periodIC类型,而不是QURT.Timer_ONESHOT。它会按照指定的间隔反复触发。在这个示例中,程序会执行1000次循环,每次循环的间隔为1毫秒,每次循环都会处理一个音频帧。在每次循环结束后,都必须清除相应的信号位,否则下一次调用qurt_signal_wait()时会立即返回结果。

读取当前时间

void timing_example(void)
{
    unsigned long long start_ticks = qurt_sysclock_get_hwTicks();

    heavy_computation();

    unsigned long long endticks = qurt_sysclock_get_hwTicks();
    unsigned long long elapsed_ticks = end_ticks - start_ticks;

    unsigned long long elapsed_us =
        qurt_timer_convert_ticks_to_time(elapsed_ticks, QURT_TIME_USEC);

    printf("计算过程耗时 %llu 微秒\n", elapsed_us);
}

qurt_sysclock_get_hwTicks()用于读取硬件周期计数器的值,这个计数器能够提供DSP上最高精度的计时信息。qurt_timer_convert_ticks_to_time()则可以将原始的计时数值转换为人类可读的形式(在这个例子中是微秒)。利用这种方法可以分析各个函数的执行时间,从而找出性能瓶颈所在。

中断处理

在DSP上,中断就是用来表示硬件需要立即处理的信号。QuRT提供了一种基于线程的中断处理模型,这种模型比纯硬件的中断服务程序更加结构化、更易于使用。


#include
#include

#define MY_SENSOR_IRQ 42
#define IRQ SIGNAL 0x01

static qurt_signal_t irq_signal;

void sensor_isr_thread(void *arg)
{
int irq = MYSENSORIRQ;

/* 将这个线程注册为IRQ 42的处理程序 */
qurtInterrupt_register(irq, &irq_signal, IRQ_SIGNAL);

printf("传感器中断处理线程已准备就绪,正在等待中断信号……\n");

while (1) {
/* 等待硬件中断发生 */
unsigned int sigs = qurt_signal_wait(&irq_signal, IRQ SIGNAL, QURTSignal_ATTR_WAIT_ANY);

if (sigs & IRQ.Signal) {
qurt_signal_clear(&irq_signal, IRQ_SIGNAL);

/* 迅速读取传感器数据 */
int sensor_value = read_sensor_register();

/* 将数据放入队列中,等待处理线程进行处理 */
enqueue SENSOR_data(sensor_value);

/* 向处理线程发送信号,告知数据已准备好 */
qurt_signal_set(&processing_signal, DATAREADY);

/* 重新启用中断 */
qurtInterrupt_acknowledge(irq);
}
}
}

QuRT中的中断处理程序与纯硬件层面的中断处理程序有所不同。它们是在专用的线程环境中运行的,因此可以在其中使用互斥锁和信号机制。不过,这些中断处理线程应该只执行最基本的任务:读取硬件寄存器的值、将数据放入队列中、向处理线程发送信号以及确认中断已经处理完毕。所有计算量较大的操作都应该在另一个优先级较低的线程中完成。


硬件中断 │
↓ │
中断处理线程(高优先级) 处理线程(中等优先级)
┌──────────────────┐ ┌──────────────────────────┐
│ 读取硬件寄存器值 │ │ 等待“数据已准备好”信号 │
│ 将数据放入队列 │ ──────► │ 从队列中取出数据 │
│ 向处理线程发送信号 │ │ 执行FFT运算/过滤等操作 │
│ 确认中断已处理 │ │ 将处理结果写入存储设备 │
└──────────────────┘ └──────────────────────────┘

上图展示了这种中断处理任务的分配方式。左边的中断处理线程会以最低的延迟时间来处理硬件中断:它读取传感器寄存器的值,将原始数据放入队列中,向处理线程发送信号,然后确认中断已经处理完毕,以便下次能够再次接收新的中断信号。右边的处理线程则会以较低的优先级来执行计算量较大的任务,比如FFT运算、数据过滤或机器学习模型的推理。

这种设计可以确保:即使处理线程还在处理之前的数据,中断处理线程也能够随时准备接受新的硬件中断请求。

管道与消息队列

QuRT提供了内置的管道机制,用于实现安全、结构化的数据在线程之间的传递。这些管道实际上是固定大小的消息队列,支持阻塞式的发送和接收操作。


#include
#include

#define PIPE_ELEMENTS 16
#define ELEMENT_SIZE sizeof(sensor_msg_t)

typedef struct {
int sensor_id;
int value;
unsigned long long timestamp;
} sensor_msg_t;

/* 管道缓冲区需要由用户自行分配 */
static char pipe_buffer[PIPE_elements * ELEMENT_SIZE]
__attribute__((aligned(8)));

qurt_pipe_t sensor_pipe;

void pipe_init(void)
{
qurt_pipe_attr_t attr;
qurt_pipe_attr_init(&attr);
qurt_pipe_attr_set_buffer(&attr, pipe_buffer);
qurt_pipe_attr_set_buffer_partition(&attr, PIPE_elements);
qurt_pipe_attr_set_elements(&attr, PIPE_ELEMENTS);
qurt_pipe_attr_set_element_size(&attr, ELEMENT_SIZE);

qurt_pipe_create(&sensor_pipe, &attr);
}

/* 生产者:将传感器数据发送到管道中 */
void sensor_reader_thread(void *arg)
{
while (1) {
sensor_msg_t msg;
msg.sensor_id = 1;
msg.value = read_accelerometer();
msg.timestamp = qurt_sysclock_get_hw_ticks();

/* 阻塞式发送:如果管道已满,则等待 */
qurt_pipe_send(&sensor_pipe, (char *)&msg, ELEMENT_SIZE);
}
}

/* 消费者:从管道中接收传感器数据 */
void data_processor_thread(void *arg)
{
sensor_msg_t msg;

while (1) {
/* 阻塞式接收:如果管道为空,则等待 */
qurt_pipe_receive(&sensor_pipe, (char *)&msg, ELEMENT_SIZE);

printf("传感器 %d:数值=%d,时间戳=%llu\n",
msg.sensor_id, msg.value, msg.timestamp);

process_sensor_reading(&msg);
}
}

QuRT管道配置了静态分配的缓冲区、一定数量的元素以及每个元素的长度。与栈类似,这些缓冲区的管理责任在于使用者自身。qurt_pipe_send()方法会将消息复制到管道中,如果管道已满,则会阻塞;qurt_pipe.receive()方法则会从管道中读取消息,如果管道为空,也会导致阻塞。该管道会自动处理所有的内部同步机制,因此无需额外使用互斥锁。

对于这里展示的传感器数据处理场景来说,管道是一种非常合适的解决方案:读取线程会以固定的频率采集硬件数据,并将这些数据放入管道中;而处理线程则会从管道中取出这些数据并进行后续处理。管道能够自动提供缓冲功能,并在必要时抑制数据的传输速度。

QuRT与FastRPC

在实际的高通设备中,人们很少单独使用QuRT。运行在ARM CPU上的Android或Linux应用程序会通过FastRPC(快速远程过程调用)将计算量较大的任务卸载到DSP上进行处理。下图展示了整个处理流程:

┌───────────────────────────────────────────────────────────────┐
│                         ARM CPU端                          │
│                                                               │
│   your_app.c                                                  │
│   ┌───────────────────────────────────────────────────┐       │
│   │  #include "my_dsp_module.h"  // 由工具自动生成的头文件    │       │
│   │                                                   │       │
│   │  // 这看起来像是一个普通的函数调用,       │       │
│   │ 但实际上它是在DSP上执行的!          │       │
│   │  result = my_dsp_module_process_audio(            │       │
│   │      input_buffer, output_buffer, num_samples);   │       │
│   └───────────────────┬───────────────────────────────┘       │
│                       │ FastRPC                               │
└───────────────────────┼───────────────────────────────────────┘
            (跨越处理器边界进行通信)          
┌───────────────────────┼───────────────────────────────────────┐
│                       ▼                                       │
│                  DSP端(使用QuRT)                              │
│   my_dsp_module_skel.c  // 由工具自动生成的框架代码            │
│   ┌───────────────────────────────────────────────────┐       │
│   │  int my_dsp_module_process_audio(                 │       │
│   │      const int16_t *input,                        │       │
│   │      int16_t *output,                             │       │
│   │      int num_samples)                             │       │
│   │  {                                                │       │
│   │      // 这段代码是在QuRT环境下、在Hexagon DSP上运行的   │       │
│   │      apply_noise_reduction(input, output,         │       │
│   │                             num_samples);         │       │
│   │      return 0;                                    │       │
│   │  }                                                │       │
│   └───────────────────────────────────────────────────┘       │
└───────────────────────────────────────────────────────────────┘

该图展示了FastRPC的架构。在ARM CPU端,应用程序会调用一个看起来像普通的C函数。实际上,FastRPC会先将参数序列化,然后将其传输到Hexagon DSP上,在QuRT环境下执行该函数,并最终返回结果。对于程序员来说,这种远程过程调用的体验是完全透明的。

步骤1:定义接口(IDL文件)

创建一个`.idl`文件,用来描述ARM可以调用DSP上的哪些函数:

/* my_dsp_module.idl */
#include "remote.idl"
#include "AEEStdDef.idl"

interface my_dsp_module {

    /* 简单计算操作 */
    long process_audio(
        in sequence input,
        out sequence output,
        in long num_samples
    );

    /* 矩阵乘法运算 */
    long matrix_multiply(
        in sequence mat_a,
        in sequence mat_b,
        out sequence result,
        in long rows_a,
        in long cols_a,
        in long cols_b
    );
};

IDL(接口定义语言)文件用于定义跨处理器的API。每个函数都会通过方向限定符来指定其参数类型:`in`表示数据从ARM流向DSP,`out`表示数据从DSP回流到ARM;`sequence`语法则用于表示可变长度数组。Hexagon SDK的IDL编译器会根据这些定义为ARM端生成存根代码,为DSP端生成骨架代码。

步骤2:实现DSP端代码

/* my_dsp_module_imp.c - DSP实现部分 */

#include "my_dsp_module.h"
#include 
#include 

int my_dsp_module_process_audio(
    const int16_t *input, int input_len,
    int16_t *output, int output_len,
    int num_samples
)
{
    if (!input || !output || num_samples <= 0) {
        return -1;
    }

    /* 清除缓存:ARM已经将数据写入缓存 */
    qurt_mem_cache_clean((void *)input,
                          num_samples * sizeof(int16_t),
                          QURT_MEM_CACHE_INVALIDATE);

    /* 在DSP上进行处理 */
    for (int i = 0; i < num_samples; i++) {
        /* 简单的噪声门处理 */
        if (abs(input[i]) < 100) {
            output[i] = 0;
        } else {
            output[i] = input[i];
        }
    }

    /* 刷新缓存:ARM会读取DSP处理后的结果 */
    qurt_mem_cache_clean(output,
                          num_samples * sizeof(int16_t),
                          QURT_MEM_CACHE.Flush);

    return 0;
}

DSP实现部分接收由ARM CPU写入的输入数据。在读取数据之前,代码会清除缓存,以确保DSP获取的是主内存中的最新数据而非过时的缓存内容;在输出数据后,代码会刷新缓存,以便ARM CPU能够看到DSP处理后的结果。实际的计算操作(在这个例子中是简单的噪声门处理)发生在这些缓存操作之间。

步骤3:实现ARM端代码

/* main_arm.c - ARM/Android应用程序 */

#include 
#include 
#include 
#include "my_dsp_module.h"

int main(void)
{
    int num_samples = 1024;

    /* 使用ION内存实现与DSP的无复制数据共享 */
    rpcmem_init();

    int16_t *input = (int16_t *)rpcmem_alloc(
        RPCMEM_heap_ID_SYSTEM,
        RPCMEM_DEFAULT_FLAGS,
        num_samples * sizeof(int16_t));

    int16_t *output = (int16_t *)rpcmem_alloc(
        RPCMEMHeap_ID SYSTEM,
        RPCMEM_DEFAULT_FLAGS,
        num_samples * sizeof(int16_t));

    if (!input || !output) {
        printf("rpcmem_alloc失败!\n");
        return -1;
    }

    /* 用音频数据填充输入缓冲区 */
    for (int i = 0; i < num_samples; i++) {
        input[i] = (int16_t)(i % 256);
    }

    /* 通过FastRPC将数据发送给DSP进行处理 */
    int result = my_dsp_module_process_audio(
        input, num_samples,
        output, num_samples,
        num_samples);

    if (result != 0) {
        printf("DSP处理失败:%d\n", result);
    } else {
        printf("DSP处理成功!\n");
        printf("前10个输出样本为:");
        for (int i = 0; i < 10; i++) {
            printf("%d ", output[i]);
        }
        printf("\n");
    }

    rpcmem_free(input);
    rpcmem_free(output);
    rpcmem_deinit();

    return 0;
}

ARM端代码使用rpcmem_alloc()来分配ION内存,这种共享内存区域可以被ARM CPU和Hexagon DSP直接访问,无需进行任何数据复制。调用my_dsp_module_process_audio()时,该函数看起来像是一个普通的函数调用,但实际上FastRPC会将其透明地转发给DSP进行处理。当函数执行完成后,输出缓冲区中就会包含DSP处理后的结果。

构建完整项目

一个FastRPC项目需要进行两次SCons构建:一次针对ARM CPU端,另一次针对Hexagon DSP端。这两部分代码分别对应不同的.min文件(即android.minhexagon.min),它们都会由SDK的SConstruct工具进行处理。

cd $HEXAGON_SDK_ROOT

# 为ARM目标平台(Android)构建代码
make V=android_Release tree=my_dsp_module

# 为Hexagon DSP构建代码
make V=hexagon_Release_dynamic_toolv84_v66 tree=my_dsp_module

# 或者直接使用SCons进行构建
python tools/build/scons/scons.py \
    V=android_Release \
    V=hexagon_Release_dynamic_toolv84_v66 \
    my_dsp_module

# 将构建好的文件推送到设备上
adb push android_Release/ship/my_dsp_module /data/local/tmp/
adb push hexagon_release_dynamic_toolv84_v66/ship/libmy_dsp_module_skel.so \
    /data/local/tmp/

# 运行程序
adb shell "cd /data/local/tmp & && ./my_dsp_module"

构建过程会生成两个结果:一个是ARM可执行文件(由你的main_arm.c代码以及相关 Stub文件编译而成),另一个是Hexagon共享库文件(即_skel.so,它是由你的DSP实现代码编译生成的)。SCons会自动处理IDL文件的编译工作:它会检测到.idl文件,自动生成相应的Stub和骨架C源代码文件,并将它们纳入对应的构建过程中。最终,这两个结果都会被推送到设备上。

当ARM可执行文件运行并调用FastRPC函数时,系统会将相应的骨架库加载到DSP上,并通过该库来路由此次调用。

构建传感器融合管道

本节将线程、同步机制、定时器以及信号整合到一个完整的、功能真实的QuRT应用程序中。该管道会从三个模拟传感器(加速度计、陀螺仪、磁力计)读取数据,利用互补滤波算法对这些数据进行融合处理,然后以100赫兹的频率输出方向信息。

/*
 * sensor_fusion.c - 在QuRT平台上实现的多传感器融合管道
 *
 * 架构如下:
 *   [加速度计中断服务程序] ──► [融合线程] ──► [报告线程]
 *   [陀螺仪中断服务程序]  ──►       ▲
 *   [磁力计中断服务程序]   ──►       │
 *                    [定时器线程]
 *                    (每10毫秒触发一次融合计算)
 */

#include 
#include 
#include 
#include 
#include 

/* 配置参数 */
#define STACK_SIZE          8192
#define FUSION_PERIOD_US    10000   /* 10毫秒相当于100赫兹的融合频率 */
#define QUEUE_DEPTH         32

/* 数据类型定义 */
typedef struct {
    float x, y, z;
    unsigned long long timestamp;
} vec3_sample_t;

typedef struct {
    vec3_sample_t accel;
    vec3_sample_t gyro;
    vec3_sample_t mag;
    float roll, pitch, yaw;
} fused_state_t;

/* 线程栈定义 */
static char accel_stack[STACK_SIZE]  __attribute__((aligned(8)));
static char gyro_stack[STACK_SIZE]   __attribute__(aligned(8)));
static char mag_stack[STACK_SIZE]    __attribute__(aligned(8)));
static char fusion_stack[STACK_SIZE] __attribute__(aligned(8));
static char report_stack[STACK_SIZE] __attribute__(aligned(8));

/* 共享状态变量 */
static vec3_sample_t latest_accel;
static vec3_sample_t latest_gyro;
static vec3_sample_t latest_mag;
static fused_state_t latest_fused;

static qurt_mutex_t sensor_mutex;
static qurt_mutex_t fused_mutex;
static qurt_signal_t fusion_signal;
static qurt_signal_t report_signal;

#define SIG_FUSION_TICK    0x01
#define SIG_NEW_FUSED_DATA 0x01
#define SIG_SHUTDOWN       0x80

static volatile int running = 1;

/* 模拟传感器数据读取函数 */
static void read_accelerometer(vec3_sample_t *sample)
{
    sample->x = 0.01f;
    sample->y = 0.02f;
    sample->z = 9.81f;
    sample->timestamp = qurt_sysclock_get_hw_ticks();
}

static void read_gyroscope(vec3_sample_t *sample)
{
    sample->x = 0.001f;
    sample->y = -0.002f;
    sample->z = 0.0005f;
    sample->timestamp = qurt_sysclock_get_hw_ticks();
}

static void read_magnetometer(vec3_sample_t *sample)
{
    sample->x = 25.0f;
    sample->y = -5.0f;
    sample->z = 40.0f;
    sample->timestamp = qurt_sysclock_get_hwticks();
}

/* 加速度计线程函数 */
void accel_thread(void *arg)
{
    printf("[加速度计] 线程启动\n");

    while (running) {
        vec3_sample_t sample;
        read_accelerometer(&sample);

        qurt_mutex_lock(&sensormutex);
        latest_accel = sample;
        qurt_mutex_unlock(&sensor_mutex);

        /* 数据采样频率约为400赫兹 */
        qurt_timer_sleep(2500);
    }

    printf("[加速度计] 线程结束\n");
    qurt_thread_exit(QURT_EOK);
}

/* 陀螺仪线程函数 */
void gyro_thread(void *arg)
{
    printf("[陀螺仪] 线程启动\n");

    while (running) {
        vec3_sample_t sample;
        read_gyroscope(&sample);

        qurt_mutex_lock(&sensormutex);
        latest_gyro = sample;
        qurt_mutex_unlock(&sensor_mutex);

        /* 数据采样频率为1000赫兹 */
        qurt_timer_sleep(1000);
    }

    printf("[陀螺仪] 线程结束\n");
    qurt_thread_exit(QURT_EOK);
}

/* 磁力计线程函数 */
void mag_thread(void *arg)
{
    printf("[磁力计] 线程启动\n");

    while (running) {
        vec3_sample_t sample;
        read_magnetometer(&sample);

        qurt_mutex_lock(&sensormutex);
        latest_mag = sample;
        qurt_mutex_unlock(&sensor_mutex);

        /* 数据采样频率为100赫兹 */
        qurt_timer_sleep(10000);
    }

    printf("[磁力计] 线程结束\n");
    qurt_thread_exit(QURT_EOK);
}

/* 简化的互补滤波算法 */
static void compute_orientation(
    const vec3_sample_t *accel,
    const vec3_sample_t *gyro,
    const vec3_sample_t *mag,
    fused_state_t *state)
{
    float dt = 0.01f;

    float accel_roll = atan2f(accel->y, accel->z) * 57.2958f;
    float accel_pitch = atan2f(-accel->x,
        sqrtf(accel->y * accel->y + accel->z * accel->z)) * 57.2958f;

    /* 短期内依赖陀螺仪数据,长期使用加速度计数据 */
    state->roll = 0.98f * (state->roll + gyro->x * dt * 57.2958f)
                + 0.02f * accel_roll;
    state->pitch = 0.98f * (state->pitch + gyro->y * dt * 57.2958f)
                 + 0.02f * accel_pitch;

    state->yaw = atan2f(mag->y, mag->x) * 57.2958f;

    state->accel = *accel;
    state->gyro = *gyro;
    state->mag = *mag;
}

/* 融合线程函数(每10毫秒执行一次) */
void fusion_thread(void *arg)
{
    qurt_timer_t fusion_timer;
    qurt_timer_attr_t timer_attr;

    printf("[融合线程] 线程启动\n");

    qurt_timer_attr_init(&timer_attr);
    qurt_timer_attr_set_duration(&timer_attr,
        qurt_timer_convert_time_to_ticks(FUSION_period_US,
                                          QURT_TIME_USEC));
    qurt_timer_attr_set_signal(&timer_attr, &fusion_signal);
    qurt_timer_attr_set_signal_mask(&timer_attr, SIG_FUSION_TICK);
    qurt_timer_attr_set_type(&timer_attr, QURT_TIMER_PERIODIC);

    qurt_timer_create(&fusion_timer, &timer_attr);

    while (running) {
        unsigned int sigs = qurt_signal_wait(
            &fusion_signal,
            SIG_FUSION TICK | SIG_SHUTDOWN,
            QURT SIGNAL_ATTR_WAIT_ANY);

        if (sigs & SIG_SHUTDOWN) break;

        qurt_signal_clear(&fusion_signal, SIG_FUSION_TICK);

        /* 在锁保护下获取传感器数据 */
        vec3_sample_t a, g, m;
        qurt_mutex_lock(&sensor_mutex);
        a = latest_accel;
        g = latest_gyro;
        m = latest_mag;
        qurt_mutex_unlock(&sensor_mutex);

        /* 执行融合算法(无需锁定,使用本地数据) */
        fused_state_t state;
        qurtmutex_lock(&fused_mutex);
        state = latest_fused;
        qurt_mutexunlock(&fused_mutex);

        compute_orientation(&a, &g, &m, &state);

        /* 发布融合结果 */
        qurt_mutex_lock(&fused_mutex);
        latest_fused = state;
        qurtmutex_unlock(&fused_mutex);

        /* 通知报告线程 */
        qurt_signal_set(&report_signal, SIG_NEW_FUSED_DATA);
    }

    qurt_timer_delete(fusion_timer);
    printf("[融合线程] 线程结束\n");
    qurt_thread_exit(QURT_EOK);
}

/* 报告线程函数 */
void report_thread(void *arg)
{
    int report_count = 0;

    printf("[报告线程] 线程启动\n");

    while (running) {
        unsigned int sigs = qurt_signal_wait(
            &report_signal,
            SIG_NEW_FUSED_DATA | SIG_SHUTDOWN,
            QURT SIGNAL_ATTR_WAIT_ANY);

        if (sigs & SIG_SHUTDOWN) break;

        qurt_signal_clear(&report_signal, SIG_NEW_FUSED DATA);

        fused_state_t state;
        qurt_mutex_lock(&fused_mutex);
        state = latest_fused;
        qurtmutex_unlock(&fused_mutex);

        /* 每100次更新时报告一次结果(即每秒报告一次) */
        if (++report_count % 100 == 0) {
            printf("[报告] 方向信息 - 旋转角度:%.2f 俯仰角:%.2f  "
                   "偏航角:%.2f  (更新次数:%d)\n",
                   state.roll, state.pitch, state.yaw, report_count);
        }
    }

    printf("[报告线程] 线程结束\n");
    qurt_thread_exit(QURT_EOK);
}

/* 主函数 */
int main(void)
{
    qurt_thread_t threads[5];
    qurt_thread_attr_t attr;
    int status;

    printf("=== 开始运行传感器融合管道 ===\n");

    /* 初始化同步相关资源 */
    qurt_mutex_init(&sensor_mutex);
    qurtmutex_init(&fused_mutex);
    qurt_signal_init(&fusion_signal);
    qurt_signal_init(&report_signal);
    memset(&latest_fused, 0, sizeof(latest_fused));

    struct {
        const char *name;
        char *stack;
        int priority;
        void (*func)(void *);
    } thread_configs[] = {
        {"accel_reader", accel_stack,  60, accel_thread},
        {"gyro-reader",  gyro_stack,   60, gyro_thread},
        {"magreader",   mag_stack,    70, mag_thread},
        {"fusion",       fusion_stack, 80, fusion_thread),
        {"reporter",     report_stack, 120, report_thread},
    };

    /* 创建所有线程 */
    for (int i = 0; i < 5; i++) {
        qurt_thread_attr_init(&attr);
        qurt_thread_attr_set_name(&attr, thread_configs[i].name);
        qurt_thread_attr_set_stack_addr(&attr, threadConfigs[i].stack);
        qurt_thread_attr_set_stack_size(&attr, STACK_SIZE);
        qurt_thread_attr_set_priority(&attr, threadconfigs[i].priority);

        int result = qurt_thread_create(&threads[i], &attr,
                                         thread_configs[i].func, NULL);
        if (result != QURT_EOK) {
            printf("创建线程 '%s' 失败:%d\n",
                   threadConfigs[i].name, result);
            return -1;
        }
        printf("成功创建了线程 '%s',优先级为 %d\n",
               thread_configs[i].name, threadconfigs[i].priority);
    }

    /* 让这些线程运行10秒钟 */
    printf("管道正在运行中……10秒钟后结束。\n");
    qurt_timer_sleep(10000000);

    /* 关闭程序 */
    printf("程序即将关闭……\n");
    running = 0;
    qurt_signal_set(&fusion_signal, SIG_SHUTDOWN);
    qurt_signal_set(&report_signal, SIG_SHUTDOWN);

    /* 等待所有线程结束 */
    for (int i = 0; i < 5; i++) {
        qurt_thread_join(threads[i], &status);
    }

    /* 清理资源 */
    qurt_mutex_destroy(&sensor_mutex);
    qurtmutexdestroy(&fused_mutex);
    qurt_signal_destroy(&fusion_signal);
    qurt_signal_destroy(&report_signal);

    printf("=== 传感器融合管道运行完成 ===\n");
    return 0;
}

这个示例展示了多个QuRT组件如何协同工作。

三个传感器读取线程以最高优先级运行(加速度计和陀螺仪的优先级为60,磁力计的优先级为70),它们会不断将最新的测量数据写入受互斥锁保护的共享内存中。

一个融合处理线程每隔10毫秒被触发一次,它会同时读取这三个传感器的数据,然后通过相应的算法计算出飞行器的滚转角、俯仰角和偏航角,并将最终的结果输出出来。

另一个优先级最低的线程(优先级为120)会在新的融合数据可用时接收信号,并每秒记录一次飞行器的当前方向信息。

优先级分配

优先级60:传感器读取线程(最高优先级,必须确保及时获取硬件数据)
优先级80:融合处理线程(每10毫秒运行一次,需要快速完成计算)
优先级120:日志记录线程(最低优先级,仅负责数据记录)

优先级的分配遵循一个明确的规则:与硬件交互越频繁的线程,其优先级就越高。如果融合处理线程花费的时间过长,日志记录线程会等待;这种情况是可以接受的,因为延迟记录日志并不会对系统造成实时影响。但如果传感器的数据读取出现延迟,融合算法就会使用过时的数据来进行计算。

在控制无人机或机器人的实际应用中,如果惯性测量单元提供的数据是过时的,那么计算出的方向信息就会不准确,这可能会导致设备出现故障。

调试QuRT应用程序

与Linux环境相比,QuRT的调试功能较为有限。它没有带图形用户界面的gdb工具,而且程序崩溃时产生的错误信息往往难以提供有用的线索。以下这些技巧可以组成一套实用的调试工具包。

使用printf进行调试

#include 

void debug_example(void)
{
    printf("[%s:%d] value = %d\n", __func__, __LINE__, some_var);
}

QuRT通过一种半托管机制支持printf函数。在模拟器环境中,输出结果会被发送到标准输出流;而在硬件设备上,输出数据会被保存到诊断缓冲区中(类似于Android系统中的logcat功能)。这是QuRT开发过程中最常用的调试方法。

QuRT错误代码

switch (result) {
    case QURT_EOK:
        break;
    case QURT_EINVALID:
        printf("参数无效\n");
        break;
    case QURT_EFAILED:
        printf("发生一般性故障\n");
        break;
    case QURT_EMEM:
        printf("内存不足\n");
        break;
    case QURT_ENOTALLOWED:
        printf("操作被禁止(请检查权限)\n");
        break;
    case QURT_ETIMEOUT:
        printf("操作超时\n");
        break;
    default:
        printf("未知错误:%d\n", result);
}

在调用QuRT的API函数后,一定要检查返回值。这些错误代码是你在开发过程中最常遇到的。

QURT_EINVALID通常表示传入的参数有误(例如堆栈未对齐、指针为空或优先级超出范围)。QURT_EMEM意味着内核用于存储内部数据的内存已经用完。QURT_ENOTALLOWED则往往表明硬件设备的权限设置存在问题。

线程状态检测

void dump_thread_info(void)
{
    qurt_thread_t tid = qurt_thread_get_id();
    char name[QURT_THREAD_ATTR_NAME_MAXLEN];

    qurt_thread_get_name(name, sizeof(name));

    printf("线程:%s(ID:%lu)\n", name, tid);
}

这个函数可以输出当前线程的名称和ID,当多个线程同时向同一个日志文件写入数据时,这个功能非常有用,因为它可以帮助你区分哪些消息是由哪个线程产生的。

栈溢出检测

#define STACK_CANARY 0xDEADBEEF

static char my_stack[STACK_SIZE] __attribute__((aligned(8)));

void init_stack_canary(void)
{
    /* 在栈的底部写入一个特定的值作为“哨兵” */
    ((unsigned int *)my_stack)[0] = STACK_CANARY;
    ((unsigned int *)my_stack)[1] = STACK_CANARY;
}

void check_stack_canary(void)
{
    if (((unsigned int *)my_stack)[0] != STACK_CANARY ||
        ((unsigned int *)my_stack)[1] != STACK_CANARY) {
        printf("检测到栈溢出!\n");
    }
}

QuRT本身并不具备检测栈溢出的功能。因此,我们需要在线程开始执行之前,在栈的底部写入一个特定的值作为“哨兵”。如果栈的大小超过了其允许的范围,这个“哨兵”值就会被覆盖。通过定期检查这个“哨兵”值(或者在线程退出时进行检查),就可以及时发现栈溢出现象;否则,这些溢出问题可能会表现为一些莫名其妙、看似无关的程序崩溃。

使用Hexagon模拟器

# 以指令追踪模式运行模拟器
hexagon-sim --timing --pmu_statsfile stats.txt \
    --cosim_file osam.cfg \
    -- bootimg.pbn -- my_app.so

# 这个统计文件会提供以下信息:
# - 总执行周期数
# - 缓存命中率/未命中率
# - 由于等待资源而导致的停滞周期数
# - 每个周期执行的指令数量

使用--timing选项可以进行精确到每个周期的模拟;而选项则会将性能计数器的数据写入文件中。统计文件会报告总执行周期数、缓存命中率和未命中率、停滞周期数,以及每个周期执行的指令数量。这些数据对于判断程序的性能瓶颈是出在计算资源上、内存资源上,还是其他方面,具有非常重要的参考价值。

常见误区

误区1:忘记退出线程

/* 错误做法:线程函数执行完毕后不调用退出函数 */
void bad_thread(void *arg) {
    do_work();
    return;  /* 这会导致程序崩溃或出现未定义行为 */
}

/* 正确做法 */
void good_thread(void *arg) {
    do_work();
    qurt_thread_exit(QURT_EOK);
}

如果QuRT线程在执行完毕之后不调用qurt_thread_exit()函数来退出,就会导致程序出现未定义行为。虽然操作系统在创建线程时会将链接寄存器的值设置为qurt_thread_exit函数的位置,作为一种安全机制,但你也绝对不能依赖这种机制——必须始终明确地调用qurt_thread_exit()函数来结束线程的执行。

误区2:在错误的作用域内分配栈内存

/* 错误做法:栈内存位于调用线程的栈空间中 */
void create_thread_bad(void) {
    char stack[4096];
    qurt_thread_attr_set_stack_addr(&attr, stack);
    qurt_thread_create(&tid, &attr, func, NULL);
}   /* 这里栈内存会被释放,新创建的线程会崩溃 */

/* 正确做法:使用静态分配或堆分配 */
static char stack[4096] __attribute__((aligned(8)));
void create_thread_good(void) {
    qurt_thread_attr_set_stack_addr(&attr, stack);
    qurt_thread_create(&tid, &attr, func, NULL);
}

栈内存的使用寿命必须长于使用它的线程的生命周期。如果将栈内存作为函数中的局部变量来分配,那么当该函数执行完毕时栈内存就会被释放,但此时线程可能仍在运行中。因此应使用静态分配(如示例所示),或者通过仔细管理内存生命周期来进行堆分配。

误区3:在不知情的情况下发生优先级反转

/* 错误做法:使用手动 spinlock,且不支持优先级继承 */
volatile int lock = 0;
while (__sync_lock_test_and_set(&lock, 1)) { /* 进行无限循环等待 */ }

/* 正确做法:使用支持优先级继承的 QuRT mutex */
qurt_mutex_lock(&mymutex);

如果一个高优先级的线程在等待一个由低优先级线程持有的手动 spinlock,而此时有一个中等优先级的线程抢先获取了该锁,那么高优先级的线程实际上就会被中等优先级的线程阻塞。

QuRT mutex 通过支持优先级继承来解决这个问题:当某个线程持有锁时,它的优先级会暂时提升为最高优先级的线程的优先级。而手动 spinlock 并不具备这种功能。

误区4:未对齐的内存

/* 错误做法 */
char stack[4096];

/* 正确做法 */
char stack[4096] __attribute__((aligned(8)));

/* 对于 DMA 缓冲区,通常需要 256 字节的对齐 */
char dma_buffer[1024] __attribute__((aligned(256)));

线程栈内存必须满足 8 字节的对齐要求;而 DMA 缓冲区通常需要 256 字节的对齐。未对齐的内存会在 Hexagon 架构上导致硬件故障,而且这类故障产生的诊断信息非常少。

误区5:在中断服务程序中发生阻塞

/* 错误做法:使用 mutex_lock 可能会导致无限期的阻塞 */
void isr_handler(void *arg) {
    qurt_mutex_lock(&somemutex);
    qurt_mutex_unlock(&some_mutex);
}

/* 正确做法:使用非阻塞的 try_lock 方法,并在无法获取锁时将任务交给低优先级的线程处理 */
void isr_handler(void *arg) {
    if (qurt_mutex_try_lock(&some_mutex) == QURT_EOK) {
        /* 快速完成操作后释放锁 */
        qurtmutex_unlock(&some_mutex);
    } else {
        /* 将任务交给低优先级的线程处理 */
        qurt_signal_set(&deferred_signal, DEFERRED_WORK);
    }
}

虽然从技术上讲,QuRT 的中断服务程序线程可以调用阻塞型 API,但在高优先级的中断处理程序中使用这些 API会导致中断处理被无限期地阻塞,直到阻塞条件得到解决。因此应使用 qurt_mutex_try_lock() 来尝试获取锁,如果无法获取锁,则应通过信号将任务交给低优先级的线程来处理。

性能优化

使用HVX(Hexagon向量扩展技术)

#include 
#include 

/* 使用HVX一次处理128字节的数据 */
void vectorized_gain(int16_t *audio, int num_samples, int16_t gain)
{
    HVX_vector *vptr = (HVX_Vector *)audio;
    HVX_vector vgain = Q6_Vh_vsplat_R(gain);
    int num_vectors = num_samples * sizeof(int16_t) / sizeof(HVX_vector);

    for (int i = 0; i < num_vectors; i++) {
        vptr[i] = Q6_Vh_vmpy_VhVh_sat(vptr[i], vgain);
    }
}

HVX为Hexagon DSP提供了针对128字节数据的SIMD运算功能。Q6_Vh_vsplat_R这种内置函数可以将一个标量值广播到向量寄存器的所有通道中;Q6_Vh_vmpy_VhVh_sat则用于执行两个半字向量的饱和乘法运算。一条HVX指令可以处理64个16位样本,因此在音频和信号处理任务中,使用HVX相比使用标量代码能够显著提升运行速度。

为热门数据锁定L2缓存

void lock_cache_example(void)
{
    extern float fft_twiddle_factors[];
    size_t twiddle_size = 1024 * sizeof(float);

    /* 将相关数据固定在L2缓存中,防止其被其他数据替换 */
    qurt_mem_l2cache_lock((unsigned int)fft_twiddle_factors,
                           twiddle_size);

    /* 使用完毕后: */
    qurt_mem_l2cache_unlock((unsigned int)fft_twiddle_factors,
                             twiddle_size);
}

qurt_mem_l2cache_lock()这种函数可以将某个内存区域固定在L2缓存中,从而防止它被其他数据替换。这对于那些在循环中会被频繁访问的数据(比如FFT运算中使用的旋转因子数组)来说非常有用。

不过,如果将过多的数据锁定在L2缓存中,就会减少其他线程可用的缓存空间,因此请谨慎使用这种技术。

避免在热门处理路径中使用动态分配内存

/* 不好的做法:在音频处理循环中使用malloc */
void process_audio_bad(void) {
    while (1) {
        float *temp = malloc(1024 * sizeof(float));
        process(temp);
        free(temp);
    }
}

/* 好的做法:提前分配所有所需内存 */
static float temp_buffer[1024];
void process_audio_good(void) {
    while (1) {
        process(temp_buffer);
    }
}

mallocfree这些函数的执行时间具有不确定性,因为它们可能需要遍历空闲内存列表、合并或拆分内存块,而在最坏的情况下,甚至需要向操作系统申请更多内存。

在以48 kHz频率运行的实时音频处理循环中,一次缓慢的内存分配操作就可能导致音频出现异常。因此,建议在程序初始化时提前分配所有所需缓冲区,并重复使用这些缓冲区。

API快速参考

┌─────────────────────────────────────────────────────────────────┐
│                    QuRT API快速参考                     │
├─────────────────┬───────────────────────────────────────────────┤
│ 线程             │                                               │
│  创建            │ qurt_thread_create(&id, &attr, func, arg)     │
│  结束            │ qurt_thread_exit(status)                      │
│ 进入同步线程    │ qurt_thread_join(id, &status)                 │
│ 获取线程ID       │ qurt_thread_get_id()                          │
│ 等待指定时间      │ qurt_timer_sleep(usec)                        │
├─────────────────┼───────────────────────────────────────────────┤
│ 互斥锁           │                                               │
│  初始化          │ qurt_mutex_init(&mutex)                       │
● 锁定            │ qurt_mutex_lock(&mutex)                       │
● 尝试锁定        │ qurt_mutex_try_lock(&mutex)                   │
● 解锁            │ qurt_mutex_unlock(&mutex)                     │
● 删除互斥锁       │ qurt_mutex_destroy(&mutex)                    │
├─────────────────┼───────────────────────────────────────────────┤
│ 信号             │                                               │
│  初始化          │ qurt_signal_init(&signal)                     │
● 等待信号        │ qurt_signal_wait(&sig, mask, attr)            │
● 设置信号状态      │ qurt_signal_set(&signal, mask)                │
● 清除信号状态     │ qurt_signal_clear(&signal, mask)              │
● 删除信号         │ qurt_signal_destroy(&signal)                  │
├─────────────────┼───────────────────────────────────────────────┤
│ 定时器           │                                               │
│  创建            │ qurt_timer_create(&timer, &attr)              │
│ 删除            │ qurt_timer_delete(timer)                      │
● 等待指定时间      │ qurt_timer_sleep(usec)                        │
● 获取系统计时器计数 | qurt_sysclock_get_hw_ticks()                  │
├─────────────────┼───────────────────────────────────────────────┤
│ 内存操作          │                                               │
● 清空缓存         │ qurt_mem_cache_clean(addr, sz, FLUSH)         │
● 使缓存失效       │ qurt_mem_cache-clean addr, sz, INVALIDATE)    │
● 锁定L2缓存        │ qurt_mem_l2cache_lock(addr, size)             │
● 解锁L2缓存      │ qurt_mem_l2cache_unlock(addr, size)           │
├─────────────────┼───────────────────────────────────────────────┤
│ 信号量           │                                               │
│  初始化          │ qurt_sem_init_val(&sem, count)                │
● 减少信号量值     │ qurt-sem_down(&sem)                           │
● 增加信号量值     │ qurt_sem_up(&sem)                             │
● 删除信号量        │ qurt_sem_destroy(&sem)                        │
├─────────────────┼───────────────────────────────────────────────┤
│ 障碍物           │                                               │
│  初始化          │ qurt_barrier_init(&barrier, count)            │
● 等待障碍物       │ qurtBarrier_wait(&barrier)                   │
● 删除障碍物        │ qurt_barrier_destroy(&barrier)                │
└─────────────────┴───────────────────────────────────────────────┘

此表格按类别列出了最常用的QuRT API函数。左列标明了相应的操作类型,右列则显示了这些函数的函数签名。

  • 线程相关操作包括创建、终止、加入线程以及让线程进入睡眠状态。

  • 互斥锁相关操作提供了锁定、尝试锁定以及解锁功能。

  • 信号相关操作支持基于位掩码的通知机制,用于实现等待、设置和清除信号等功能。定时器相关操作则包括创建、删除定时器、让定时器进入睡眠状态,以及读取硬件计时器的数值。

  • 内存相关操作涉及缓存清空与失效处理(这对于跨处理器之间的数据交换尤为重要),同时也包括对那些对性能要求极高的数据的L2缓存进行锁定操作。

  • 信号量与屏障机制进一步完善了同步原语的功能。

后续步骤

本手册介绍了QuRT编程的基础知识,包括线程管理、同步机制、内存操作、定时器使用、中断处理、管道通信、FastRPC技术以及多传感器数据融合流程。接下来,您可以通过一系列循序渐进的学习步骤来深入掌握这些内容。

首先,请下载Hexagon SDK,并在模拟器上运行其中包含的示例项目。位于$HEXAGON_SDK_ROOT/examples/目录下的示例代码通过FastRPC展示了真实的ARM-DSP通信流程,这些示例是了解完整、可运行的项目功能的最佳途径。

请阅读位于$HEXAGON_SDK_ROOT/docs/目录中的QuRT用户指南。该指南详细介绍了本文中提到的所有API函数,同时还涵盖了许多其他未在文中提及的功能(例如QuRT的TLB管理机制和电源管理接口)。

尝试使用Hexagon提供的向量扩展技术HVX进行编程练习。HVX正是Hexagon DSP发挥其高性能的关键所在,学习如何编写向量化DSP代码将是提升系统性能的重要手段。

最后,请准备一块开发板(例如Qualcomm RB5),并在真实的硬件平台上运行您的代码。虽然模拟器可以验证代码的正确性,但只有在实际硬件环境下才能真正了解代码的执行时间、缓存效果,以及它与其他在DSP上运行的软件之间的交互情况。

Hexagon SDK的相关文档位于\(HEXAGON_SDK_ROOT/docs/目录下。QuRT API的参考资料则位于\)HEXAGON_SDK_ROOT/docs/qurt/目录中。Qualcomm开发者网站developer.qualcomm.com提供了更多的资源、论坛和应用说明。而Hexagon DSP架构参考手册则是了解该硬件平台的权威指南。

QuRT是一套功能强大的工具。它不会手把手地指导您使用,但它能让您以微秒级精度控制世界上最先进的DSP架构之一的实时处理能力。虽然学习曲线较为陡峭,但一旦掌握了这些知识,你就会明白为什么会有数十亿设备愿意将它们最关键的任务交给这个小巧的操作系统来处理。

Comments are closed.