201 lines
8.5 KiB
JavaScript
201 lines
8.5 KiB
JavaScript
'use strict';
|
|
/*! (c) Andrea Giammarchi - ISC */
|
|
|
|
const {FUNCTION} = require('proxy-target/types');
|
|
|
|
const {CHANNEL} = require('./channel.js');
|
|
const {GET, HAS, SET} = require('./shared/traps.js');
|
|
|
|
const {SharedArrayBuffer, isArray, notify, postPatched, wait, waitAsync} = require('./bridge.js');
|
|
|
|
// just minifier friendly for Blob Workers' cases
|
|
const {Int32Array, Map, Uint16Array} = globalThis;
|
|
|
|
// common constants / utilities for repeated operations
|
|
const {BYTES_PER_ELEMENT: I32_BYTES} = Int32Array;
|
|
const {BYTES_PER_ELEMENT: UI16_BYTES} = Uint16Array;
|
|
|
|
const waitInterrupt = (sb, delay, handler) => {
|
|
while (wait(sb, 0, 0, delay) === 'timed-out')
|
|
handler();
|
|
};
|
|
|
|
// retain buffers to transfer
|
|
const buffers = new WeakSet;
|
|
|
|
// retain either main threads or workers global context
|
|
const context = new WeakMap;
|
|
|
|
const syncResult = {value: {then: fn => fn()}};
|
|
|
|
// used to generate a unique `id` per each worker `postMessage` "transaction"
|
|
let uid = 0;
|
|
|
|
/**
|
|
* @typedef {Object} Interrupt used to sanity-check interrupts while waiting synchronously.
|
|
* @prop {function} [handler] a callback invoked every `delay` milliseconds.
|
|
* @prop {number} [delay=42] define `handler` invokes in terms of milliseconds.
|
|
*/
|
|
|
|
/**
|
|
* Create once a `Proxy` able to orchestrate synchronous `postMessage` out of the box.
|
|
* @param {globalThis | Worker} self the context in which code should run
|
|
* @param {{parse: (serialized: string) => any, stringify: (serializable: any) => string, transform?: (value:any) => any, interrupt?: () => void | Interrupt}} [JSON] an optional `JSON` like interface to `parse` or `stringify` content with extra `transform` ability.
|
|
* @returns {ProxyHandler<globalThis> | ProxyHandler<Worker>}
|
|
*/
|
|
const coincident = (self, {parse = JSON.parse, stringify = JSON.stringify, transform, interrupt} = JSON) => {
|
|
// create a Proxy once for the given context (globalThis or Worker instance)
|
|
if (!context.has(self)) {
|
|
// ensure no SAB gets a chance to pass through this call
|
|
const sendMessage = postPatched || self.postMessage;
|
|
// ensure the CHANNEL and data are posted correctly
|
|
const post = (transfer, ...args) => sendMessage.call(self, {[CHANNEL]: args}, {transfer});
|
|
|
|
const handler = typeof interrupt === FUNCTION ? interrupt : interrupt?.handler;
|
|
const delay = interrupt?.delay || 42;
|
|
const decoder = new TextDecoder('utf-16');
|
|
|
|
// automatically uses sync wait (worker -> main)
|
|
// or fallback to async wait (main -> worker)
|
|
const waitFor = (isAsync, sb) => isAsync ?
|
|
waitAsync(sb, 0) :
|
|
((handler ? waitInterrupt(sb, delay, handler) : wait(sb, 0)), syncResult);
|
|
|
|
// prevent Harakiri https://github.com/WebReflection/coincident/issues/18
|
|
let seppuku = false;
|
|
|
|
context.set(self, new Proxy(new Map, {
|
|
// there is very little point in checking prop in proxy for this very specific case
|
|
// and I don't want to orchestrate a whole roundtrip neither, as stuff would fail
|
|
// regardless if from Worker we access non existent Main callback, and vice-versa.
|
|
// This is here mostly to guarantee that if such check is performed, at least the
|
|
// get trap goes through and then it's up to developers guarantee they are accessing
|
|
// stuff that actually exists elsewhere.
|
|
[HAS]: (_, action) => typeof action === 'string' && !action.startsWith('_'),
|
|
|
|
// worker related: get any utility that should be available on the main thread
|
|
[GET]: (_, action) => action === 'then' ? null : ((...args) => {
|
|
// transaction id
|
|
const id = uid++;
|
|
|
|
// first contact: just ask for how big the buffer should be
|
|
// the value would be stored at index [1] while [0] is just control
|
|
let sb = new Int32Array(new SharedArrayBuffer(I32_BYTES * 2));
|
|
|
|
// if a transfer list has been passed, drop it from args
|
|
let transfer = [];
|
|
if (buffers.has(args.at(-1) || transfer))
|
|
buffers.delete(transfer = args.pop());
|
|
|
|
// ask for invoke with arguments and wait for it
|
|
post(transfer, id, sb, action, transform ? args.map(transform) : args);
|
|
|
|
// helps deciding how to wait for results
|
|
const isAsync = self !== globalThis;
|
|
|
|
// warn users about possible deadlock still allowing them
|
|
// to explicitly `proxy.invoke().then(...)` without blocking
|
|
let deadlock = 0;
|
|
if (seppuku && isAsync)
|
|
deadlock = setTimeout(console.warn, 1000, `💀🔒 - Possible deadlock if proxy.${action}(...args) is awaited`);
|
|
|
|
return waitFor(isAsync, sb).value.then(() => {
|
|
clearTimeout(deadlock);
|
|
|
|
// commit transaction using the returned / needed buffer length
|
|
const length = sb[1];
|
|
|
|
// filter undefined results
|
|
if (!length) return;
|
|
|
|
// calculate the needed ui16 bytes length to store the result string
|
|
const bytes = UI16_BYTES * length;
|
|
|
|
// round up to the next amount of bytes divided by 4 to allow i32 operations
|
|
sb = new Int32Array(new SharedArrayBuffer(bytes + (bytes % I32_BYTES)));
|
|
|
|
// ask for results and wait for it
|
|
post([], id, sb);
|
|
return waitFor(isAsync, sb).value.then(() => parse(
|
|
decoder.decode(new Uint16Array(sb.buffer).slice(0, length)))
|
|
);
|
|
});
|
|
}),
|
|
|
|
// main thread related: react to any utility a worker is asking for
|
|
[SET](actions, action, callback) {
|
|
const type = typeof callback;
|
|
if (type !== FUNCTION)
|
|
throw new Error(`Unable to assign ${action} as ${type}`);
|
|
// lazy event listener and logic handling, triggered once by setters actions
|
|
if (!actions.size) {
|
|
// maps results by `id` as they are asked for
|
|
const results = new Map;
|
|
// add the event listener once (first defined setter, all others work the same)
|
|
self.addEventListener('message', async (event) => {
|
|
// grub the very same library CHANNEL; ignore otherwise
|
|
const details = event.data?.[CHANNEL];
|
|
if (isArray(details)) {
|
|
// if early enough, avoid leaking data to other listeners
|
|
event.stopImmediatePropagation();
|
|
const [id, sb, ...rest] = details;
|
|
let error;
|
|
// action available: it must be defined/known on the main thread
|
|
if (rest.length) {
|
|
const [action, args] = rest;
|
|
if (actions.has(action)) {
|
|
seppuku = true;
|
|
try {
|
|
// await for result either sync or async and serialize it
|
|
const result = await actions.get(action)(...args);
|
|
if (result !== void 0) {
|
|
const serialized = stringify(transform ? transform(result) : result);
|
|
// store the result for "the very next" event listener call
|
|
results.set(id, serialized);
|
|
// communicate the required SharedArrayBuffer length out of the
|
|
// resulting serialized string
|
|
sb[1] = serialized.length;
|
|
}
|
|
}
|
|
catch (_) {
|
|
error = _;
|
|
}
|
|
finally {
|
|
seppuku = false;
|
|
}
|
|
}
|
|
// unknown action should be notified as missing on the main thread
|
|
else {
|
|
error = new Error(`Unsupported action: ${action}`);
|
|
}
|
|
// unlock the wait lock later on
|
|
sb[0] = 1;
|
|
}
|
|
// no action means: get results out of the well known `id`
|
|
// wait lock automatically unlocked here as no `0` value would
|
|
// possibly ever land at index `0`
|
|
else {
|
|
const result = results.get(id);
|
|
results.delete(id);
|
|
// populate the SharedArrayBuffer with utf-16 chars code
|
|
for (let ui16a = new Uint16Array(sb.buffer), i = 0; i < result.length; i++)
|
|
ui16a[i] = result.charCodeAt(i);
|
|
}
|
|
// release te worker waiting either the length or the result
|
|
notify(sb, 0);
|
|
if (error) throw error;
|
|
}
|
|
});
|
|
}
|
|
// store this action callback allowing the setter in the process
|
|
return !!actions.set(action, callback);
|
|
}
|
|
}));
|
|
}
|
|
return context.get(self);
|
|
};
|
|
|
|
coincident.transfer = (...args) => (buffers.add(args), args);
|
|
|
|
module.exports = coincident;
|