Skip to main content

专用工作者线程

可以在网页中创建 专用工作者线程 用来执行在 浏览器主线程 之外的其他任务。

我们可以在 专用工作者线程 中与 父页面 交换消息、发送网络请求、执行文件输入/输出、进行密集计算、处理大量数据。

专用工作者线程的基本概念

创建专用工作者线程

创建 专用工作者线程 最常见的方式是 加载 Javascript 文件。把 文件路径 提供给 Worker 构造函数。

然后 构造函数 再在后台异步加载脚本并且实例化 工作者线程

emptyWorker.js
// 空的 JS 工作者线程文件
main.js
const worker = new Worker('./emptyWorker.js');
console.log(worker) {}
//可能还没有初始化完成

工作者线程安全限制

工作者线程的 Javascript 文件 只能从父页面相同的源加载。 从其他源加载工作者线程的脚本文件会导致错误。

const worker = new Worker(new URL("./workers/test.worker", import.meta.url));

// 尝试基于 https://www.baidu.com 创建工作者线程
const worker2 = new Worker(new URL("https://www.baidu.com/workers/test.worker", import.meta.url));

// Access to script at 'https://www.baidu.com/workers/test.worker' from origin 'https://local.innodealing.com:8000'
// has been blocked by CORS policy: No 'Access-Control-Allow-Origin' header is present on the requested resource.

使用 Worker 对象

Worker() 构造函数返回的 Worker 对象, 是与刚创建的 专用工作者线程 通信的连接点。

它可用于 工作者线程父上下文 间传输信息, 以及捕获 专用工作者线程 发出的事件。

Worker 对象 支持下列事件处理程序属性:

  • onerror: 在 工作者线程 中发生 ErrorEvent 类型的错误事件时触发该事件

    • 该事件在 工作者线程 中抛出错误的时候触发。
    • 该事件也可以通过 worker.addEventListener('error', handler) 的形式处理
  • onmessage: 在 工作者线程 中发生 MessageEvent 类型的消息事件时触发该事件

    • 该事件在 工作者线程 中向父上下文发送消息触发。
      • 该事件也可以通过 worker.addEventListener('message', handler) 的形式处理
  • onmessageerror: 在 工作者线程 中发生 MessageEvent 类型的错误事件时触发该事件

    • 该事件在 工作者线程 中收到 无法反序列化的消息时发生
    • 该事件也可以通过 worker.addEventListener('messageerror', handler) 的形式处理。

Worker 对象 此外还支持下列方法: postMessage(): 用于通过异步消息事件向 工作者线程 发送消息。 terminate(): 用于立即终止 工作者线程

DedicatedWorkerGlobalScope

在专用工作者线程内部,全局作用域是 DedicatedWorkerGlobalScope

因为继承自 WorkerGlobalScope,所以包含它的所有属性和方法。工作者线程可以通过 self 关键字访问。

globalScopeWorker.js
console.log('inside worker:', self);
main.js
const worker = new Worker('./globalScopeWorker.js');

console.log('created worker:', worker);

// created worker: Worker {}

// inside worker: DedicatedWorkerGlobalScope {}

main.js工作者线程 中的 console 对象 都会写入浏览器控制台。

DedicatedWorkerGlobalScopeWorkerGlobalScope 基础上增加了以下方法:

  • postMessage(): 从工作者线程内部向主线程上下文发送消息。

  • close(): 用于立即终止 工作者线程

  • importScripts(path): 用于向工作者线程中导入任意数量的脚本。

专用工作者线程的生命周期

调用 Worker() 构造函数 是一个专用工作者线程的一个入口。

调用之后 主线程中虽然可以立即使用这个 Worker 对象,但是与之关联的 工作者线程 可能还没有创建完成。因为存在脚本请求的 网络延迟初始化延迟。 一般来说,专用工作者线程 非正式区分三种状态:

  • 初始化(initializing)
  • 活动(active)
  • 终止(terminated)

这三种状态对其他线程是不可见的, 也无法分别 Worker 对象 当前所处的一个状态。

初始化的时候 虽然 工作者线程 的脚本尚未执行 但是可以先把要发送给 工作者线程 的消息加入队列

这些消息会等待 工作者线程 的状态变为活动, 在执行消息队列里面的代码。

initializingWorker.js
self.addEventListener('message', ({data}) => console.log(data));

// foo
// bar
// baz
main.js
const worker = new Worker('./initializingWorker.js');

// Worker 可能仍处于初始化状态
// 但 postMessage()数据可以正常处理
worker.postMessage('foo');
worker.postMessage('bar');
worker.postMessage('baz');

创建之后 专用工作者线程 就会伴随页面的整个生命周期存在 除非手动调用 close 方法

