tickle/modules/utils/createTrackedResponse.mjs

94 lines
2.3 KiB
JavaScript

//@ts-check
import { deferredPromise } from "./deferredPromise.mjs";
/**
* Creates a response object that can dispatch progress.
* @param {(received: number) => void} onProgress
* @param {AbortSignal} [signal]
* @param {number} [maxPackets] a maximum amount of packets to receive
*/
export const createTrackedResponse = (
onProgress,
signal,
maxPackets = 99999
) => {
/** @type {DeferredPromise<ReadableStreamDefaultReader<Uint8Array>>} */
let readerPromise = deferredPromise();
/** @type {DeferredPromise<Response>} */
const successPromise = deferredPromise();
let received = 0;
let started = false;
let failed = false;
let success = false;
const response = new Response(
new ReadableStream({
async start(controller) {
const onError = (/** @type {Error} */ error) => {
failed = true;
success = false;
controller.close();
controller.error(error);
successPromise.reject(error);
};
const onSuccess = () => {
failed = false;
success = true;
successPromise.resolve(response);
};
signal &&
signal.addEventListener("abort", () =>
onError(new Error(`Stream aborted`))
);
try {
const reader = await readerPromise;
started = true;
try {
while (true && maxPackets-- > 0) {
const { done, value } = await reader.read();
if (done) {
controller.close();
onProgress(received);
break;
}
received += value.byteLength;
controller.enqueue(value);
onProgress(received);
}
onSuccess();
} catch (error) {
onError(error);
}
} catch (readerError) {
onError(readerError);
}
},
})
);
const start = readerPromise.resolve;
return Object.assign(successPromise, {
start,
get response() {
return response;
},
get received() {
return received;
},
get isStarted() {
return started;
},
get isFailed() {
return failed;
},
get isSuccess() {
return success;
},
get isUnknown() {
return success === false && failed === false;
},
});
};