diff --git a/dist/restore/index.js b/dist/restore/index.js index cb0ace4..ca3896b 100644 --- a/dist/restore/index.js +++ b/dist/restore/index.js @@ -1615,53 +1615,40 @@ function commitCache(restClient, cacheId, filesize) { return yield restClient.create(`caches/${cacheId.toString()}`, commitCacheRequest, requestOptions); }); } -function parallelAwait(queue, concurrency) { - var _a; +function uploadFile(restClient, cacheId, archivePath) { return __awaiter(this, void 0, void 0, function* () { - const workQueue = queue.reverse(); - let completedWork = []; - let entries = queue.length; - while (entries > 0) { - if (entries < concurrency) { - completedWork.push(yield Promise.all(workQueue)); - } - else { - let promises = []; - let i; - for (i = 0; i < concurrency; i++) { - promises.push((_a = workQueue.pop(), (_a !== null && _a !== void 0 ? _a : Promise.resolve()))); - } - completedWork.push(yield Promise.all(promises)); - } - } - return completedWork; - }); -} -function saveCache(cacheId, archivePath) { - return __awaiter(this, void 0, void 0, function* () { - const restClient = createRestClient(); - core.debug("Uploading chunks"); // Upload Chunks const fileSize = fs.statSync(archivePath).size; const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString(); - const uploads = []; + const responses = []; const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too? - let offset = 0; - while (offset < fileSize) { - const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; - const end = offset + chunkSize - 1; - const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false }); - uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end)); - offset += MAX_CHUNK_SIZE; - } + const concurrency = 4; // # of HTTP requests in parallel + const threads = new Array(concurrency); core.debug("Awaiting all uploads"); - const responses = yield parallelAwait(uploads, 4); + let offset = 0; + Promise.all(threads.map(() => __awaiter(this, void 0, void 0, function* () { + while (offset < fileSize) { + const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; + const start = offset; + const end = offset + chunkSize - 1; + offset += MAX_CHUNK_SIZE; // Do this before losing thread during await? + const chunk = fs.createReadStream(archivePath, { fd, start, end, autoClose: false }); + responses.push(yield uploadChunk(restClient, resourceUrl, chunk, start, end)); + } + }))); fs.closeSync(fd); - //const responses = await Promise.all(uploads); const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode)); if (failedResponse) { throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`); } + return; + }); +} +function saveCache(cacheId, archivePath) { + return __awaiter(this, void 0, void 0, function* () { + const restClient = createRestClient(); + core.debug("Upload cache"); + yield uploadFile(restClient, cacheId, archivePath); core.debug("Commiting cache"); // Commit Cache const cacheSize = utils.getArchiveFileSize(archivePath); diff --git a/dist/save/index.js b/dist/save/index.js index 0451f3f..7cd1df9 100644 --- a/dist/save/index.js +++ b/dist/save/index.js @@ -1615,53 +1615,40 @@ function commitCache(restClient, cacheId, filesize) { return yield restClient.create(`caches/${cacheId.toString()}`, commitCacheRequest, requestOptions); }); } -function parallelAwait(queue, concurrency) { - var _a; +function uploadFile(restClient, cacheId, archivePath) { return __awaiter(this, void 0, void 0, function* () { - const workQueue = queue.reverse(); - let completedWork = []; - let entries = queue.length; - while (entries > 0) { - if (entries < concurrency) { - completedWork.push(yield Promise.all(workQueue)); - } - else { - let promises = []; - let i; - for (i = 0; i < concurrency; i++) { - promises.push((_a = workQueue.pop(), (_a !== null && _a !== void 0 ? _a : Promise.resolve()))); - } - completedWork.push(yield Promise.all(promises)); - } - } - return completedWork; - }); -} -function saveCache(cacheId, archivePath) { - return __awaiter(this, void 0, void 0, function* () { - const restClient = createRestClient(); - core.debug("Uploading chunks"); // Upload Chunks const fileSize = fs.statSync(archivePath).size; const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString(); - const uploads = []; + const responses = []; const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too? - let offset = 0; - while (offset < fileSize) { - const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; - const end = offset + chunkSize - 1; - const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false }); - uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end)); - offset += MAX_CHUNK_SIZE; - } + const concurrency = 4; // # of HTTP requests in parallel + const threads = new Array(concurrency); core.debug("Awaiting all uploads"); - const responses = yield parallelAwait(uploads, 4); + let offset = 0; + Promise.all(threads.map(() => __awaiter(this, void 0, void 0, function* () { + while (offset < fileSize) { + const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; + const start = offset; + const end = offset + chunkSize - 1; + offset += MAX_CHUNK_SIZE; // Do this before losing thread during await? + const chunk = fs.createReadStream(archivePath, { fd, start, end, autoClose: false }); + responses.push(yield uploadChunk(restClient, resourceUrl, chunk, start, end)); + } + }))); fs.closeSync(fd); - //const responses = await Promise.all(uploads); const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode)); if (failedResponse) { throw new Error(`Cache service responded with ${failedResponse.statusCode} during chunk upload.`); } + return; + }); +} +function saveCache(cacheId, archivePath) { + return __awaiter(this, void 0, void 0, function* () { + const restClient = createRestClient(); + core.debug("Upload cache"); + yield uploadFile(restClient, cacheId, archivePath); core.debug("Commiting cache"); // Commit Cache const cacheSize = utils.getArchiveFileSize(archivePath); diff --git a/src/cacheHttpClient.ts b/src/cacheHttpClient.ts index db74710..00dcc7d 100644 --- a/src/cacheHttpClient.ts +++ b/src/cacheHttpClient.ts @@ -174,54 +174,29 @@ async function commitCache( ); } -async function parallelAwait(queue: Promise[], concurrency: number): Promise { - const workQueue = queue.reverse(); - let completedWork: any[] = []; - let entries = queue.length; - while (entries > 0) { - if (entries < concurrency) { - completedWork.push(await Promise.all(workQueue)); - } else { - let promises: Promise[] = []; - let i: number; - for (i = 0; i < concurrency; i++) { - promises.push(workQueue.pop() ?? Promise.resolve()); - } - completedWork.push(await Promise.all(promises)); - } - } - - return completedWork; -} - -export async function saveCache( - cacheId: number, - archivePath: string -): Promise { - const restClient = createRestClient(); - - core.debug("Uploading chunks"); +async function uploadFile(restClient: RestClient, cacheId: number, archivePath: string): Promise { // Upload Chunks const fileSize = fs.statSync(archivePath).size; const resourceUrl = getCacheApiUrl() + "caches/" + cacheId.toString(); - const uploads: Promise>[] = []; - + const responses: IRestResponse[] = []; const fd = fs.openSync(archivePath, "r"); // Use the same fd for serial reads? Will this work for parallel too? - let offset = 0; - while (offset < fileSize) { - const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; - const end = offset + chunkSize - 1; - const chunk = fs.createReadStream(archivePath, { fd, start: offset, end, autoClose: false }); - uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end)); - offset += MAX_CHUNK_SIZE; - } + const concurrency = 4; // # of HTTP requests in parallel + const threads = new Array(concurrency); core.debug("Awaiting all uploads"); - const responses = await parallelAwait(uploads, 4); - fs.closeSync(fd); - + let offset = 0; + Promise.all(threads.map(async () => { // This might not work cause something something closures + while (offset < fileSize) { + const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE; + const start = offset; + const end = offset + chunkSize - 1; + offset += MAX_CHUNK_SIZE; // Do this before losing thread during await? + const chunk = fs.createReadStream(archivePath, { fd, start, end, autoClose: false }); + responses.push(await uploadChunk(restClient, resourceUrl, chunk, start, end)); + } + })); - //const responses = await Promise.all(uploads); + fs.closeSync(fd); const failedResponse = responses.find( x => !isSuccessStatusCode(x.statusCode) @@ -232,6 +207,18 @@ export async function saveCache( ); } + return; +} + +export async function saveCache( + cacheId: number, + archivePath: string +): Promise { + const restClient = createRestClient(); + + core.debug("Upload cache"); + await uploadFile(restClient, cacheId, archivePath); + core.debug("Commiting cache"); // Commit Cache const cacheSize = utils.getArchiveFileSize(archivePath);