Skip to main content

Websocket 实战

服务端代码

const WebSocket = require("ws");
const server = new WebSocket.Server({ port: 3000 });

// 当有客户端连接时触发
server.on("connection", (socket) => {
console.log("Client connected");

// 处理收到的消息
socket.on("message", (data) => {
console.log(`Received: ${data}`);
// 在此处添加处理消息的逻辑
});

// 处理连接关闭
socket.on("close", () => {
console.log("Client disconnected");
});
});

WebSocket

events

./ReconnectingWebSocket/events.ts
export class CloseEvent extends Event {
public code: number;

public reason: string;

public wasClean = true;

constructor(code = 1000, reason = "", target?: any) {
super("close", target);
this.code = code;
this.reason = reason;
}
}

export class ErrorEvent extends Event {
public message: string;

public error: Error;

constructor(error: Error, target: any) {
super("error", target);
this.message = error.message;
this.error = error;
}
}

export type WebSocketEventValue = {
close: CloseEvent;
error: ErrorEvent;
message: MessageEvent;
open: Event;
};

ReconnectingWebSocket

参考开源库 reconnecting-websocket

./ReconnectingWebSocket/index.ts
import type { WebSocketEventValue } from "./events";
import { CloseEvent, ErrorEvent } from "./events";

export type Message = string | ArrayBuffer | Blob | ArrayBufferView;

export type Options = {
debug?: boolean;
connectionTimeout?: number;
maxRetries?: number;
heartbeat?: {
message?: Message | (() => Message);
delay?: number;
timeout?: number;
};
};

const defaultOptions = {
maxReconnectionDelay: 10000,
minReconnectionDelay: 1000 + Math.random() * 4000,
minUptime: 5000,
reconnectionDelayGrowFactor: 1.3,
connectionTimeout: 4000,
maxRetries: Infinity,
maxEnqueuedMessages: Infinity,
debug: false,
};

export type Listener<T extends keyof WebSocketEventValue> = (
value: WebSocketEventValue[T]
) => void;

export type ListenersMap = {
[key in keyof WebSocketEventValue]: Set<Listener<key>>;
};

