Use fs streams directly

joshmgross/chunked-upload-testing
Josh Gross 5 years ago
parent 131e247bd2
commit 0816faf84c

@ -1497,7 +1497,6 @@ const Handlers_1 = __webpack_require__(941);
const HttpClient_1 = __webpack_require__(874); const HttpClient_1 = __webpack_require__(874);
const RestClient_1 = __webpack_require__(105); const RestClient_1 = __webpack_require__(105);
const utils = __importStar(__webpack_require__(443)); const utils = __importStar(__webpack_require__(443));
const stream_1 = __webpack_require__(794);
const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
function isSuccessStatusCode(statusCode) { function isSuccessStatusCode(statusCode) {
return statusCode >= 200 && statusCode < 300; return statusCode >= 200 && statusCode < 300;
@ -1584,30 +1583,29 @@ function reserveCache(key) {
}); });
} }
exports.reserveCache = reserveCache; exports.reserveCache = reserveCache;
function getContentRange(start, length) { function getContentRange(start, end) {
// Format: `bytes start-end/filesize // Format: `bytes start-end/filesize
// start and end are inclusive // start and end are inclusive
// filesize can be * // filesize can be *
// For a 200 byte chunk starting at byte 0: // For a 200 byte chunk starting at byte 0:
// Content-Range: bytes 0-199/* // Content-Range: bytes 0-199/*
return `bytes ${start}-${start + length - 1}/*`; return `bytes ${start}-${end}/*`;
} }
function bufferToStream(buffer) { // function bufferToStream(buffer: Buffer): NodeJS.ReadableStream {
const stream = new stream_1.Duplex(); // const stream = new Duplex();
stream.push(buffer); // stream.push(buffer);
stream.push(null); // stream.push(null);
return stream; // return stream;
} // }
function uploadChunk(restClient, resourceUrl, data, offset) { function uploadChunk(restClient, resourceUrl, data, start, end) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
core.debug(`Uploading chunk of size ${data.byteLength} bytes at offset ${offset}`); core.debug(`Uploading chunk of size ${end - start + 1} bytes at offset ${start}`);
const requestOptions = getRequestOptions(); const requestOptions = getRequestOptions();
requestOptions.additionalHeaders = { requestOptions.additionalHeaders = {
"Content-Type": "application/octet-stream", "Content-Type": "application/octet-stream",
"Content-Range": getContentRange(offset, data.byteLength) "Content-Range": getContentRange(start, end)
}; };
const stream = bufferToStream(data); return yield restClient.uploadStream("PATCH", resourceUrl, data, requestOptions);
return yield restClient.uploadStream("PATCH", resourceUrl, stream, requestOptions);
}); });
} }
function commitCache(restClient, cacheId, filesize) { function commitCache(restClient, cacheId, filesize) {
@ -1622,26 +1620,19 @@ function saveCache(cacheId, archivePath) {
const restClient = createRestClient(); const restClient = createRestClient();
core.debug("Uploading chunks"); core.debug("Uploading chunks");
// Upload Chunks // Upload Chunks
const stream = fs.createReadStream(archivePath); const fileSize = fs.statSync(archivePath).size;
let streamIsClosed = false;
stream.on("end", () => {
core.debug("Stream is ended");
streamIsClosed = true;
});
const resourceUrl = getCacheApiUrl() + cacheId.toString(); const resourceUrl = getCacheApiUrl() + cacheId.toString();
const uploads = []; const uploads = [];
let offset = 0; let offset = 0;
while (!streamIsClosed) { while (offset < fileSize) {
const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
const end = offset + chunkSize - 1;
core.debug(`Offset: ${offset}`); core.debug(`Offset: ${offset}`);
const chunk = stream.read(MAX_CHUNK_SIZE); const chunk = fs.createReadStream(archivePath, { start: offset, end });
if (chunk == null) { uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
core.debug(`Chunk is null, reading is over?`);
break;
}
uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset));
offset += MAX_CHUNK_SIZE; offset += MAX_CHUNK_SIZE;
} }
core.debug("Awaiting all uplaods"); core.debug("Awaiting all uploads");
const responses = yield Promise.all(uploads); const responses = yield Promise.all(uploads);
const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode)); const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
if (failedResponse) { if (failedResponse) {
@ -3157,13 +3148,6 @@ run();
exports.default = run; exports.default = run;
/***/ }),
/***/ 794:
/***/ (function(module) {
module.exports = require("stream");
/***/ }), /***/ }),
/***/ 826: /***/ 826:

56
dist/save/index.js vendored

@ -1497,7 +1497,6 @@ const Handlers_1 = __webpack_require__(941);
const HttpClient_1 = __webpack_require__(874); const HttpClient_1 = __webpack_require__(874);
const RestClient_1 = __webpack_require__(105); const RestClient_1 = __webpack_require__(105);
const utils = __importStar(__webpack_require__(443)); const utils = __importStar(__webpack_require__(443));
const stream_1 = __webpack_require__(794);
const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
function isSuccessStatusCode(statusCode) { function isSuccessStatusCode(statusCode) {
return statusCode >= 200 && statusCode < 300; return statusCode >= 200 && statusCode < 300;
@ -1584,30 +1583,29 @@ function reserveCache(key) {
}); });
} }
exports.reserveCache = reserveCache; exports.reserveCache = reserveCache;
function getContentRange(start, length) { function getContentRange(start, end) {
// Format: `bytes start-end/filesize // Format: `bytes start-end/filesize
// start and end are inclusive // start and end are inclusive
// filesize can be * // filesize can be *
// For a 200 byte chunk starting at byte 0: // For a 200 byte chunk starting at byte 0:
// Content-Range: bytes 0-199/* // Content-Range: bytes 0-199/*
return `bytes ${start}-${start + length - 1}/*`; return `bytes ${start}-${end}/*`;
} }
function bufferToStream(buffer) { // function bufferToStream(buffer: Buffer): NodeJS.ReadableStream {
const stream = new stream_1.Duplex(); // const stream = new Duplex();
stream.push(buffer); // stream.push(buffer);
stream.push(null); // stream.push(null);
return stream; // return stream;
} // }
function uploadChunk(restClient, resourceUrl, data, offset) { function uploadChunk(restClient, resourceUrl, data, start, end) {
return __awaiter(this, void 0, void 0, function* () { return __awaiter(this, void 0, void 0, function* () {
core.debug(`Uploading chunk of size ${data.byteLength} bytes at offset ${offset}`); core.debug(`Uploading chunk of size ${end - start + 1} bytes at offset ${start}`);
const requestOptions = getRequestOptions(); const requestOptions = getRequestOptions();
requestOptions.additionalHeaders = { requestOptions.additionalHeaders = {
"Content-Type": "application/octet-stream", "Content-Type": "application/octet-stream",
"Content-Range": getContentRange(offset, data.byteLength) "Content-Range": getContentRange(start, end)
}; };
const stream = bufferToStream(data); return yield restClient.uploadStream("PATCH", resourceUrl, data, requestOptions);
return yield restClient.uploadStream("PATCH", resourceUrl, stream, requestOptions);
}); });
} }
function commitCache(restClient, cacheId, filesize) { function commitCache(restClient, cacheId, filesize) {
@ -1622,26 +1620,19 @@ function saveCache(cacheId, archivePath) {
const restClient = createRestClient(); const restClient = createRestClient();
core.debug("Uploading chunks"); core.debug("Uploading chunks");
// Upload Chunks // Upload Chunks
const stream = fs.createReadStream(archivePath); const fileSize = fs.statSync(archivePath).size;
let streamIsClosed = false;
stream.on("end", () => {
core.debug("Stream is ended");
streamIsClosed = true;
});
const resourceUrl = getCacheApiUrl() + cacheId.toString(); const resourceUrl = getCacheApiUrl() + cacheId.toString();
const uploads = []; const uploads = [];
let offset = 0; let offset = 0;
while (!streamIsClosed) { while (offset < fileSize) {
const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
const end = offset + chunkSize - 1;
core.debug(`Offset: ${offset}`); core.debug(`Offset: ${offset}`);
const chunk = stream.read(MAX_CHUNK_SIZE); const chunk = fs.createReadStream(archivePath, { start: offset, end });
if (chunk == null) { uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
core.debug(`Chunk is null, reading is over?`);
break;
}
uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset));
offset += MAX_CHUNK_SIZE; offset += MAX_CHUNK_SIZE;
} }
core.debug("Awaiting all uplaods"); core.debug("Awaiting all uploads");
const responses = yield Promise.all(uploads); const responses = yield Promise.all(uploads);
const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode)); const failedResponse = responses.find(x => !isSuccessStatusCode(x.statusCode));
if (failedResponse) { if (failedResponse) {
@ -3139,13 +3130,6 @@ module.exports = require("fs");
/***/ }), /***/ }),
/***/ 794:
/***/ (function(module) {
module.exports = require("stream");
/***/ }),
/***/ 826: /***/ 826:
/***/ (function(module, __unusedexports, __webpack_require__) { /***/ (function(module, __unusedexports, __webpack_require__) {

@ -15,7 +15,6 @@ import {
ReserverCacheResponse ReserverCacheResponse
} from "./contracts"; } from "./contracts";
import * as utils from "./utils/actionUtils"; import * as utils from "./utils/actionUtils";
import { Duplex } from "stream";
const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks const MAX_CHUNK_SIZE = 4000000; // 4 MB Chunks
@ -127,38 +126,38 @@ export async function reserveCache(
return response?.result?.cacheId ?? -1; return response?.result?.cacheId ?? -1;
} }
function getContentRange(start: number, length: number): string { function getContentRange(start: number, end: number): string {
// Format: `bytes start-end/filesize // Format: `bytes start-end/filesize
// start and end are inclusive // start and end are inclusive
// filesize can be * // filesize can be *
// For a 200 byte chunk starting at byte 0: // For a 200 byte chunk starting at byte 0:
// Content-Range: bytes 0-199/* // Content-Range: bytes 0-199/*
return `bytes ${start}-${start + length - 1}/*`; return `bytes ${start}-${end}/*`;
} }
function bufferToStream(buffer: Buffer): NodeJS.ReadableStream { // function bufferToStream(buffer: Buffer): NodeJS.ReadableStream {
const stream = new Duplex(); // const stream = new Duplex();
stream.push(buffer); // stream.push(buffer);
stream.push(null); // stream.push(null);
return stream; // return stream;
} // }
async function uploadChunk( async function uploadChunk(
restClient: RestClient, restClient: RestClient,
resourceUrl: string, resourceUrl: string,
data: Buffer, data: NodeJS.ReadableStream,
offset: number start: number,
end: number
): Promise<IRestResponse<void>> { ): Promise<IRestResponse<void>> {
core.debug(`Uploading chunk of size ${data.byteLength} bytes at offset ${offset}`); core.debug(`Uploading chunk of size ${end - start + 1} bytes at offset ${start}`);
const requestOptions = getRequestOptions(); const requestOptions = getRequestOptions();
requestOptions.additionalHeaders = { requestOptions.additionalHeaders = {
"Content-Type": "application/octet-stream", "Content-Type": "application/octet-stream",
"Content-Range": getContentRange(offset, data.byteLength) "Content-Range": getContentRange(start, end)
}; };
const stream = bufferToStream(data); return await restClient.uploadStream<void>("PATCH", resourceUrl, data, requestOptions);
return await restClient.uploadStream<void>("PATCH", resourceUrl, stream, requestOptions);
} }
async function commitCache( async function commitCache(
@ -183,28 +182,20 @@ export async function saveCache(
core.debug("Uploading chunks"); core.debug("Uploading chunks");
// Upload Chunks // Upload Chunks
const stream = fs.createReadStream(archivePath); const fileSize = fs.statSync(archivePath).size;
let streamIsClosed = false;
stream.on("end", () => {
core.debug("Stream is ended");
streamIsClosed = true;
});
const resourceUrl = getCacheApiUrl() + cacheId.toString(); const resourceUrl = getCacheApiUrl() + cacheId.toString();
const uploads: Promise<IRestResponse<void>>[] = []; const uploads: Promise<IRestResponse<void>>[] = [];
let offset = 0; let offset = 0;
while (!streamIsClosed) { while (offset < fileSize) {
const chunkSize = offset + MAX_CHUNK_SIZE > fileSize ? fileSize - offset : MAX_CHUNK_SIZE;
const end = offset + chunkSize - 1;
core.debug(`Offset: ${offset}`); core.debug(`Offset: ${offset}`);
const chunk: Buffer = stream.read(MAX_CHUNK_SIZE); const chunk = fs.createReadStream(archivePath, { start: offset, end });
if (chunk == null) { uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset, end));
core.debug(`Chunk is null, reading is over?`);
break;
}
uploads.push(uploadChunk(restClient, resourceUrl, chunk, offset));
offset += MAX_CHUNK_SIZE; offset += MAX_CHUNK_SIZE;
} }
core.debug("Awaiting all uplaods"); core.debug("Awaiting all uploads");
const responses = await Promise.all(uploads); const responses = await Promise.all(uploads);
const failedResponse = responses.find( const failedResponse = responses.find(

Loading…
Cancel
Save