内部终止和外部终止的区别

内部终止

closeWorker.js
self.postMessage('foo');
self.postMessage("foo");
self.close();
self.postMessage("bar1");
self.postMessage("bar2");
self.postMessage("bar3");
self.postMessage("bar4");
self.postMessage("bar5");
self.postMessage("bar6");
self.postMessage("bar7");
self.postMessage("bar8");
setTimeout(() => self.postMessage("baz"), 0);
const worker = new Worker('./closeWorker.js');

worker.onmessage = ({data}) => console.log(data);

// foo
// bar
// bar1
// bar2
// bar3
// bar4
// bar5
// bar6
// bar7
// bar8

虽然调用了 close() 但显然 工作者线程 的执行并没有立即终止。

close() 在这里会通知 工作者线程 取消事件循环中的所有任务 阻止继续添加新任务

工作者线程 不需要执行同步停止 因此在 父上下文 的事件循环中处理的其他数据仍会打印出来。

外部终止

terminateWorker.js
self.onmessage = ({data}) => console.log(data);
main.js
const worker = new Worker('./terminateWorker.js');

// 给 1000 毫秒让工作者线程初始化
setTimeout(() => {
worker.postMessage('foo');
worker.terminate();
worker.postMessage('bar');
setTimeout(() => worker.postMessage('baz'), 0);
}, 1000);

//foo

父上下文 先给 工作者线程 发送了带foopostMessage 这条消息可以在外部终止之前处理。

一旦调用了 terminate() 工作者线程的 消息队列 就会被清理并锁住。它会将关联的工作者线程标记为终止, 它们的执行也会立即停止

在 JavaScript 行内创建工作者线程

工作者线程 需要基于脚本文件来创建, 但这并不意味着脚本必须是远程资源。

专用工作者线程 也可以通过 Blob 对象 URL 在行内脚本创建。这样可以更快速地初始化工作者线程。因为没有网络延迟。

const workerScript = `
self.onmessage = ({data})=>{
console.log(data)
}
`

const workerScriptBlob = new Blob([workerScript])


const workerScriptBlobUrl = URL.createObjectURL(workerScriptBlob);

const worker = new Worker(workerScriptBlobUrl);

worker.postMessage('blob worker script')

// blob worker script

在这个例子中:

  • 通过脚本字符串创建了 Blob
  • 在通过Blob创建了对象 URL。
  • 最后把对象 URL 传给了Worker()构造函数。

在工作者线程中动态执行脚本

工作者线程 中 , 可以使用importScripts() 方法通过 编程方式 加载 执行任意脚本

main.js
const worker = new Worker('./worker.js')
scriptA.js
console.log('scriptA executes')
scriptB.js
console.log('scriptB executes')
worker.js
console.log('importing scripts');

importScripts('./scriptA.js');
importScripts('./scriptB.js');

console.log('scripts imported');

// importing scripts
// scriptA executes
// scriptB executes
// scripts imported
tip

importScripts() 方法可以接收任意数量的脚本作为参数, 浏览器下载它们的顺序没有限制。

但是执行则会 严格 按照它们在 参数列表的顺序 进行。

脚本加载受到常规CORS的限制 , 但是 工作者线程 内部请求来自任何源的脚本。

这里的脚本导入策略类似于使用生成的 <script>

委托任务到子工作者线程

有时候可能需要在 工作者线程 , 在有多个 CPU 核心的时候 使用多个 子工作者线程 可以实现并行计算。

使用多个 子工作者线程 前要考虑周全。确保并行计算的投入确实能得到收益。毕竟同时运行多个子线程会有很大计算成本。

main.js
const worker = new Worker('./worker.js')
worker.js
console.log('worker')
const worker = new Worker('./subworker.js')
subworker.js
console.log('subworker')

处理工作者线程错误

try/catch 没有捕获到 工作者线程 的运行错误。

main.js
try {
const worker = new Worker('./worker.js')
} catch (error) {
console.log('caught error')
}
// no error
worker.js
throw new Error("foo");

但是相应的错误事件仍然会冒泡到 工作者线程 的全局上下文。可以通过在Worker对象上监听错误事件访问到。

main.js
  const worker = new Worker('./worker.js');
worker.onerror = console.log;
// ErrorEvent { isTrusted: true, message: "Uncaught Error: foo" }
worker.js
throw new Error("foo");

与专用工作者线程通信

工作者线程 的通信都是通过异步消息完成的, 但是这些消息可以有多种形式。

使用 postMessage

最简单也是最常用的方式是使用 postMessage 传递消息。

main.js
const worker = new Worker('./factorialWorker.js');