class ReconnectingWebSocket {
private listeners: ListenersMap = {
error: new Set(),
message: new Set(),
open: new Set(),
close: new Set(),
};

private ws?: WebSocket;

private _binaryType: BinaryType = "blob";

private readonly _url: WebSocket["url"];

private readonly _protocol?: WebSocket["protocol"];

private messageQueue: Message[] = [];

private connectTimeout: ReturnType<Window["setTimeout"]> | undefined;

private uptimeTimeout: ReturnType<Window["setTimeout"]> | undefined;

private heartbeatTimeout: ReturnType<Window["setTimeout"]> | undefined;

private heartbeatInterval: ReturnType<Window["setInterval"]> | undefined;

private _retryCount = -1;

private shouldReconnect = true;

private connectLock = false;

private closeCalled = false;

private readonly options: Options;

constructor(
url: WebSocket["url"],
protocol?: WebSocket["protocol"],
options: Options = {}
) {
this._url = url;
this._protocol = protocol;
this.options = options;
this.connect();
}

/**
* 正在链接中
*/
static get CONNECTING() {
return 0;
}

/**
* 已经链接并且可以通讯
*/
static get OPEN() {
return 1;
}

/**
* 连接正在关闭
*/
static get CLOSING() {
return 2;
}

/**
* 连接已关闭或者没有链接成功
*/
static get CLOSED() {
return 3;
}

/**
* 正在链接中
*/
get CONNECTING() {
return ReconnectingWebSocket.CONNECTING;
}

/**
* 已经链接并且可以通讯
*/
get OPEN() {
return ReconnectingWebSocket.OPEN;
}

/**
* 连接正在关闭
*/
get CLOSING() {
return ReconnectingWebSocket.CLOSING;
}

/**
* 连接已关闭或者没有链接成功
*/
get CLOSED() {
return ReconnectingWebSocket.CLOSED;
}

get binaryType() {
return this.ws ? this.ws.binaryType : this._binaryType;
}

set binaryType(value: BinaryType) {
this._binaryType = value;
if (this.ws) {
this.ws.binaryType = value;
}
}

/**
* Returns the number or connection retries
*/
get retryCount(): number {
return Math.max(this._retryCount, 0);
}

/**
* The number of bytes of data that have been queued using calls to send() but not yet
* transmitted to the network. This value resets to zero once all queued data has been sent.
* This value does not reset to zero when the connection is closed; if you keep calling send(),
* this will continue to climb. Read only
*/
get bufferedAmount(): number {
const bytes = this.messageQueue.reduce((acc, message) => {
if (typeof message === "string") {
acc += message.length; // not byte size
} else if (message instanceof Blob) {
acc += message.size;
} else {
acc += message.byteLength;
}
return acc;
}, 0);
return bytes + (this.ws ? this.ws.bufferedAmount : 0);
}

/**
* The extensions selected by the server. This is currently only the empty string or a list of
* extensions as negotiated by the connection
*/
get extensions(): string {
return this.ws ? this.ws.extensions : "";
}

/**
* A string indicating the name of the sub-protocol the server selected;
* this will be one of the strings specified in the protocols parameter when creating the
* WebSocket object
*/
get protocol(): string {
return this.ws ? this.ws.protocol : "";
}

/**
* The current state of the connection; this is one of the Ready state constants
*/
get readyState(): number {
if (this.ws) {
return this.ws.readyState;
}
return ReconnectingWebSocket.CONNECTING;
}

/**
* The URL as resolved by the constructor
*/
get url(): string {
return this.ws ? this.ws.url : "";
}

/**
* An event listener to be called when the WebSocket connection's readyState changes to CLOSED
*/
public onclose: ((event: CloseEvent) => void) | null = null;

/**
* An event listener to be called when an error occurs
*/
public onerror: ((event: ErrorEvent) => void) | null = null;

/**
* An event listener to be called when a message is received from the server
*/
public onmessage: ((event: MessageEvent) => void) | null = null;

/**
* An event listener to be called when the WebSocket connection's readyState changes to OPEN;
* this indicates that the connection is ready to send and receive data
*/
public onopen: ((event: Event) => void) | null = null;

/**
* 关闭WebSocket连接 如果WebSocket已经是关闭状态 什么都不处理
*/
public close(code = 1000, reason?: string) {
this.shouldReconnect = false;
this.closeCalled = true;
this.clearTimeouts();

if (!this.ws) {
this.debug("close enqueued: no ws instance");
return;
}

if (this.ws.readyState === this.CLOSED) {
this.debug("close: already closed");
return;
}

this.ws.close(code, reason);
}

/**
* Closes the WebSocket connection or connection attempt and connects again.
* Resets retry counter;
*/
public reconnect(code?: number, reason?: string) {
this.shouldReconnect = true;
this.closeCalled = false;
this._retryCount = -1;
if (!this.ws || this.ws.readyState === this.CLOSED) {
this.connect();
} else {
this.disconnect(code, reason);
this.connect();
}
}

/**
* 将指定数据加入队列,连接WebSocket成功之后传输到服务器
*/
public send(data: Message) {
if (this.ws && this.ws.readyState === this.OPEN) {
this.debug("send", data);
this.ws.send(data);
} else {
this.debug("enqueue", data);
this.messageQueue.push(data);
}
}

/**
* Register an event handler of a specific event type
*/
public addEventListener<T extends keyof WebSocketEventValue>(
type: T,
listener: Listener<T>
): void {
if (this.listeners[type]) {
this.listeners[type].add(listener);
}
}

public dispatchEvent<T extends keyof WebSocketEventValue>(
type: T,
value: WebSocketEventValue[T]
) {
const listeners = this.listeners[type];
if (listeners) {
listeners.forEach((listener) => {
listener(value);
});
}
}

/**
* Removes an event listener
*/
public removeEventListener<T extends keyof WebSocketEventValue>(
type: T,
listener: Listener<T>
): void {
if (this.listeners[type] && this.listeners[type].has(listener)) {
this.listeners[type].delete(listener);
}
}

private resetHeartbeat = () => {
const { heartbeat = {} } = this.options;
if (heartbeat.timeout) {
clearTimeout(this.heartbeatTimeout);
this.heartbeatTimeout = window.setTimeout(() => {
this.debug("heartbeat timeout");
this.reconnect();
}, heartbeat.timeout);
}
};

private heartbeat() {
const { heartbeat = {} } = this.options;
if (heartbeat.delay && heartbeat.message) {
this.heartbeatInterval = window.setInterval(() => {
const message =
typeof heartbeat.message === "function"
? heartbeat.message()
: heartbeat.message;
this.debug("start heartbeat");
this.send(message as Message);
}, heartbeat.delay);
}
}

private debug(...args: any[]) {
if (this.options.debug) {
console.log.apply(console, ["RWS>", ...args]);
}
}

private getNextDelay() {
const {
reconnectionDelayGrowFactor,
minReconnectionDelay,
maxReconnectionDelay,
} = defaultOptions;
let delay = 0;
if (this._retryCount > 0) {
delay =
minReconnectionDelay *
Math.pow(reconnectionDelayGrowFactor, this._retryCount - 1);
if (delay > maxReconnectionDelay) {
delay = maxReconnectionDelay;
}
}
this.debug("next delay", delay);
return delay;
}

private wait(time: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, time);
});
}

