Qt开发之Concurrent
QFuture
用于表示异步计算的结果,该类中所有函数均为线程安全,除const_iterator
QFutureWatcher
就是门铃,装个「门铃」,小工干完按铃通知
QFutureWatcher 能监听到啥?除了「干完了(finished)」,还能监听「有中间结果了(resultReadyAt)」「进度变了(progressValueChanged)」
QThread
一般只有需要长期独占线程/精细控制线程属性(优先级,栈大小)等情况才会使用QThread。
QThreadPool
QConcurrent
写在前面:该类的map和filter函数虽然支持几种stl兼容的容器和迭代器类型。但更适用于具有随机访问迭代器的Qt容器如QList。
更推荐使用随机存储迭代器,一是速度更快,二是可以让concurrent通过QFuture::ProgressValue和QFutureWatcher::progressVAlueChanged()提供进度信息。
非就地修改函数如maped,filtered,会在调用时复制容器,如果是stl容器,复制操作需要一些时间,建议指定容器的开始和结束迭代器。
QConcurrent基于Qt的信号槽和线程池,无需手动管理线程。轻松实现并行计算,异步任务,多线程数据处理。使用QConcurrent的程序可以自动根据可用处理器内核数量自动调整使用线程数,这意味着程序未来部署到多核系统仍可继续扩展
- 无需手动管理:底层封装了线程池,自行复用线程,控制并发数,避免线程创建销毁的开销
- 自动异常安全:底层处理线程崩溃,资源泄露等问题
- 支持取消/暂停并发任务:通过QFuture/QfutureWatcher控制并发任务的生命周期,后者需要手动释放
| 组件 / 概念 | 作用 |
|---|---|
QtConcurrent::run |
最基础接口:将单个函数 / 方法放到线程池异步执行 |
QtConcurrent::map |
并行遍历容器(如 QList/QVector),对每个元素执行相同操作即对容器中的每个项目应用一个函数,对项目就地修改 |
QtConcurrent::maped |
类似map,但返回的是一个包含修改内容的新容器。 |
QtConcurrent::mappedReduced |
类似maped,只不过被修改后的结果被缩小或折叠成一个结果。并行处理容器元素,再合并结果(reduce)(如并行求和 / 统计) |
QtConcurrent::filter |
并行过滤容器,保留符合条件的元素 |
QtConcurrent::filtered |
类似filter,但返回的是一个包含过滤结果的新容器。 |
QtConcurrent::filteredReduced |
类似filtered,但修改后的结果被折叠或缩小成一个结果 |
QtConcurrent::task |
创建QConcurrent::QTaskBuilder的实例,该对象可以用来调整参数和在单独的线程中启动任务。 |
QFuture |
表示 “未来的结果”:获取异步任务的状态(运行 / 完成 / 取消)、返回值 |
QFutureIterator |
可以遍历QFuture获得的结果 |
QFutureWatcher |
监听 QFuture 的状态变化(如任务完成、进度更新),通过信号槽通知主线程 |
QFutureSynchronizer |
自动同步多个QFutrue |
QPromise |
提供一种向QFuture报告异步计算的进度和结果的方法。当QFuture请求时,允许暂停或取消任务。 |
QThreadPool |
底层线程池:默认全局线程池(QThreadPool::globalInstance()),可自定义线程数 |
并行Map
1 | #include<QtConcurrentMap> |
QtConcurrent::map()、QtConcurrent::mapped() 和 QtConcurrent::mappedReduced() 函数在一个序列(如QList )中并行运行计算。QtConcurrent::map() 就地修改一个序列,QtConcurrent::mapped() 返回一个包含修改内容的新序列,QtConcurrent::mappedReduced() 返回一个结果。
这些函数都有阻塞变体,他们返回最终结果(本例中为QList
1 | QList<QImage> images = ...; |
并发映射
mapped接受一个输入序列和一个map函数,然后序列中每个项都会运行这个映射函数并返回一个包含映射函数返回值的新序列,结果可以用QFuture获取。映射函数形式:U function(const T &t);
在映射函数形式上,和QConcurrent::map的区别在于,一是有const,二是U这个返回值和返回值类型会被用到。
1 | QImage scaled(const QImage &image){return image.scaled(100, 100);} |
由于序列是就地修改的,QtConcurrent::map() 不会通过QFuture 返回任何结果。不过,仍然可以使用QFuture 和QFutureWatcher 来监控映射的状态。
并发映射和延续
QtConcurrent::mapped() 调用的结果是一个包含多个结果的QFuture 。当将.then() continuation 附加到这样的QFuture 时,请确保使用将QFuture 作为参数的 continuation,否则只会处理第一个结果:
1 | auto process= [](int val) {return val* 2; }; |
在本例中,badFuture 只打印一个结果 .then(告诉QFuture只取QFuture结果篮子里面的一个结果,QFuture 一看只要一个,就只把第一个结果(2)给你,剩下的都扔了
而goodFuture 则打印所有结果。.then() 里写的是 (QFuture<int> f),意思是「把装所有结果的篮子给我」用 f.results() 把里面所有结果都取出来,再用 for 循环挨个打印
重点在 .then(下一步要干的活)
- 如果下一步要干的活只要「一个数」(
(int val)),就只拿篮子里第一个; - 如果下一步要干的活要「整个篮子」(
(QFuture<int> f)),就能把所有结果都拿出来用。
并发Map-Reduce
QtConcurrent::mappedReduced() 类似于 QtConcurrent::mapped(),但不是返回一个包含新结果的序列,而是使用 reduce 函数将结果合并为一个值。
reduce 函数的形式必须是V function(T &result, const U &intermediate)T 是最终结果的类型,U 是映射函数的返回类型。注意,不使用 reduce 函数的返回值和返回类型。
- Map 阶段:让 5 个家人各自算自己的月薪(每人算自己的,同时干)多线程异步;
- Reduce 阶段:您把 5 个人的月薪挨个加起来,得到总工资(只有您一个人加,不会算乱)所以是线程安全的。不用锁,因为 Qt 保证只有一个线程在合并;
1 | Map函数,计算自己的月薪 |
既然reduce是合并,那么肯定有一个先后顺序。默认是谁快谁先拼
- UnorderedReduce(默认):哪个孙子切好苹果块,就先拼哪个 —— 比如先拼老三的,再拼老大的,顺序乱,但速度快;
- OrderedReduce:必须按原始图片的顺序拼 —— 先拼第 1 张图的缩略图,再拼第 2 张,哪怕第 2 张的缩略图后做好,也要等,顺序不乱,但速度稍慢。
序列变体,迭代器范围变体,阻塞变体
上述函数均为序列变体,他们每个都还有一个用迭代器范围的变体使用方法同序列变体
1 | QList<QImage> images = ...; |
也有一个阻塞变体,它返回最终结果而不是QFuture
1 | QList<QImage> images = ...; |
使用成员函数
QtConcurrent::map()、QtConcurrent::mapped()和 QtConcurrent::mappedReduced() 接受指向成员函数的指针。成员函数类的类型必须与存储在序列中的类型相匹配:
没什么特别的,就是上述函数的参数二允许是参数一的类型里面的成员函数指针。
1 | // 在 QStringList 中挤压所有字符串。 |
注意qOverload<指定所需参数的重载版本> 的使用。需要使用它来解决有多个重载的方法的歧义问题。因为虽然指定了使用int的qset的成员函数,但是即便如此她也是有很多版本的
insert(const int &value)→ 放一个整数进去(要的这款);insert(Iterator pos, const int &value)→ 往指定位置放一个整数;insert(const QSet<int> &other)→ 把另一个整数集合全放进来;
请注意,在使用 QtConcurrent::mappedReduced() 时,可以自由混合使用普通函数和成员函数:
1 | // 1. 普通函数(公共工具):计算一张图片的颜色分布值(返回一个整数) |
使用函数对象
QtConcurrent::map()、QtConcurrent::mapped()和QtConcurrent::mappedReduced()接受map函数的函数对象。这些函数对象可用于为函数调用添加状态
普通函数(没状态):只能数个数,记不住其他
1 | // 普通函数:只能返回苹果数,记不住是第几盘 |
函数对象(带状态):既能数个数,还能记状态
1 | // 函数对象 |
- 用普通函数时,每次调用都是「独立的」,没法记「之前干了啥」「累计了啥」;
- 用函数对象时,函数对象里的成员变量(比如上面的
total/plateIndex)就是「状态」,每次调用函数对象干活时,都能更新 / 读取这些状态; - QtConcurrent 会把这个函数对象传给每个线程,让每个线程的「干活逻辑」都能访问 / 修改状态(注意:如果多线程改同一个状态,要加锁,QtConcurrent::mappedReduced 的 Reduce 阶段不用锁,因为只有一个线程改)。
使用 Lambda 表达式
QtConcurrent::map()、QtConcurrent::mapped()和 QtConcurrent::mappedReduced() 接受 map 和 reduce 函数的 lambda 表达式
当使用 QtConcurrent::mappedReduced() 或 QtConcurrent::blockingMappedReduced() 时,您可以自由混合使用普通函数、成员函数和 lambda 表达式。
无需多言
封装包含多个参数的函数
如果要使用一个包含多个参数的 map 函数,可以使用 lambda 函数或std::bind() 将其转换为一个包含一个参数的函数。
例如,我们将使用 QImage::scaledToWidth():
1 | QImage QImage::scaledToWidth(int width, Qt::TransformationMode) const; |
scaledToWidth 需要三个参数(包括 “this “指针),并且不能直接与 QtConcurrent::mapped() 一起使用,因为 QtConcurrent::mapped() 期望使用一个参数的函数。要在 QtConcurrent::mapped() 中使用 QImage::scaledToWidth(),我们必须提供宽度值和转换模式:
1 | QList<QImage> images = ...; |
并行Filter
1 | #include <QtConcurrentFilter> |
QtConcurrent::filter()、QtConcurrent::filtered() 和 QtConcurrent::filedReduced() 函数并行过滤一个序列中的项目,如QList 。QtConcurrent::filter() 就地修改序列,QtConcurrent::filtered() 返回包含过滤内容的新序列,QtConcurrent::filteredReduced() 返回单一结果。
上述每个函数都有一个阻塞变体,它返回最终结果而不是QFuture 。使用它们的方法与异步变体相同。
并行Filter-Reduce
整体都类似map与map-reduce,不多赘述
并发运行
1 | #include <QtConcurrentRun> |
主要介绍QConcurrent::run(),该函数主要是分配一个线程去执行一个函数,返回值由QFutureApi提供
有两种运行模式,下面分开介绍。区别就是
- 普通模式(基本模式):小工干完活,只跑过来跟您说一句「活儿干完了,结果是 XXX」;
- 带承诺模式(QPromise):小工干活时,能随时跟您说「我干到 50% 了」「我算出一个中间结果 XXX」,您还能喊「先停一下」「别干了」。
基本模式
1 | extern void aFunction(); |
这将在从默认QThreadPool 获取的独立线程中运行aFunction 。您可以使用QFuture 和QFutureWatcher 类监控函数的状态。
要使用专用线程池,可以将QThreadPool 作为第一个参数:
1 | extern void aFunction(); |
向函数传递参数的方法是将参数添加到 QtConcurrent::run() 调用中,紧跟在函数名称之后。例如
1 | extern void aFunctionWithArguments(int arg1, double arg2, const QString &string); |
在调用 QtConcurrent::run() 时,会复制每个参数,这些值会在线程开始执行函数时传递给线程。调用 QtConcurrent::run() 后对参数所做的更改线程是***看不到***的。
请注意,QtConcurrent::run 不支持直接调用重载函数。例如,下面的代码将无法编译:
1 | void foo(int arg); |
最简单的解决方法
通过 lambda 调用重载函数:
QFuture<void> future = QtConcurrent::run([] { foo(42); });static_cast告诉编译器选择哪个重载函数:QFuture<void> future = QtConcurrent::run(static_cast<void(*)(int)>(foo), 42);qOverload :
QFuture<void> future = QtConcurrent::run(qOverload<int>(foo), 42);
函数的任何返回值都可以通过QFuture 获取:
1 | extern QString functionReturningAString(); |
如果不需要结果(例如,因为函数返回void ),使用QThreadPool::start() 重载获取函数对象会更有效。什么意思呢?
- QtConcurrent::run () = 雇小工干活,还要拿结果条不管小工干的活有没有返回值,
QtConcurrent::run()都会给您一个「QFuture 结果条」—— 哪怕结果条是空的(函数返回 void),它也会造一个,只是里面没东西。多了造QFuture、监控更新状态、函数返回值写入future步骤。 - QThreadPool::start () = 雇小工干活,不要结果条如果小工干的活「不用返结果」(比如只是擦桌子、倒垃圾,干完就行,不用告诉「擦完了」),用这个方式更高效 —— 因为不用造「结果条」,少了折腾,省内存、省时间。
1 | // 方式1:用QtConcurrent::run()(没必要,因为擦桌子不用返结果) |
请注意,QFuture::result() 函数会阻塞并等待结果可用。当函数执行完毕、结果可用时,请使用QFutureWatcher 获取通知。—-「result()是『站在门口等小工干完』(会堵着不动),QFutureWatcher是『装个「门铃」,小工干完按铃通知』(不耽误你干别的)」
调用方式
核心差异,为了线程安全,所以是默认拷贝传参,就有下面的区别,不是引用没用,而是被拷贝了一份。
为了在新线程执行函数,会偷偷把参数拷贝一份,即便参数是引用,修改的也是拷贝副本的引用。
必须要说明的是成员函数和普通函数传递引用的区别。一个是针对对象,一个是变量,前者除了对象还要有变量,变量部分同后者,对象部分做区分。
成员函数传引用实际上是对象的指针,是地址。普通函数传引用是变量,需要ref包装否则就拷贝了。
- 调用成员函数(类的专属工具)bat.getCount();
- 常量成员函数(不修改对象,比如 QByteArray::split):传「对象本身」→ 例子:
run(&QByteArray::split, bytearray, ' '),干完不改动原 bytearray; - 非常量成员函数(修改对象,比如 QImage::invertPixels):传「对象引用(&image)」→ 例子:
run(&QImage::invertPixels, &image, mode),干完原 image 的像素会被改。当然如果mode这个变量在函数定义中是引用的话即需要修改这个变量,同下面第三点一样,还是需要stdRef包装
- 调用 Lambda 函数
直接把要干的活写在[]() { ... }里,丢给run(),代码块会在单独线程跑:→ run([=]() { 要干的活 })
- 调用修改「引用参数」的函数(比如 addOne (int &n))
必须用std::ref(n)传参,不然改的是副本,原变量不变:→ run(&addOne, std::ref(n)),干完 n 从 42 变 43。std::ref(n)不是普通的引用,它是一个「引用包装器」—— 可以把它理解成「给 n 套了个 “引用马甲”」,让QConcurrent::run识别:「这个参数别拷贝了,直接传原变量的引用过去」。
- 调用可调用对象(像函数的类,比如 TestClass)
- 改原对象:传
std::ref(o)→run(std::ref(o), 15),o.s 变 15; - 改副本:直接传 o →
run(o, 42),原 o.s 不变; - 临时对象:传
TestClass()→ 用完就扔; - 别传
&o:编译报错(不支持对象指针)。
核心就这 4 点,记住第三点「改原对象传指针 /std::ref,不改传本身」就行
带Promise模式
在Run With Promise(带承诺运行)模式下,传递给 QtConcurrent::run() 的函数第一个参数必须是QPromise<T> & 类型的附加参数,其中T 是计算结果的类型(它应该与 QtConcurrent::run() 返回的QFutureT 相匹配,且该函数返回值必须为void例如:
1 | extern void aFunction(QPromise<int> &promise);下面数苹果的例子用了std::move |
暂停、取消方法
如果需要,QPromise API 还可以暂停和取消计算
- future.suspend();
调用此方法后,运行任务将在其迭代循环中下一次调用promise.suspendIfRequested() 后暂停。
在这种情况下,运行中的任务将阻塞对promise.suspendIfRequested() 的调用。
- future.resume();
被阻塞的调用将在调用future.resume() 后解除阻塞。
请注意,suspendIfRequested() 内部使用等待条件来解除阻塞
因此运行中的线程会进入空闲状态,而不是在阻塞时浪费资源
以便定期检查恢复请求是否来自调用者的线程。
- future.cancel();
调用导致下一次对promise.isCanceled() 的调用将返回true
而aFunction 将立即返回,不再报告任何结果。
1 | // 小工的干活逻辑(迭代循环数苹果) |
注意: 无需调用QPromise::start() 和QPromise::finish() 来指示计算的开始和结束(通常使用QPromise 时会这样做)。QtConcurrent::run() 总是会在开始和结束执行之前调用它们。
进度报告
也可以独立于结果报告来报告任务的进度,例如
1 | void aFunction(QPromise<int> &promise) { |
调用者为 QtConcurrent::run() 返回的QFuture 安装QFutureWatcher ,以便连接到progressValueChanged() 信号,并相应地更新图形用户界面等。
默认情况下,QtConcurrent::run() 在*”带承诺运行*”模式下不支持带有重载操作符()()的函数。在重载函数的情况下,用户需要明确指定结果类型作为模板参数传递给 QtConcurrent::run(),例如
1 | struct Functor { |
写一个「小工数 100 个苹果,每数 10 个报一次结果,还能听指挥停工」的例子:
1 | // 第一步:定义干活的函数(小工的活),参数里要加QPromise<int>(对讲机) |
QPromise<int> promise:您和小工的「对讲机」,<int>表示小工报的结果是整数;promise.start():小工打开对讲机,准备说话;promise.addResult(i):小工用对讲机喊「我数到 i 个了」(报多个结果);promise.setProgressValue(i):小工喊「我干到 i% 了」(报进度);promise.isCanceled():小工查对讲机,看您是不是喊「停工」;future.cancel():您用结果条喊「小工别干了」;QFutureWatcher:您的「消息接收器」,专门听小工的结果和进度,不用自己一直盯着
并发任务
1 | #include <QtConcurrentTask> |
QtConcurrent::task 提供了在独立线程中运行任务的替代接口。函数的返回值通过QFuture API 提供。
如果您只想在独立线程中运行一个函数,而不想调整任何参数,请使用QtConcurrent::run ,因为这样可以减少代码的编写。QtConcurrent::task 专为需要执行额外配置步骤的情况而设计。