worker.onmessage(({data})=>{
console.log(data)
});

worker.postMessage(5);
worker.postMessage(7);
worker.postMessage(10);

// 5!=120
// 7!=5040
// 10!=3628800
factorialWorker.js
function factorial(n: number) {
let result = 1;
while (n) {
result = result * n--;
}
return result;
}

self.addEventListener('message', ({ data }) => {
self.postMessage(`${data}!=${factorial(data as unknown as number)}`);
});

对于传递简单的消息 , 使用 postMessage主线程工作者线程 之间传递消息。与在两个窗口间传递消息非常像。

使用 MessageChannel

无论是 主线程 还是 工作者线程 , 通过postMessage进行通信的这个过程可以被 MessageChannel 取代。

MessageChannel 有两个端口 , 分别代表两个通信端点。

要让父页面工作线程 通过 MessageChannel 通信, 需要把一个端口传到 工作者线程中。

main.ts
  const messageChannel = new MessageChannel();

const worker = new Worker(new URL('./factorialWorker.ts', import.meta.url));

worker.postMessage(null, [messageChannel.port1]);

messageChannel.port2.postMessage(5);

messageChannel.port2.onmessage = ({ data }) => {
console.log(data);
};

// 5!=120
factorialWorker.ts
let messagePort: MessagePort | null = null;

function factorial(n: number) {
let result = 1;
while (n) {
result = result * n--;
}
return result;
}

self.addEventListener('message', e => {
if (!messagePort && e.ports) {
messagePort = e.ports[0] as MessagePort;

messagePort.onmessage = ({ data }) => {
messagePort.postMessage(`${data}!=${factorial(data as unknown as number)}`);
};
}
});

使用 MessageChannel 实例与父页面通信很大程度是多余的。

这是因为全局 postMessage 方法本质与 channel.postMessage() 执行的是相同的操作。

MessageChannel 真正有用的地方是让两个 工作线程 之间直接通信。 可以通过把端口传给另一个 工作者线程 实现。

worker.ts
let messagePort: MessagePort | null = null;
let contextIdentifier: string | null = null;

const sendMessage = (data, port) => {
data.push(contextIdentifier);
port.postMessage(data);
};

self.addEventListener('message', e => {
const { ports, data } = e;

if (!messagePort && ports.length > 0) {
messagePort = ports[0] as MessagePort;
contextIdentifier = data;
messagePort.onmessage = ({ data }) => {
sendMessage(data, self);
};
} else {
sendMessage(data, messagePort);
}
});
main.ts

const workerA = new Worker(new URL('./worker/test-worker.ts', import.meta.url));

const workerB = new Worker(new URL('./worker/test-worker.ts', import.meta.url));

workerA.postMessage('workerA', [messageChannel.port1]);
workerB.postMessage('workerB', [messageChannel.port2]);

workerA.postMessage(['page']);
workerB.postMessage(['page']);

workerA.onmessage = ({ data }) => {
console.log(data);
};

workerB.onmessage = ({ data }) => {
console.log(data);
};

使用 BroadcastChannel

同源脚本可以通过 BroadcastChannel 相互之间发送消息和接受消息。

这种方式设置方式比较简单, 不需要像 MessageChannel 那样转移 port

main.ts
 const broadcastChannel = new BroadcastChannel('worker-channel');

new Worker(new URL('./worker.ts', import.meta.url));

broadcastChannel.onmessage = ({ data }) => {
console.log(`header ${data} in worker`);
};

setTimeout(() => {
broadcastChannel.postMessage('foo');
}, 1000);

// header foo in worker
// header bar in worker
worker.ts
const broadcastChannel = new BroadcastChannel('worker-channel');

broadcastChannel.onmessage = ({ data }) => {
console.log(`header ${data} in worker`);
broadcastChannel.postMessage('bar');
};

工作者线程数据传输

使用 工作者线程 经常需要为它们提供某种形式的数据 payload , 工作者线程 是独立的上下文, 因此在上下文之间传输数据就会产生损耗。

JavaScript 中, 有三种在上下文间转移信息的方式: 结构化克隆算法 , 可转移对象, 共享数组缓冲区

结构化克隆算法

结构化克隆算法 可用于两个独立上下文间 共享数据, 通过postMessage() 传递对象时, 浏览器会遍历对象,并在目标上下文中生成它的副本。

结构化克隆算法 支持的类型:

  • Symbol外的所有原始类型。
  • Boolean对象
  • String对象
  • BDate
  • RegExp
  • Blob
  • File
  • FileList
  • ArrayBuffer
  • ArrayBufferView
  • ImageDate
  • Array
  • Object
  • Map
  • Set

可转移对象

