上篇回顾: ArkTS开发系列之Web组件的学习(2.9)
本篇内容:ArtTS语言基础类库-异步同步并发内容的学习(2.10.1)
一、知识储备
1. 异常并发
Promise和Async/await提供异步并发能力,是标准的JS异步语法
- Promise是一种用于处理异常操作的对象, 可以将异步操作转换为类似于同步操作的风格,以方便代码编写和维护。
- 分别有pending(进行中)、fulfilled(已完成)和rejected(已拒绝)。
- 下面是个例子
promiseFunction() {
const promise = new Promise((resolve, reject) => {
setTimeout(() => {//通过 setTimeout来模拟一个异步操作 此时处于进行中状态(pending)
const randomNumber = Math.random();
if (randomNumber > 0.5) {//随机数大于0.5, 则正常返回,否则返回异常信息
resolve(randomNumber)
} else {
reject(new Error('Random number is too small'))
}
}, 1000)
})
promise.then(result => {// fulfilled 成功完成回调状态
this.tips = "异步结果:" + result;
promptAction.showToast({ message: this.tips });
}).catch(err => {// rejected 已被拒绝的回调状态
this.tips = '异步结果:' + err.message
promptAction.showToast({ message: this.tips });
})
}
async/await是一种用于处理异步操作的Promise语法糖, 通过使用async关键字来声明一个函数为异步函数, 并使用await关键字等promise的成功或拒绝回调。以同步的方式来编写异步代码,使代码更为简洁、易读。
async asyncPromiseFunction() {
const result = await new Promise((resolve, reject) => {
setTimeout(() => {
resolve('success')
}, 1000)
})
console.log('结果:' + result)
this.tips = "异步结果:" + result;
promptAction.showToast({ message: this.tips });
}
2.1 多线程并发
常见的并发模型分为基于内存共享并发模型和基于消息通信并发模型两种。
Actor并发模型作为基于消息通信并发模型的典型代表,不需要开发者去面对锁带来的一系列复杂偶发的问题,同时并发度也相对较高。
由于Actor模型的内存隔离特性,所以需要进行跨线程的数据序列化传输。
- 数据传输对象分为普通对象、可转移对象、可共享对象、Native绑定对象四种
- 普通对象采用结构化克隆算法(Structured Clone)进行序列化传输,此算法可以通过递归的方式拷贝传输对象,相较所支持的对象类型更加丰富。
- 可传输对象包括除Symbol之外的基础类型、Date、String、RegExp、Array、Map、Set、ArrayBuffer、TypedArray、Object(仅限简单对象,比如通过"{}"或“new object”创建。仅支持传递属性、不支持传递其原型及方法)
- 可转移对象采用地址转移进行序列化传输,不需要内容拷贝,会将ArrayBuffer的所有权转移给接收该ArrayBuffer的线程,转移后该ArrayBuffer在发送它的线程中变为不可用, 不允许再访问。
- 定义可转移对象
let buffer = new ArrayBuffer(100);
- 可共享对象指支持在多线程之间传递SharedArrayBuffer对象。传递之后的SharedArrayBuffer对象和原始的可以指向同一块内存,进而达到内存共享的目的。
- SharedArrayBuffer对象存储的数据在同时被修改时,需要通过原子操作保证其同步性,即下个操作开始之前,务必需要等到上个操作已经结束。
- 定义可共享对象
let sharedBuffer = new SharedArrayBuffer(1024);
- Native绑定对象(Native Binding Object),是系统所提供的对象,该对象与底层系统功能进行绑定,提供直接访问底层系统功能的能力。
- 当前支持序列化传输的Native绑定对象主要包含:Context和RemoteObject.
- Context对象包含应用程序组件的上下文信息,并提供了一种访问系统服务和资源的方式,方便开发者开发的应用与系统进行交互。
- RemoteObject对象的主要作用是实现远程通信的功能,它允许在不同的进程间传递对象的引用 。使得不同进程之间可以共享对象的状态和方法,服务提供者必须继承此类。
2.2 TaskPool(任务池)和Worker
实现特点及使用场景直观对比
实现 | TaskPool | Worker | 差异点 |
---|---|---|---|
内存模型 | 线程间隔离,内存不共享 | 线程间隔离,内存不共享 | 相同 |
参数传递机制 | 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。 | 采用标准的结构化克隆算法(Structured Clone)进行序列化、反序列化,完成参数传递。支持ArrayBuffer转移和SharedArrayBuffer共享。 | 相同 |
参数传递 | 直接传递,无需封闭,默认进行transfer。 | 消息对象唯一参数,需要自己封装。 | 是否需要封装不同 |
返回值 | 异步调用后默认返回 | 主动发送消息,需要在onMessage解析赋值 | 接收返回值方式不同 |
生命周期 | TaskPool自动管理生命周期,无需关心任务负载高低问题 | 开发者自动管理Worker的数量及生命周期 | 是否需要开发者管理不同 |
任务池个数上限 | 自动管理,无需配置 | 同个进程下,最多支持同时开启8个Worker线程 | 开启线程个数不同 |
任务执行时长上限 | 无限制 | 无限制 | 相同 |
设备任务的优先级 | 不支持 | 不支持 | 相同 |
执行任务的取消 | 支持取消任务队列中等待的任务 | 不支持 | Worker不支持取消任务 |
适用场景 | 需要频繁取消的任务,例如图库大图浏览场景,为提升体验,会同时缓存当前图片左右侧各2张图片,适合使用TaskPool;大师或者调度点分散的任务,例如大型应用的多个模块包含多个耗时任务,不方便使用8个Worker去做负载管理。推荐使用TaskPool | 有关联的一系列同步任务,例如每次创建、使用不同句柄,且句柄需要永久保存,保证使用该句柄操作,需要使用Worker。 |
- TaskPool 注意事项
- 实现任务的函数需要使用装饰器@Concurrent标注,且仅支持在.ets文件中使用
- 实现任务的函数入参,需满足序列化支持的类型,
- 由于线程中上下文对象是不同的,因此TaskPool工作线程中只能使用线程安全的库。不能做ui相关操作,
- 序列化传输的数据量大小限制为16MB
- Worker 注意事项
- 创建Worker时,传入的Worker.ts路径在不同版本有不同的规则
- Worker创建后需要手动管理生命周期,且同进程中最多同时运行的Worker子线程数量为8个
- Ability类型的Modul支持使用Worker,Library类型的Module不支持使用Worker
- 创建Worker不支持使用其他Module的Wroker.ts文件,即不支持跨模块调用Worker
- 由于不同线程中上下文对象是不同的,因此Worker线程只能使用线程安全的库,不能做ui相关操作。
- 序列化传输的数据量大小限制为16MB
- 文件路径注意事项
- 创建Worker时,需要传入Worker的路径,Worker文件存放位置默认路径为Worker文件所在目录与pages目录同级
//导入模块 import worker from '@ohos.worker'; //APi8及之前 const worker = new worker.Worker(scriptUrl); //APi 9及之后 const worker = new worker.ThreadWorker(scriptUrl); //写法一 const worker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts', {name: "name"}); //写法二 const worker = new worker.ThreadWorker('@bundle:com.aji.study/entry/ets/workers/worker')
- 创建Worker时,需要传入Worker的路径,Worker文件存放位置默认路径为Worker文件所在目录与pages目录同级
2.3 @Concurrent装饰器
使用TaskPool时,执行的头发函数需要使用该装饰器修饰,否则无法通过相关校验。
注意
- 仅支持在Stage模型且module的compileMode为esmodule的project中使用taskpool api。确认module的compileMode方法:查看当前module的build-profile.json5,在buildOption中补充"compileMode": “esmodule”。
@Concurrent function add(numa: number, numb: number): number { //声明一个加法函数 return numa + numb; } async function calcTotal(): Promise<void> { try { let task: taskpool.Task = new taskpool.Task(add, 1, 2) //创建一个任务 let count = await taskpool.execute(task); console.error("taskpool result is: " + count) //任务池执行任务 } catch (err) { console.error("taskpool err is " + err.message); } }
二、效果一览
三、源码剖析
import promptAction from '@ohos.promptAction';
import fs from '@ohos.file.fs';
import common from '@ohos.app.ability.common';
import taskpool from '@ohos.taskpool';
import Handle from './Handle';
import worker from '@ohos.worker';
function promiseFunction() {
const promise = new Promise((resolve, reject) => {
setTimeout(() => { //通过 setTimeout来模拟一个异步操作 此时处于进行中状态(pending)
const randomNumber = Math.random();
if (randomNumber > 0.5) { //随机数大于0.5, 则正常返回,否则返回异常信息
resolve(randomNumber)
} else {
reject(new Error('Random number is too small'))
}
}, 1000)
})
promise.then(result => { // fulfilled 成功完成回调状态
let tips = "异步结果:" + result;
promptAction.showToast({ message: tips });
}).catch(err => { // rejected 已被拒绝的回调状态
let tips = '异步结果:' + err.message
promptAction.showToast({ message: tips });
})
}
async function asyncPromiseFunction() {
const result = await new Promise((resolve, reject) => {
setTimeout(() => {
resolve('success')
}, 1000)
})
console.log('结果:' + result)
let tips = "异步结果:" + result;
promptAction.showToast({ message: tips });
}
async function write(data: string, file: fs.File): Promise<void> { //定义的I/O任务逻辑
fs.write(file.fd, data).then((writeLen: number) => {
console.info('write data length is : ' + writeLen); //写入的文本长度
}).catch(err => {
console.error(`failed code: ${err.code}, msg: ${err.message}`) //异常捕获 并输出日志
})
}
async function writeFunction(): Promise<void> {
let context = getContext() as common.UIAbilityContext;
let filePath: string = context.filesDir + "/test.txt";
let file: fs.File = await fs.open(filePath, fs.OpenMode.READ_WRITE | fs.OpenMode.CREATE)
write('我要学鸿蒙', file).then(() => {
console.info("write success")
fs.close(file);
}).catch(err => {
console.error(`write failed code: ${err.code}, msg: ${err.message}`) //异常捕获 并输出日志
fs.close(file);
})
}
@Concurrent
function add(numa: number, numb: number): number { //声明一个加法函数
return numa + numb;
}
async function calcTotal(): Promise<void> {
try {
let task: taskpool.Task = new taskpool.Task(add, 1, 2) //创建一个任务
let count = await taskpool.execute(task);
console.error("taskpool result is: " + count) //任务池执行任务
} catch (err) {
console.error("taskpool err is " + err.message);
}
}
@Concurrent //步骤1 定义并发函数,内部调用同步方法
function age(num: number): boolean {
Handle.syncSet(num);
return true;
}
async function asyncGet() { //步骤2 创建任务并执行
let task = new taskpool.Task(age, 3);
let result = await taskpool.execute(task);
console.error(`age is : ${result}`)
}
@Entry
@Component
struct Index { //异步和并发
@State tips: string = '异步结果'
build() {
Column() {
Button('Promise异步')
.onClick(() => {
promiseFunction();
})
.margin(20)
Button('async异步')
.onClick(() => {
asyncPromiseFunction();
})
.margin(20)
Button('I/O')
.onClick(() => {
writeFunction();
})
.margin(20)
Button('年龄')
.onClick(() => {
calcTotal();
asyncGet(); //执行并发操作
})
.margin(20)
Button('worker')
.onClick(() => {
// let w = new worker.ThreadWorker('@bundle:com.aji.first/entry/ets/workers/MyWorker');
let w = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
w.onmessage = function (msg) {
console.error('msg: ' + JSON.stringify(msg))
}
w.onerror = function (err) {
console.error("msg err:" + JSON.stringify(err))
}
w.postMessage({ 'type': 0, 'age': 19 })
w.postMessage({ 'type': 1 })
setTimeout(() => {
w.terminate();
console.error('msg destroy')
}, 1000)
})
}.width('100%')
.height('100%')
}
}
import worker, { ThreadWorkerGlobalScope } from '@ohos.worker';
import Handle from '../pages/base/Handle';
var workerPort: ThreadWorkerGlobalScope = worker.workerPort;
var handler = new Handle();
workerPort.onmessage = function (msg) {
console.error('msg workerPort: ' + JSON.stringify(msg))
switch (msg.data.type) {
case 0:
Handle.syncSet(msg.data.age);
workerPort.postMessage('success set')
break;
case 1:
Handle.syncGet();
workerPort.postMessage('success get')
break;
}
}
workerPort.onerror = function onErr(err) {
}
export default class Handle {
static age: number;
static instance: Handle = new Handle();
constructor() {
}
static getInstance() {
return this.instance;
}
static syncGet(): number {
return this.age;
}
static syncSet(num: number) {
this.age = num;
return;
}
}