I have a worker_thread, let's say a "Thread manager", in a Node 18 app that spawns what I'll call "sub-workers". They do their work and, when finished, send a message to the "Thread manager", and the manager calls worker.terminate(). I even recently added worker.unref(). After a sleep interval the "Thread manager" spawns another "sub-worker", and the process keeps going until the app is stopped.
Here is my question... Is it normal, when I create a new "sub-worker", to get threadId n+1? For example: 2, 3, 4 — even if thread 2 has already finished. I asked ChatGPT and got even more confused, because it says it is not normal for the number to keep growing...
Here is the code of the "Manager"
import {
parentPort,
workerData,
isMainThread,
Worker,
threadId,
WorkerOptions,
} from "worker_threads";
import { v4 as uuidv4 } from "uuid";
import type { TThreadCommand } from "../WorkerDispatcher";
import SyncerWorker from "../SyncerWorker";
import sleep from "../../../utils/sleep";
import * as jsonConfig from "./workers-config.json";
// Status payload a sub-worker (or SyncerWorker event) reports back to the
// manager; `id` is the manager-assigned uuid, `threadId` the node thread id.
type TWorkerStatusResponse = {
  threadId: number;
  id: string;
  status: "success" | "error" | "info" | "warning" | "critical" | "stopped" | "started";
};
// One worker definition as read from workers-config.json.
// `interval` is in seconds (execute() multiplies by 1000);
// `max-close-time` is the graceful-shutdown timeout, also in seconds.
type TLoadWorker = {
  interval: number;
  name: string;
  class: string;
  path: string;
  "max-close-time": number;
  disabled: boolean;
  id: string;
};
// Lifecycle bookkeeping for one configured worker across relaunches.
// `workerInstance` is null while the worker is idle (between runs);
// `endAt` is 0 while a run is in progress, otherwise the finish timestamp.
type TWorkerTrackinginfo = {
  id: string;
  threadId: number;
  lastActive: number;
  interval: number;
  workerInstance: SyncerWorker | null;
  startAt: number;
  endAt: number;
  path: string;
  "max-close-time": number;
  disabled: boolean;
};
/**
 * Orchestrates a pool of SyncerWorker threads: loads worker definitions
 * from a JSON config, spawns each one, tracks its lifecycle in
 * `workerInfo`, and relaunches finished workers once their configured
 * interval has elapsed.
 *
 * NOTE(review): the type parameter T is unused inside the class; it is
 * kept only so existing instantiations such as
 * `new ThreadManager<TThreadCommand>(...)` keep compiling.
 */
class ThreadManager<T> {
  /** Live SyncerWorker instances; entries are removed on termination. */
  public workers: SyncerWorker[] = [];
  /** Lifecycle bookkeeping for every configured worker, running or idle. */
  public workerInfo: TWorkerTrackinginfo[] = [];
  /** Once true, the execute() scheduling loop exits. */
  public terminated: boolean = false;
  /** True while the execute() scheduling loop is active. */
  public isRunning: boolean = false;

  constructor(start: boolean = false, public sleepInterval = 1000) {
    if (isMainThread) {
      console.log("This is main thread");
    } else {
      console.log("This is worker thread");
    }
    // NOTE(review): loadWorkers() is async but cannot be awaited in a
    // constructor, so execute() below may begin polling before the config
    // has finished loading; until then workerInfo is simply empty.
    if (workerData.configPath) {
      void this.loadWorkers(workerData.configPath);
    } else {
      void this.loadWorkers();
    }
    if (start) {
      // load and start workers via execute
      this.isRunning = true;
      void this.execute();
    }
  }