使用 可转移对象 可以在不同执行上下文 (如主线程与 Web Worker)之间高效转移所有权(使用权和操作权)的对象,而无需进行复制操作。

以下几种对象是可转移对象:

  • ArrayBuffer
  • MessagePort
  • ImageBitMap
  • OffscreenCanvas

postMessage() 方法的第二个参数可选参数是数组, 可以指定将哪些对象转移到目标上下文

在遍历消息payload对象时, 浏览器根据转移对象数组检查对象引用。并对转移对象对象进行转移而不是复制它们。

这个列子演示了 工作者线程ArrayBuffer 的常规结构化克隆。这里没有对象转移。

main.js
const worker = new Worker('./worker.js');

// 创建32位缓冲区
const arrayBuffer = new ArrayBuffer(32);

console.log(arrayBuffer.byteLength); //32

worker.postMessage(arrayBuffer);

console.log(arrayBuffer.byteLength); //32
worker.js
self.onmessage = e => {
console.log(`worker's buffer size ${e.data.byteLength}`) //32;
};

如果把 ArrayBuffer 指定为 可转移对象 , 那么对缓冲区内内存的引用就会从父上下文抹去, 然后分配给工作者线程

main.js
const worker = new Worker('./worker.js');

// 创建32位缓冲区
const arrayBuffer = new ArrayBuffer(32);

console.log(arrayBuffer.byteLength); //32

worker.postMessage(arrayBuffer, [arrayBuffer]);

console.log(arrayBuffer.byteLength); //0
worker.js
self.onmessage = e => {
console.log(`worker's buffer size ${e.data.byteLength}`) //32;
};

可以在其他类型的对象中嵌套 可转移对象,包装对象会被复制 嵌套的 可转移对象 会被转移。

main.js
const worker = new Worker('./worker.js');

// 创建32位缓冲区
const arrayBuffer = new ArrayBuffer(32);

console.log(arrayBuffer.byteLength); //32

worker.postMessage({ foo: { bar: arrayBuffer } }, [arrayBuffer]);

console.log(arrayBuffer.byteLength); //0
worker.js
self.onmessage = e => {
console.log(`worker's buffer size ${e.data.foo.bar.byteLength}`) //32;
};

线程池

因为启动线程池代价比较大, 所以某些情况下可以考虑始终保持固定数量的线程活动。需要的时就把任务分配给它们。

工作者线程执行计算的时候, 会被标记为忙碌状态, 直到它通知线程池自己空闲了 才准备好接收新任务。这些活动线程就称为线程池

线程池中线程的数量多少合适并没有权威的答案, 不过可以参考navigator.hardware Concurrency 属性返回的系统可用的核心数量。当作线程池大小的上限。

一种使用线程池的策略是每个线程都执行同样的任务,单具体执行什么任务由几个参数来控制。

TaskWorker.ts
class TaskWorker extends Worker {
/**
* Worker是否可用
*/
private available = false;

private reject: () => void | undefined;

private resolve: () => void | undefined;

constructor(
public notifyAvailable: () => void,
url: string | URL,
...workerArgs: WorkerOptions[]
) {
super(url, ...workerArgs);

this.onmessage = () => {
this.setAvailable();
};
}

setAvailable() {
this.available = true;
this.reject = null;
this.resolve = null;
this.notifyAvailable();
}

dispatch<T = any>(event: {
reject: (data: ErrorEvent) => void;
resolve: (data: MessageEvent) => void;
postMessageArgs: T;
}) {
this.onmessage = (data) => {
event.resolve(data);
this.setAvailable();
};

this.onerror = (data) => {
event.reject(data);
this.setAvailable();
};

this.postMessage(event.postMessageArgs);
}
}
WorkerPool.ts
class WorkerPool {
private workers: TaskWorker[] = [];

private taskQueue: any[] = [];

constructor(
public poolSize: number,
url: string | URL,
...workerArgs: WorkerOptions[]
) {
for (let index = 0; index < this.poolSize; index++) {
this.workers.push(
new TaskWorker(this.dispatchIfAvailable, url, ...workerArgs)
);
}
}

enqueue(task: any) {
return new Promise((resolve, reject) => {
this.taskQueue.push({
resolve,
reject,
task,
});
this.dispatchIfAvailable();
});
}

/**
* 把任务发送给空闲的线程
* @returns
*/
dispatchIfAvailable() {
if (this.taskQueue.length === 0) return;

for (const worker of this.workers) {
if (worker.available) {
let task = this.taskQueue.shift();
worker.dispatch(task);
break;
}
}
}

close() {
for (const worker of this.workers) {
worker.terminate();
}
}
}

export default WorkerPool;