private handleMessage = (event: MessageEvent) => {
this.debug("message event", event);

this.resetHeartbeat();

if (this.onmessage) {
this.onmessage(event);
}

this.dispatchEvent("message", event);
};

private connect = () => {
if (this.connectLock || !this.shouldReconnect) {
return;
}
this.connectLock = true;

const {
connectionTimeout = defaultOptions.connectionTimeout,
maxRetries = defaultOptions.maxRetries,
} = this.options;

if (this._retryCount >= maxRetries) {
this.debug("max retries reached", this._retryCount, ">=", maxRetries);
return;
}

this._retryCount = this._retryCount + 1;

this.debug("connect", this._retryCount);

this.removeListener();

this.wait(this.getNextDelay()).then(() => {
if (this.closeCalled) {
return;
}

this.debug("connect", { url: this._url, protocol: this._protocol });

this.ws = new WebSocket(this._url, this._protocol);
this.ws!.binaryType = this._binaryType;
this.connectLock = false;

this.addListeners();

// 连接超时之后自动重连
this.connectTimeout = window.setTimeout(() => {
this.handleTimeout();
}, connectionTimeout);
});
};

private handleTimeout = () => {
this.debug("timeout event");
this.handleError(new ErrorEvent(new Error("TIMEOUT"), this));
};

private disconnect(code = 1000, reason?: string) {
this.clearTimeouts();

if (!this.ws) {
return;
}

this.removeListener();

try {
this.ws.close();
this.handleClose(new CloseEvent(code, reason, this));
} catch {
// do nothing
}
}

private acceptOpen() {
this.debug("accept open");
this._retryCount = 0;
}

private handleOpen = (event: Event) => {
if (!this.ws) {
return;
}

this.debug("open event");

this.uptimeTimeout = window.setTimeout(
() => this.acceptOpen(),
defaultOptions.minUptime
);

this.heartbeat();

this.resetHeartbeat();

// 连接成功之后取消定时器
clearTimeout(this.connectTimeout);

this.ws.binaryType = this._binaryType;

if (this.messageQueue.length > 0) {
// send enqueued messages (messages sent before websocket open event)
this.messageQueue.forEach((message) => this.ws?.send(message));
this.messageQueue = [];
}

if (this.onopen) {
this.onopen(event);
}

this.dispatchEvent("open", event);
};

private handleError = (event: ErrorEvent) => {
this.debug("error event", event.message);
this.disconnect(
undefined,
event.message === "TIMEOUT" ? "timeout" : undefined
);

if (this.onerror) {
this.onerror(event);
}

this.debug("exec error listeners");
this.dispatchEvent("error", event);

this.connect();
};

private handleClose = (event: CloseEvent) => {
this.debug("close event");
this.clearTimeouts();

if (this.shouldReconnect) {
this.connect();
}

if (this.onclose) {
this.onclose(event);
}

this.dispatchEvent("close", event);
};

private addListeners = () => {
if (!this.ws) {
return;
}
this.debug("addEventListener");
this.ws.addEventListener("open", this.handleOpen);
this.ws.addEventListener("message", this.handleMessage);
this.ws.addEventListener("close", this.handleClose);
// @ts-ignore
this.ws.addEventListener("error", this.handleError);
};

private removeListener = () => {
if (!this.ws) {
return;
}
this.debug("removeEventListener");
this.ws.removeEventListener("open", this.handleOpen);
this.ws.removeEventListener("message", this.handleMessage);
this.ws.removeEventListener("close", this.handleClose);
// @ts-ignore
this.ws.removeEventListener("error", this.handleError);
};

private clearTimeouts() {
clearTimeout(this.connectTimeout);
clearTimeout(this.uptimeTimeout);
clearInterval(this.heartbeatInterval);
clearTimeout(this.heartbeatTimeout);
}
}

export default ReconnectingWebSocket;

HTML 代码

const App: FC = () => {
const websocket = useMemo(() => {
return new ReconnectingWebSocket("ws://localhost:3000", undefined, {
debug: true,
});
}, []);

return (
<div>
<button>发送消息</button>
<button
onClick={() => {
websocket.close();
}}
>
关闭WebSocket
</button>
</div>
);
};