  /**
   * Spawns a SyncerWorker for the given config entry. When `workerId` is
   * empty this is a brand-new worker (a tracking record is created);
   * otherwise it is a relaunch and the existing record is updated in place.
   */
  private addWorker(worker: TLoadWorker, workerId: string = "") {
    const newWorkerId = workerId || uuidv4();
    if (worker.disabled) {
      return;
    }
    // First launch: resolve the config-relative path (strip leading dots)
    // against this file's directory. Relaunch: the stored path is already
    // resolved, use it as-is.
    const workerPath = !workerId ? __dirname + worker.path.replace(/^\.+/, "") : worker.path;
    const workerOptions: WorkerOptions = {
      workerData: {
        interval: worker.interval,
        name: worker.name,
        class: worker.class,
        path: workerPath,
        "max-close-time": worker["max-close-time"],
        id: newWorkerId,
      },
    };
    const syncerWorker = new SyncerWorker(
      __dirname + "/wrap.js",
      workerOptions,
      false,
      worker.interval
    );
    syncerWorker.onDone = (data: TWorkerStatusResponse) => {
      // One run finished: terminate the thread and stamp endAt so
      // execute() can relaunch it after its interval.
      this.updateWorker(data.id);
    };
    syncerWorker.onStop = (data: TWorkerStatusResponse) => {
      console.log("Worker stopped in worker-dispatcher-body.ts via Event");
      // Renamed from `worker` to avoid shadowing the outer parameter.
      const stopped = this.workers.find((w) => w.id === data.id);
      // terminateWorker() terminates, unrefs and drops the instance from
      // both tracking arrays — no extra unref() needed here.
      this.terminateWorker(stopped);
    };
    this.workers.push(syncerWorker);
    // Push a tracking record only for brand-new workers; a non-empty
    // workerId means the caller is relaunching an existing one.
    if (!workerId) {
      this.workerInfo.push({
        startAt: Date.now(),
        endAt: 0,
        id: newWorkerId,
        interval: worker.interval,
        lastActive: Date.now(),
        threadId: syncerWorker.threadId,
        workerInstance: syncerWorker,
        path: workerPath,
        "max-close-time": worker["max-close-time"],
        disabled: worker.disabled,
      });
    } else {
      // Relaunch: refresh the existing record with the new thread.
      const workerIndex = this.workerInfo.findIndex((w) => w.id === workerId);
      if (workerIndex !== -1) {
        this.workerInfo[workerIndex].endAt = 0;
        this.workerInfo[workerIndex].lastActive = Date.now();
        this.workerInfo[workerIndex].threadId = syncerWorker.threadId;
        this.workerInfo[workerIndex].workerInstance = syncerWorker;
      }
    }
  }

  /**
   * Marks a worker's run as finished: terminates the thread, releases the
   * instance, and stamps endAt so execute() can schedule a relaunch.
   */
  private updateWorker(workerId: string) {
    const w = this.workers.find((worker) => worker.id === workerId);
    if (w) {
      this.removeWorker(w);
      const workerIndex = this.workerInfo.findIndex((info) => info.id === workerId);
      if (workerIndex !== -1) {
        this.workerInfo[workerIndex].endAt = Date.now();
        // Drop the reference so the terminated Worker can be GC'd.
        this.workerInfo[workerIndex].workerInstance = null;
      }
    }
  }

  /** Re-spawns a previously finished worker using its stored tracking info. */
  relaunchWorker(id: string) {
    const worker = this.workerInfo.find((w) => w.id === id);
    if (worker) {
      const workerInfo: TLoadWorker = {
        interval: worker.interval,
        name: "Relaunched " + worker.id,
        class: "Relaunched " + worker.id,
        path: worker.path,
        "max-close-time": worker["max-close-time"],
        id: worker.id,
        disabled: worker.disabled,
      };
      this.addWorker(workerInfo, worker.id);
    }
  }

  /** Terminates a worker thread and removes it from the live list. */
  private removeWorker(worker: SyncerWorker) {
    worker.terminate();
    worker.unref();
    this.workers = this.workers.filter((w) => w.id !== worker.id);
  }

  /** Asks every live worker to shut down (gracefully or immediately). */
  public terminateAll(immediate: boolean = false) {
    this.workers.forEach((worker) => {
      worker.sendMessage({
        message: "terminate",
        command: immediate ? "immediate-stop" : "stop",
        data: null,
      });
    });
  }

  /**
   * Permanently removes a worker: terminates the thread and drops it from
   * BOTH `workers` and `workerInfo`. When the last tracked worker is gone,
   * reports "done"/"stopped" to the parent thread.
   *
   * BUG FIX: the previous version filtered only `workerInfo`, leaving the
   * terminated SyncerWorker in `this.workers` forever — every stopped
   * worker leaked a dead Worker instance for the life of the process.
   */
  public terminateWorker(worker: SyncerWorker | undefined) {
    if (!worker) return;
    worker.terminate();
    worker.unref();
    this.workers = this.workers.filter((w) => w.id !== worker.id);
    this.workerInfo = this.workerInfo.filter((w) => w.id !== worker.id);
    if (this.workerInfo.length === 0) {
      this.terminated = true;
      parentPort?.postMessage({
        command: "done",
        status: "stopped",
      } as TThreadCommand);
    }
  }

  /** Terminates the worker with the given internal id, if present. */
  public terminateWorkerById(id: string) {
    const worker = this.workers.find((w) => w.id === id);
    this.terminateWorker(worker);
  }

  /** Terminates the worker with the given node threadId, if present. */
  public terminateWorkerByThreadId(threadId: number) {
    const worker = this.workers.find((w) => w.threadId === threadId);
    this.terminateWorker(worker);
  }

