专用工作者线程
可以在网页中创建 专用工作者线程 用来执行在 浏览器主线程 之外的其他任务。
我们可以在 专用工作者线程 中与 父页面 交换消息、发送网络请求、执行文件输入/输出、进行密集计算、处理大量数据。
专用工作者线程的基本概念
创建专用工作者线程
创建 专用工作者线程 最常见的方式是 加载 Javascript 文件。把 文件路径 提供给 Worker 构造函数。
然后 构造函数 再在后台异步加载脚本并且实例化 工作者线程。
// 空的 JS 工作者线程文件
const worker = new Worker('./emptyWorker.js');
console.log(worker) {}
//可能还没有初始化完成
工作者线程安全限制
工作者线程的 Javascript 文件 只能从父页面相同的源加载。 从其他源加载工作者线程的脚本文件会导致错误。
const worker = new Worker(new URL("./workers/test.worker", import.meta.url));
// 尝试基于 https://www.baidu.com 创建工作者线程
const worker2 = new Worker(new URL("https://www.baidu.com/workers/test.worker", import.meta.url));
// Access to script at 'https://www.baidu.com/workers/test.worker' from origin 'https://local.innodealing.com:8000'
// has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.
使用 Worker 对象
Worker()
构造函数返回的 Worker 对象
, 是与刚创建的 专用工作者线程 通信的连接点。
它可用于 工作者线程 和 父上下文 间传输信息, 以及捕获 专用工作者线程 发出的事件。
Worker 对象
支持下列事件处理程序属性:
-
onerror: 在 工作者线程 中发生 ErrorEvent 类型的错误事件时触发该事件
- 该事件在 工作者线程 中抛出错误的时候触发。
- 该事件也可以通过 worker.addEventListener('error', handler) 的形式处理
-
onmessage: 在 工作者线程 中发生 MessageEvent 类型的消息事件时触发该事件
- 该事件在 工作者线程 中向父上下文发送消息触发。
- 该事件也可以通过 worker.addEventListener('message', handler) 的形式处理
- 该事件在 工作者线程 中向父上下文发送消息触发。
-
onmessageerror: 在 工作者线程 中发生 MessageEvent 类型的错误事件时触发该事件
- 该事件在 工作者线程 中收到 无法反序列化的消息时发生
- 该事件也可以通过 worker.addEventListener('messageerror', handler) 的形式处理。
Worker 对象
此外还支持下列方法:
postMessage()
: 用于通过异步消息事件向 工作者线程 发送消息。
terminate()
: 用于立即终止 工作者线程
DedicatedWorkerGlobalScope
在专用工作者线程内部,全局作用域是 DedicatedWorkerGlobalScope
。
因为继承自 WorkerGlobalScope
,所以包含它的所有属性和方法。工作者线程可以通过 self 关键字访问。
console.log('inside worker:', self);
const worker = new Worker('./globalScopeWorker.js');
console.log('created worker:', worker);
// created worker: Worker {}
// inside worker: DedicatedWorkerGlobalScope {}
main.js
和 工作者线程 中的 console
对象 都会写入浏览器控制台。
DedicatedWorkerGlobalScope
在 WorkerGlobalScope
基础上增加了以下方法:
-
postMessage(): 从工作者线程内部向主线程上下文发送消息。
-
close(): 用于立即终止 工作者线程。
-
importScripts(path): 用于向工作者线程中导入任意数量的脚本。
专用工作者线程的生命周期
调用 Worker() 构造函数 是一个专用工作者线程的一个入口。
调用之后 主线程中虽然可以立即使用这个 Worker 对象
,但是与之关联的 工作者线程 可能还没有创建完成。因为存在脚本请求的 网络延迟 和 初始化延迟。
一般来说,专用工作者线程 非正式区分三种状态:
- 初始化(initializing)
- 活动(active)
- 终止(terminated)
这三种状态对其他线程是不可见的, 也无法分别 Worker 对象
当前所处的一个状态。
初始化的时候 虽然 工作者线程 的脚本尚未执行 但是可以先把要发送给 工作者线程 的消息加入队列。
这些消息会等待 工作者线程 的状态变为活动, 在执行消息队列里面的代码。
self.addEventListener('message', ({data}) => console.log(data));
// foo
// bar
// baz
const worker = new Worker('./initializingWorker.js');
// Worker 可能仍处于初始化状态
// 但 postMessage()数据可以正常处理
worker.postMessage('foo');
worker.postMessage('bar');
worker.postMessage('baz');
创建之后 专用工作者线程 就会伴随页面的整个生命周期存在 除非手动调用 close 方法 。
内部终止和外部终止的区别
内部终止
self.postMessage('foo');
self.postMessage("foo");
self.close();
self.postMessage("bar1");
self.postMessage("bar2");
self.postMessage("bar3");
self.postMessage("bar4");
self.postMessage("bar5");
self.postMessage("bar6");
self.postMessage("bar7");
self.postMessage("bar8");
setTimeout(() => self.postMessage("baz"), 0);
const worker = new Worker('./closeWorker.js');
worker.onmessage = ({data}) => console.log(data);
// foo
// bar
// bar1
// bar2
// bar3
// bar4
// bar5
// bar6
// bar7
// bar8
虽然调用了 close() 但显然 工作者线程 的执行并没有立即终止。
close() 在这里会通知 工作者线程 取消事件循环中的所有任务 阻止继续添加新任务。
工作者线程 不需要执行同步停止 因此在 父上下文 的事件循环中处理的其他数据仍会打印出来。
外部终止
self.onmessage = ({data}) => console.log(data);
const worker = new Worker('./terminateWorker.js');
// 给 1000 毫秒让工作者线程初始化
setTimeout(() => {
worker.postMessage('foo');
worker.terminate();
worker.postMessage('bar');
setTimeout(() => worker.postMessage('baz'), 0);
}, 1000);
//foo
父上下文 先给 工作者线程 发送了带foo
的postMessage
这条消息可以在外部终止之前处理。
一旦调用了 terminate() 工作者线程的 消息队列 就会被清理并锁住。它会将关联的工作者线程标记为终止, 它们的执行也会立即停止。
在 JavaScript 行内创建工作者线程
工作者线程 需要基于脚本文件来创建, 但这并不意味着脚本必须是远程资源。
专用工作者线程 也可以通过 Blob 对象 URL 在行内脚本创建。这样可以更快速地初始化工作者线程。因为没有网络延迟。
const workerScript = `
self.onmessage = ({data})=>{
console.log(data)
}
`
const workerScriptBlob = new Blob([workerScript])
const workerScriptBlobUrl = URL.createObjectURL(workerScriptBlob);
const worker = new Worker(workerScriptBlobUrl);
worker.postMessage('blob worker script')
// blob worker script
在这个例子中:
- 通过脚本字符串创建了 Blob。
- 在通过Blob创建了对象 URL。
- 最后把对象 URL 传给了
Worker()
构造函数。
在工作者线程中动态执行脚本
在 工作者线程 中 , 可以使用importScripts()
方法通过 编程方式 加载 执行任意脚本。
const worker = new Worker('./worker.js')
console.log('scriptA executes')
console.log('scriptB executes')
console.log('importing scripts');
importScripts('./scriptA.js');
importScripts('./scriptB.js');
console.log('scripts imported');
// importing scripts
// scriptA executes
// scriptB executes
// scripts imported
importScripts()
方法可以接收任意数量的脚本作为参数, 浏览器下载它们的顺序没有限制。
但是执行则会 严格 按照它们在 参数列表的顺序 进行。
脚本加载受到常规CORS的限制 , 但是 工作者线程 内部请求来自任何源的脚本。
这里的脚本导入策略类似于使用生成的 <script>
。
委托任务到子工作者线程
有时候可能需要在 工作者线程 , 在有多个 CPU 核心的时候 使用多个 子工作者线程 可以实现并行计算。
使用多个 子工作者线程 前要考虑周全。确保并行计算的投入确实能得到收益。毕竟同时运行多个子线程会有很大计算成本。
const worker = new Worker('./worker.js')
console.log('worker')
const worker = new Worker('./subworker.js')
console.log('subworker')
处理工作者线程错误
try/catch
没有捕获到 工作者线程 的运行错误。
try {
const worker = new Worker('./worker.js')
} catch (error) {
console.log('caught error')
}
// no error
throw new Error("foo");
但是相应的错误事件仍然会冒泡到 工作者线程 的全局上下文。可以通过在Worker
对象上监听错误事件访问到。
const worker = new Worker('./worker.js');
worker.onerror = console.log;
// ErrorEvent { isTrusted: true, message: "Uncaught Error: foo" }
throw new Error("foo");
与专用工作者线程通信
与 工作者线程 的通信都是通过异步消息完成的, 但是这些消息可以有多种形式。
使用 postMessage
最简单也是最常用的方式是使用 postMessage
传递消息。
const worker = new Worker('./factorialWorker.js');
worker.onmessage(({data})=>{
console.log(data)
});
worker.postMessage(5);
worker.postMessage(7);
worker.postMessage(10);
// 5!=120
// 7!=5040
// 10!=3628800
function factorial(n: number) {
let result = 1;
while (n) {
result = result * n--;
}
return result;
}
self.addEventListener('message', ({ data }) => {
self.postMessage(`${data}!=${factorial(data as unknown as number)}`);
});
对于传递简单的消息 , 使用 postMessage
的 主线程 和 工作者线程 之间传递消息。与在两个窗口间传递消息非常像。
使用 MessageChannel
无论是 主线程 还是 工作者线程 , 通过postMessage
进行通信的这个过程可以被 MessageChannel
取代。
MessageChannel
有两个端口 , 分别代表两个通信端点。
要让父页面 和 工作线程 通过 MessageChannel
通信, 需要把一个端口传到 工作者线程中。
const messageChannel = new MessageChannel();
const worker = new Worker(new URL('./factorialWorker.ts', import.meta.url));
worker.postMessage(null, [messageChannel.port1]);
messageChannel.port2.postMessage(5);
messageChannel.port2.onmessage = ({ data }) => {
console.log(data);
};
// 5!=120
let messagePort: MessagePort | null = null;
function factorial(n: number) {
let result = 1;
while (n) {
result = result * n--;
}
return result;
}
self.addEventListener('message', e => {
if (!messagePort && e.ports) {
messagePort = e.ports[0] as MessagePort;
messagePort.onmessage = ({ data }) => {
messagePort.postMessage(`${data}!=${factorial(data as unknown as number)}`);
};
}
});
使用 MessageChannel
实例与父页面通信很大程度是多余的。
这是因为全局 postMessage
方法本质与 channel.postMessage()
执行的是相同的操作。
MessageChannel
真正有用的地方是让两个 工作线程 之间直接通信。 可以通过把端口传给另一个 工作者线程 实现。
let messagePort: MessagePort | null = null;
let contextIdentifier: string | null = null;
const sendMessage = (data, port) => {
data.push(contextIdentifier);
port.postMessage(data);
};
self.addEventListener('message', e => {
const { ports, data } = e;
if (!messagePort && ports.length > 0) {
messagePort = ports[0] as MessagePort;
contextIdentifier = data;
messagePort.onmessage = ({ data }) => {
sendMessage(data, self);
};
} else {
sendMessage(data, messagePort);
}
});
const workerA = new Worker(new URL('./worker/test-worker.ts', import.meta.url));
const workerB = new Worker(new URL('./worker/test-worker.ts', import.meta.url));
workerA.postMessage('workerA', [messageChannel.port1]);
workerB.postMessage('workerB', [messageChannel.port2]);
workerA.postMessage(['page']);
workerB.postMessage(['page']);
workerA.onmessage = ({ data }) => {
console.log(data);
};
workerB.onmessage = ({ data }) => {
console.log(data);
};
使用 BroadcastChannel
同源脚本可以通过 BroadcastChannel 相互之间发送消息和接受消息。
这种方式设置方式比较简单, 不需要像 MessageChannel
那样转移 port。
const broadcastChannel = new BroadcastChannel('worker-channel');
new Worker(new URL('./worker.ts', import.meta.url));
broadcastChannel.onmessage = ({ data }) => {
console.log(`header ${data} in worker`);
};
setTimeout(() => {
broadcastChannel.postMessage('foo');
}, 1000);
// header foo in worker
// header bar in worker
const broadcastChannel = new BroadcastChannel('worker-channel');
broadcastChannel.onmessage = ({ data }) => {
console.log(`header ${data} in worker`);
broadcastChannel.postMessage('bar');
};
工作者线程数据传输
使用 工作者线程 经常需要为它们提供某种形式的数据 payload , 工作者线程 是独立的上下文, 因此在上下文之间传输数据就会产生损耗。
在 JavaScript
中, 有三种在上下文间转移信息的方式: 结构化克隆算法 , 可转移对象, 共享数组缓冲区。
结构化克隆算法
结构化克隆算法 可用于两个独立上下文间 共享数据, 通过postMessage()
传递对象时, 浏览器会遍历对象,并在目标上下文中生成它的副本。
结构化克隆算法 支持的类型:
- 除
Symbol
外的所有原始类型。 - Boolean对象
- String对象
- BDate
- RegExp
- Blob
- File
- FileList
- ArrayBuffer
- ArrayBufferView
- ImageDate
- Array
- Object
- Map
- Set
可转移对象
使用 可转移对象 可以在不同执行上下文 (如主线程与 Web Worker)之间高效转移所有权(使用权和操作权)的对象,而无需进行复制操作。
以下几种对象是可转移对象:
- ArrayBuffer
- MessagePort
- ImageBitMap
- OffscreenCanvas
postMessage()
方法的第二个参数可选参数是数组, 可以指定将哪些对象转移到目标上下文。
在遍历消息payload
对象时, 浏览器根据转移对象数组检查对象引用。并对转移对象对象进行转移而不是复制它们。
这个列子演示了 工作者线程 对 ArrayBuffer 的常规结构化克隆。这里没有对象转移。
const worker = new Worker('./worker.js');
// 创建32位缓冲区
const arrayBuffer = new ArrayBuffer(32);
console.log(arrayBuffer.byteLength); //32
worker.postMessage(arrayBuffer);
console.log(arrayBuffer.byteLength); //32
self.onmessage = e => {
console.log(`worker's buffer size ${e.data.byteLength}`) //32;
};
如果把 ArrayBuffer 指定为 可转移对象 , 那么对缓冲区内内存的引用就会从父上下文抹去, 然后分配给工作者线程。
const worker = new Worker('./worker.js');
// 创建32位缓冲区
const arrayBuffer = new ArrayBuffer(32);
console.log(arrayBuffer.byteLength); //32
worker.postMessage(arrayBuffer, [arrayBuffer]);
console.log(arrayBuffer.byteLength); //0
self.onmessage = e => {
console.log(`worker's buffer size ${e.data.byteLength}`) //32;
};
可以在其他类型的对象中嵌套 可转移对象,包装对象会被复制 嵌套的 可转移对象 会被转移。
const worker = new Worker('./worker.js');
// 创建32位缓冲区
const arrayBuffer = new ArrayBuffer(32);
console.log(arrayBuffer.byteLength); //32
worker.postMessage({ foo: { bar: arrayBuffer } }, [arrayBuffer]);
console.log(arrayBuffer.byteLength); //0
self.onmessage = e => {
console.log(`worker's buffer size ${e.data.foo.bar.byteLength}`) //32;
};
线程池
因为启动线程池代价比较大, 所以某些情况下可以考虑始终保持固定数量的线程活动。需要的时就把任务分配给它们。
工作者线程执行计算的时候, 会被标记为忙碌状态, 直到它通知线程池自己空闲了 才准备好接收新任务。这些活动线程就称为线程池。
线程池中线程的数量多少合适并没有权威的答案, 不过可以参考navigator.hardware
Concurrency 属性返回的系统可用的核心数量。当作线程池大小的上限。
一种使用线程池的策略是每个线程都执行同样的任务,单具体执行什么任务由几个参数来控制。
class TaskWorker extends Worker {
/**
* Worker是否可用
*/
private available = false;
private reject: () => void | undefined;
private resolve: () => void | undefined;
constructor(
public notifyAvailable: () => void,
url: string | URL,
...workerArgs: WorkerOptions[]
) {
super(url, ...workerArgs);
this.onmessage = () => {
this.setAvailable();
};
}
setAvailable() {
this.available = true;
this.reject = null;
this.resolve = null;
this.notifyAvailable();
}
dispatch<T = any>(event: {
reject: (data: ErrorEvent) => void;
resolve: (data: MessageEvent) => void;
postMessageArgs: T;
}) {
this.onmessage = (data) => {
event.resolve(data);
this.setAvailable();
};
this.onerror = (data) => {
event.reject(data);
this.setAvailable();
};
this.postMessage(event.postMessageArgs);
}
}
class WorkerPool {
private workers: TaskWorker[] = [];
private taskQueue: any[] = [];
constructor(
public poolSize: number,
url: string | URL,
...workerArgs: WorkerOptions[]
) {
for (let index = 0; index < this.poolSize; index++) {
this.workers.push(
new TaskWorker(this.dispatchIfAvailable, url, ...workerArgs)
);
}
}
enqueue(task: any) {
return new Promise((resolve, reject) => {
this.taskQueue.push({
resolve,
reject,
task,
});
this.dispatchIfAvailable();
});
}
/**
* 把任务发送给空闲的线程
* @returns
*/
dispatchIfAvailable() {
if (this.taskQueue.length === 0) return;
for (const worker of this.workers) {
if (worker.available) {
let task = this.taskQueue.shift();
worker.dispatch(task);
break;
}
}
}
close() {
for (const worker of this.workers) {
worker.terminate();
}
}
}
export default WorkerPool;