  /**
   * Loads worker definitions from a JSON config and spawns each one.
   * NOTE(review): config entries WITH an `id` take the relaunch branch in
   * addWorker (no tracking record, path used verbatim); entries without
   * one take the new-worker branch — confirm the config's actual shape.
   * The dynamic import resolves relative to this module's compiled
   * location; verify the path still resolves after bundling.
   */
  async loadWorkers(configPath: string = "./workers-config.json") {
    const config = await import(configPath);
    const workers: TLoadWorker[] = config.default.workers;
    for (const worker of workers) {
      this.addWorker(worker, worker.id);
    }
  }

  /** Sends a "ping" heartbeat to the parent thread. */
  report() {
    console.log("Ping");
    parentPort?.postMessage({
      command: "ping",
    } as TThreadCommand);
  }

  /** Pings one worker by id, or every worker when no id is given. */
  ping(id?: string) {
    if (id) {
      const worker = this.workers.find((w) => w.id === id);
      if (worker) {
        worker.sendMessage({ command: "ping" });
      }
    } else {
      this.workers.forEach((worker) => {
        worker.sendMessage({ command: "ping" });
      });
    }
  }

  /**
   * Scheduling loop: every `sleepInterval` ms, relaunches each idle worker
   * whose interval (seconds) has elapsed since its last run ended. Exits
   * once `terminated` is set.
   */
  public async execute() {
    while (!this.terminated) {
      await sleep(this.sleepInterval);
      const now = Date.now();
      const workersToStart = this.workerInfo.filter((w) => {
        // Skip workers that are still running…
        if (w.workerInstance && w.workerInstance.isRunning) return false;
        // …and workers that have never completed a run yet.
        if (w.endAt === 0) return false;
        return now - w.endAt > w.interval * 1000;
      });
      workersToStart.forEach((worker) => {
        this.relaunchWorker(worker.id);
      });
    }
  }
}
// Module entry point: construct the manager inside this worker thread.
// `workerData.start` defaults to true, so the scheduling loop normally
// starts immediately; `workerData.interval` is the poll period in ms.
const threadManager = new ThreadManager<TThreadCommand>(
  workerData?.start ?? true,
  workerData?.interval ?? 1000
);
// Command channel from the parent (dispatcher) thread.
if (parentPort) {
  parentPort.on("message", (p: TThreadCommand<TWorkerStatusResponse>) => {
    // console.log(`Command received in dispatcher body`, p);
    if (p.command === "start") {
      console.log("Start command received in worker dispatcher body");
      // Guard against starting a second execute() loop concurrently.
      !threadManager.isRunning && threadManager.execute();
    }
    if (p.command === "stop") {
      // Graceful stop: halt the scheduler, then ask each worker to finish.
      threadManager.terminated = true;
      threadManager.isRunning = false;
      // send command to all workers to stop with wait for completion
      // threadManager.workers.forEach((worker) => {
      //   worker.sendMessage({ message: "terminate", command: "stop", data: null });
      // });
      threadManager.terminateAll();
    }
    if (p.command === "immediate-stop") {
      // Hard stop: same as above but workers are told not to wait.
      threadManager.terminated = true;
      threadManager.isRunning = false;
      threadManager.terminateAll(true);
    }
    // // child report done with status stopped it became event
    // if (p.command === "done" && p.status === "stopped") {
    //   console.log("Child worker done with status stopped");
    //   const worker = threadManager.workers.find((worker) => worker.id === p.data.id);
    //   threadManager.terminateWorker(worker);
    // }
  });
}
Here is my terminate() function from the extended class
/**
 * Graceful shutdown: flag the dispatcher as stopping, tell every child
 * thread to stop, then poll (1 s steps) until the children report they
 * can close or `maxCloseTime` seconds have elapsed, and finally delegate
 * to the base Worker#terminate().
 */
async terminate(): Promise<number> {
  this._terminated = true;
  this._isRunning = false;
  // Broadcast the stop request to all threads.
  this._sendMessage({ message: "terminate", command: "stop", data: null });
  // Poll until closable or the grace period runs out.
  const deadline = Date.now() + this.maxCloseTime * 1000;
  while (!this.canClose && Date.now() < deadline) {
    await sleep(1000);
  }
  console.log("Dispatcher is terminated");
  return super.terminate();
}
I think I may have a memory leak. The reason I think that is because when it runs for several hours in a Docker container, for example, it reaches threadId = 250 or sometimes threadId = 140.
I tried to call worker.unref() after terminate() because I read that if the worker is already terminated it will not have any effect — but the threadId number is still growing.
Thanks, any help appreciated