Timeout requests and stop processing further (#204)

Fix https://github.com/matrix-org/matrix-public-archive/issues/148
Fix https://github.com/matrix-org/matrix-public-archive/issues/40

 - Apply timeout middleware to all room directory and room routes
 - Stop messing with the response after we timeout. Fix https://github.com/matrix-org/matrix-public-archive/issues/148
    - This also involves cancelling any `async/await` things like requests in the routes so we throw an abort error instead of continuing on. Fix https://github.com/matrix-org/matrix-public-archive/issues/40
 - Also abort the route if we see that the user closed the request before we could respond to them
 - Bumps minimum supported Node.js version to v18 because we're now using the built-in native `fetch` in Node.js vs `node-fetch`. This gives us the custom `signal.reason` that we aborted with instead of a generic `AbortError`.
    - This also means we had to add some instrumentation for `fetch` which uses `undici` under the hood. Settled on some unofficial instrumentation: [`opentelemetry-instrumentation-fetch-node`](https://www.npmjs.com/package/opentelemetry-instrumentation-fetch-node)
pull/207/head
Eric Eastwood 10 months ago committed by GitHub
parent 8bea5e0355
commit 9078abf4f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -81,7 +81,7 @@ jobs:
strategy:
matrix:
node-version: [16.x]
node-version: [18.x]
services:
# We need two homeservers that federate with each other to test with

@ -2,7 +2,7 @@
# with error 243 issues are solved:
# - https://github.com/npm/cli/issues/4996
# - https://github.com/npm/cli/issues/4769
FROM node:16.14.2-buster-slim
FROM node:18.16.0-buster-slim
# Pass through some GitHub CI variables which we use in the build (for version
# files/tags)

@ -35,11 +35,16 @@ See the [FAQ page](docs/faq.md).
### Prerequisites
- [Node.js](https://nodejs.org/) v16
- We only need v16 because it includes
- [Node.js](https://nodejs.org/) v18
- We need v18 because it includes `fetch` by default. And [`node-fetch` doesn't
support `abortSignal.reason`](https://github.com/node-fetch/node-fetch/issues/1462)
yet.
- We need v16 because it includes
[`require('crypto').webcrypto.subtle`](https://nodejs.org/docs/latest-v16.x/api/webcrypto.html#cryptosubtle)
for [Matrix encryption (olm) which can't be disabled in
Hydrogen](https://github.com/vector-im/hydrogen-web/issues/579) yet.
Hydrogen](https://github.com/vector-im/hydrogen-web/issues/579) yet. And
[`abortSignal.reason` was introduced in
v16.14.0](https://nodejs.org/dist/latest-v18.x/docs/api/globals.html#abortsignalreason) (although we use `node-fetch` for now).
- A Matrix homeserver that supports [MSC3030's](https://github.com/matrix-org/matrix-spec-proposals/pull/3030) `/timestamp_to_event` endpoint
- [Synapse](https://matrix.org/docs/projects/server/synapse) 1.73.0+

4155
package-lock.json generated

File diff suppressed because it is too large Load Diff

@ -36,16 +36,16 @@
"vite": "^4.3.1"
},
"dependencies": {
"@opentelemetry/api": "^1.1.0",
"@opentelemetry/auto-instrumentations-node": "^0.31.0",
"@opentelemetry/context-async-hooks": "^1.4.0",
"@opentelemetry/core": "^1.4.0",
"@opentelemetry/exporter-jaeger": "^1.3.1",
"@opentelemetry/instrumentation": "^0.30.0",
"@opentelemetry/propagator-ot-trace": "^0.26.0",
"@opentelemetry/resources": "^1.3.1",
"@opentelemetry/sdk-trace-base": "^1.3.1",
"@opentelemetry/semantic-conventions": "^1.3.1",
"@opentelemetry/api": "^1.4.1",
"@opentelemetry/auto-instrumentations-node": "^0.36.6",
"@opentelemetry/context-async-hooks": "^1.12.0",
"@opentelemetry/core": "^1.12.0",
"@opentelemetry/exporter-jaeger": "^1.12.0",
"@opentelemetry/instrumentation": "^0.38.0",
"@opentelemetry/propagator-ot-trace": "^0.26.2",
"@opentelemetry/resources": "^1.12.0",
"@opentelemetry/sdk-trace-base": "^1.12.0",
"@opentelemetry/semantic-conventions": "^1.12.0",
"cors": "^2.8.5",
"dompurify": "^2.3.9",
"escape-string-regexp": "^4.0.0",
@ -55,7 +55,7 @@
"linkedom": "^0.14.17",
"matrix-public-archive-shared": "file:./shared/",
"nconf": "^0.11.3",
"node-fetch": "^2.6.7",
"opentelemetry-instrumentation-fetch-node": "^1.0.0",
"url-join": "^4.0.1"
}
}

@ -5,7 +5,7 @@
const assert = require('assert');
const RethrownError = require('../lib/rethrown-error');
const RethrownError = require('../lib/errors/rethrown-error');
// Serialize the error and send it back up to the parent process so we can
// interact with it and know what happened when the process exits.

@ -12,7 +12,7 @@
const assert = require('assert');
const { fork } = require('child_process');
const RethrownError = require('../lib/rethrown-error');
const RethrownError = require('../lib/errors/rethrown-error');
const { traceFunction } = require('../tracing/trace-utilities');
const config = require('../lib/config');
@ -26,6 +26,10 @@ if (!logOutputFromChildProcesses) {
const resolvedChildForkScriptPath = require.resolve('./child-fork-script');
class RunInChildProcessTimeoutAbortError extends RethrownError {
// ...
}
function assembleErrorAfterChildExitsWithErrors(exitCode, childErrors, childStdErr) {
assert(childErrors);
@ -65,20 +69,56 @@ function assembleErrorAfterChildExitsWithErrors(exitCode, childErrors, childStdE
return childErrorSummary;
}
async function runInChildProcess(modulePath, runArguments, { timeout }) {
async function runInChildProcess(
modulePath,
runArguments,
{ timeout, abortSignal: externalAbortSignal }
) {
let abortTimeoutId;
try {
let childErrors = [];
let childExitCode = '(not set yet)';
let childStdErr = '';
const controller = new AbortController();
const { signal } = controller;
const abortController = new AbortController();
// Stops the child process if it takes too long
if (timeout) {
abortTimeoutId = setTimeout(() => {
const childErrorSummary = assembleErrorAfterChildExitsWithErrors(
childExitCode,
childErrors,
childStdErr
);
abortController.abort(
new RunInChildProcessTimeoutAbortError(
`Timed out while running ${modulePath} so we aborted the child process after ${timeout}ms. Any child errors? (${childErrors.length})`,
childErrorSummary
)
);
}, timeout);
}
// Stop the child process if we get an external signal to stop (like if the whole
// express route that caused this call times out)
if (externalAbortSignal) {
if (externalAbortSignal.aborted) {
// Abort for good measure in case we sneak past this somehow
abortController.abort(externalAbortSignal.reason);
// Throw an error and exit early if we already aborted before we even started
throw externalAbortSignal.reason;
}
externalAbortSignal.addEventListener('abort', () => {
abortController.abort(externalAbortSignal.reason);
});
}
// We use a child_process because we want to be able to exit the process
// after we receive the results. We use `fork` instead of `exec`/`spawn` so
// that we can pass a module instead of running a command.
const child = fork(resolvedChildForkScriptPath, [modulePath], {
signal,
signal: abortController.signal,
// Default to silencing logs from the child process. We already have
// proper instrumentation of any errors that might occur.
//
@ -106,13 +146,6 @@ async function runInChildProcess(modulePath, runArguments, { timeout }) {
// with argv.
child.send(runArguments);
// Stops the child process if it takes too long
if (timeout) {
abortTimeoutId = setTimeout(() => {
controller.abort();
}, timeout);
}
const returnedData = await new Promise((resolve, reject) => {
let data = '';
// Collect the data passed back by the child
@ -151,18 +184,12 @@ async function runInChildProcess(modulePath, runArguments, { timeout }) {
// When a problem occurs when spawning the process or gets aborted
child.on('error', (err) => {
// We should be able to just `reject(err)` without any special-case handling
// here since ideally, we expect the error to be whatever `signal.reason` we
// aborted with but `child_process.fork(...)` doesn't seem play nicely, see
// https://github.com/nodejs/node/issues/47814
if (err.name === 'AbortError') {
const childErrorSummary = assembleErrorAfterChildExitsWithErrors(
childExitCode,
childErrors,
childStdErr
);
reject(
new RethrownError(
`Timed out while running ${modulePath} so we aborted the child process after ${timeout}ms. Any child errors? (${childErrors.length})`,
childErrorSummary
)
);
reject(abortController.signal.reason || err);
} else {
reject(err);
}

@ -7,7 +7,9 @@
// get our initial rendered HTML.
const assert = require('assert');
const RethrownError = require('../lib/rethrown-error');
const RethrownError = require('../lib/errors/rethrown-error');
const RouteTimeoutAbortError = require('../lib/errors/route-timeout-abort-error');
const UserClosedConnectionAbortError = require('../lib/errors/user-closed-connection-abort-error');
const runInChildProcess = require('../child-process-runner/run-in-child-process');
const resolvedRenderHydrogenToStringUnsafeScriptPath = require.resolve(
@ -18,7 +20,7 @@ const resolvedRenderHydrogenToStringUnsafeScriptPath = require.resolve(
// gone really wrong.
const RENDER_TIMEOUT = 5000;
async function renderHydrogenToString(renderOptions) {
async function renderHydrogenToString({ renderOptions, abortSignal }) {
assert(renderOptions);
// We expect `config` but we should sanity check that we aren't leaking the access token
@ -48,17 +50,25 @@ async function renderHydrogenToString(renderOptions) {
renderOptions,
{
timeout: RENDER_TIMEOUT,
abortSignal,
}
);
return hydrogenHtmlOutput;
} catch (err) {
throw new RethrownError(
`Failed to render Hydrogen to string. In order to reproduce, feed in these arguments into \`renderHydrogenToString(...)\`:\n renderHydrogenToString arguments: ${JSON.stringify(
renderOptions
)}`,
err
);
// No need to wrap these errors since the abort originates from outside of the
// render process. And makes it easier to detect without having to look for
// underlying causes.
if (err instanceof RouteTimeoutAbortError || err instanceof UserClosedConnectionAbortError) {
throw err;
} else {
throw new RethrownError(
`Failed to render Hydrogen to string. In order to reproduce, feed in these arguments into \`renderHydrogenToString(...)\`:\n renderHydrogenToString arguments: ${JSON.stringify(
renderOptions
)}`,
err
);
}
}
}

@ -9,15 +9,19 @@ async function renderHydrogenVmRenderScriptToPageHtml({
pageOptions,
vmRenderScriptFilePath,
vmRenderContext,
abortSignal,
}) {
assert(vmRenderScriptFilePath);
assert(vmRenderContext);
assert(pageOptions);
const hydrogenHtmlOutput = await renderHydrogenToString({
vmRenderScriptFilePath,
vmRenderContext,
pageOptions,
renderOptions: {
vmRenderScriptFilePath,
vmRenderContext,
pageOptions,
},
abortSignal,
});
const pageHtml = renderPageHtml({

@ -0,0 +1,18 @@
'use strict';
// Standard error extender from @deployable/errors
// (https://github.com/deployable/deployable-errors)
class ExtendedError extends Error {
constructor(message) {
super(message);
this.name = this.constructor.name;
this.message = message;
if (typeof Error.captureStackTrace === 'function') {
Error.captureStackTrace(this, this.constructor);
} else {
this.stack = new Error(message).stack;
}
}
}
module.exports = ExtendedError;

@ -1,20 +1,6 @@
'use strict';
// via https://stackoverflow.com/a/42755876/796832
// Standard error extender from @deployable/errors
class ExtendedError extends Error {
constructor(message) {
super(message);
this.name = this.constructor.name;
this.message = message;
if (typeof Error.captureStackTrace === 'function') {
Error.captureStackTrace(this, this.constructor);
} else {
this.stack = new Error(message).stack;
}
}
}
const ExtendedError = require('./extended-error');
// A way to create a new error with a custom message but keep the stack trace of
// the original error. Useful to give more context and why the action was tried
@ -27,6 +13,8 @@ class ExtendedError extends Error {
// from the config when this error occured.
//
// `new RethrownError('Failed to get the ratelimit key from the config', originalError)` (failed to read the disk)
//
// via https://stackoverflow.com/a/42755876/796832
class RethrownError extends ExtendedError {
constructor(message, error) {
super(message);

@ -0,0 +1,9 @@
'use strict';
const ExtendedError = require('./extended-error');
class RouteTimeoutAbortError extends ExtendedError {
// ...
}
module.exports = RouteTimeoutAbortError;

@ -0,0 +1,9 @@
'use strict';
const ExtendedError = require('./extended-error');
class UserClosedConnectionAbortError extends ExtendedError {
// ...
}
module.exports = UserClosedConnectionAbortError;

@ -1,6 +1,6 @@
'use strict';
const fetch = require('node-fetch');
const assert = require('assert');
class HTTPResponseError extends Error {
constructor(response, responseText, ...args) {
@ -23,6 +23,10 @@ const checkResponseStatus = async (response) => {
};
async function fetchEndpoint(endpoint, options = {}) {
// We chose `abortSignal` just because it's a less ambiguous name and obvious what
// it's used for.
assert(!options.signal, 'Use `options.abortSignal` instead of `options.signal`');
const { method, accessToken } = options;
const headers = options.headers || {};
@ -34,6 +38,8 @@ async function fetchEndpoint(endpoint, options = {}) {
method,
headers,
body: options.body,
// Abort signal to cancel the request
signal: options.abortSignal,
});
await checkResponseStatus(res);

@ -7,11 +7,15 @@ const { fetchEndpointAsJson } = require('../fetch-endpoint');
const getServerNameFromMatrixRoomIdOrAlias = require('./get-server-name-from-matrix-room-id-or-alias');
const config = require('../config');
const StatusError = require('../status-error');
const StatusError = require('../errors/status-error');
const matrixServerUrl = config.get('matrixServerUrl');
assert(matrixServerUrl);
async function ensureRoomJoined(accessToken, roomIdOrAlias, viaServers = new Set()) {
async function ensureRoomJoined(
accessToken,
roomIdOrAlias,
{ viaServers = new Set(), abortSignal } = {}
) {
// We use a `Set` to ensure that we don't have duplicate servers in the list
assert(viaServers instanceof Set);
@ -38,6 +42,7 @@ async function ensureRoomJoined(accessToken, roomIdOrAlias, viaServers = new Set
const { data: joinData } = await fetchEndpointAsJson(joinEndpoint, {
method: 'POST',
accessToken,
abortSignal,
});
assert(
joinData.room_id,

@ -25,7 +25,7 @@ assert(matrixServerUrl);
// - ❌ If we start from dayEnd and look forwards, we will find nothing
//
// Returns events in reverse-chronological order.
async function fetchEventsFromTimestampBackwards({ accessToken, roomId, ts, limit }) {
async function fetchEventsFromTimestampBackwards({ accessToken, roomId, ts, limit, abortSignal }) {
assert(accessToken);
assert(roomId);
assert(ts);
@ -42,6 +42,7 @@ async function fetchEventsFromTimestampBackwards({ accessToken, roomId, ts, limi
roomId,
ts,
direction: DIRECTION.backward,
abortSignal,
});
eventIdForTimestamp = eventId;
} catch (err) {
@ -70,6 +71,7 @@ async function fetchEventsFromTimestampBackwards({ accessToken, roomId, ts, limi
// doesn't backfill in the forward direction)
dir: DIRECTION.backward,
limit,
abortSignal,
});
const stateEventMap = {};

@ -10,7 +10,10 @@ const config = require('../config');
const matrixServerUrl = config.get('matrixServerUrl');
assert(matrixServerUrl);
async function fetchPublicRooms(accessToken, { server, searchTerm, paginationToken, limit } = {}) {
async function fetchPublicRooms(
accessToken,
{ server, searchTerm, paginationToken, limit, abortSignal } = {}
) {
assert(accessToken);
let qs = new URLSearchParams();
@ -34,6 +37,7 @@ async function fetchPublicRooms(accessToken, { server, searchTerm, paginationTok
limit,
},
accessToken,
abortSignal,
});
// We only want to see public rooms in the archive

@ -25,7 +25,11 @@ function getStateEndpointForRoomIdAndEventType(roomId, eventType) {
// https://github.com/matrix-org/synapse/issues/15454
//
// TODO: Remove this when we have MSC3999 (because it's the only usage)
const removeMe_fetchRoomCreateEventId = traceFunction(async function (matrixAccessToken, roomId) {
const removeMe_fetchRoomCreateEventId = traceFunction(async function (
matrixAccessToken,
roomId,
{ abortSignal } = {}
) {
const { data } = await fetchEndpointAsJson(
urlJoin(
matrixServerUrl,
@ -33,6 +37,7 @@ const removeMe_fetchRoomCreateEventId = traceFunction(async function (matrixAcce
),
{
accessToken: matrixAccessToken,
abortSignal,
}
);
@ -41,10 +46,15 @@ const removeMe_fetchRoomCreateEventId = traceFunction(async function (matrixAcce
return roomCreateEventId;
});
const fetchRoomCreationInfo = traceFunction(async function (matrixAccessToken, roomId) {
const fetchRoomCreationInfo = traceFunction(async function (
matrixAccessToken,
roomId,
{ abortSignal } = {}
) {
const [stateCreateResDataOutcome] = await Promise.allSettled([
fetchEndpointAsJson(getStateEndpointForRoomIdAndEventType(roomId, 'm.room.create'), {
accessToken: matrixAccessToken,
abortSignal,
}),
]);
@ -61,13 +71,18 @@ const fetchRoomCreationInfo = traceFunction(async function (matrixAccessToken, r
return { roomCreationTs, predecessorRoomId, predecessorLastKnownEventId };
});
const fetchPredecessorInfo = traceFunction(async function (matrixAccessToken, roomId) {
const fetchPredecessorInfo = traceFunction(async function (
matrixAccessToken,
roomId,
{ abortSignal } = {}
) {
const [roomCreationInfoOutcome, statePredecessorResDataOutcome] = await Promise.allSettled([
fetchRoomCreationInfo(matrixAccessToken, roomId),
fetchRoomCreationInfo(matrixAccessToken, roomId, { abortSignal }),
fetchEndpointAsJson(
getStateEndpointForRoomIdAndEventType(roomId, 'org.matrix.msc3946.room_predecessor'),
{
accessToken: matrixAccessToken,
abortSignal,
}
),
]);
@ -99,10 +114,15 @@ const fetchPredecessorInfo = traceFunction(async function (matrixAccessToken, ro
};
});
const fetchSuccessorInfo = traceFunction(async function (matrixAccessToken, roomId) {
const fetchSuccessorInfo = traceFunction(async function (
matrixAccessToken,
roomId,
{ abortSignal } = {}
) {
const [stateTombstoneResDataOutcome] = await Promise.allSettled([
fetchEndpointAsJson(getStateEndpointForRoomIdAndEventType(roomId, 'm.room.tombstone'), {
accessToken: matrixAccessToken,
abortSignal,
}),
]);
@ -121,7 +141,11 @@ const fetchSuccessorInfo = traceFunction(async function (matrixAccessToken, room
});
// eslint-disable-next-line max-statements
const fetchRoomData = traceFunction(async function (matrixAccessToken, roomId) {
const fetchRoomData = traceFunction(async function (
matrixAccessToken,
roomId,
{ abortSignal } = {}
) {
assert(matrixAccessToken);
assert(roomId);
@ -136,24 +160,29 @@ const fetchRoomData = traceFunction(async function (matrixAccessToken, roomId) {
] = await Promise.allSettled([
fetchEndpointAsJson(getStateEndpointForRoomIdAndEventType(roomId, 'm.room.name'), {
accessToken: matrixAccessToken,
abortSignal,
}),
fetchEndpointAsJson(getStateEndpointForRoomIdAndEventType(roomId, 'm.room.canonical_alias'), {
accessToken: matrixAccessToken,
abortSignal,
}),
fetchEndpointAsJson(getStateEndpointForRoomIdAndEventType(roomId, 'm.room.avatar'), {
accessToken: matrixAccessToken,
abortSignal,
}),
fetchEndpointAsJson(
getStateEndpointForRoomIdAndEventType(roomId, 'm.room.history_visibility'),
{
accessToken: matrixAccessToken,
abortSignal,
}
),
fetchEndpointAsJson(getStateEndpointForRoomIdAndEventType(roomId, 'm.room.join_rules'), {
accessToken: matrixAccessToken,
abortSignal,
}),
fetchPredecessorInfo(matrixAccessToken, roomId),
fetchSuccessorInfo(matrixAccessToken, roomId),
fetchPredecessorInfo(matrixAccessToken, roomId, { abortSignal }),
fetchSuccessorInfo(matrixAccessToken, roomId, { abortSignal }),
]);
let name;

@ -10,7 +10,14 @@ const config = require('../config');
const matrixServerUrl = config.get('matrixServerUrl');
assert(matrixServerUrl);
async function getMessagesResponseFromEventId({ accessToken, roomId, eventId, dir, limit }) {
async function getMessagesResponseFromEventId({
accessToken,
roomId,
eventId,
dir,
limit,
abortSignal,
}) {
// We only use this endpoint to get a pagination token we can use with
// `/messages`.
//
@ -31,6 +38,7 @@ async function getMessagesResponseFromEventId({ accessToken, roomId, eventId, di
);
const { data: contextResData } = await fetchEndpointAsJson(contextEndpoint, {
accessToken,
abortSignal,
});
// We want to re-paginte over the same event so it's included in the response.
@ -56,6 +64,7 @@ async function getMessagesResponseFromEventId({ accessToken, roomId, eventId, di
);
const { data: messageResData } = await fetchEndpointAsJson(messagesEndpoint, {
accessToken,
abortSignal,
});
return messageResData;

@ -10,11 +10,13 @@ const config = require('../config');
const matrixServerUrl = config.get('matrixServerUrl');
assert(matrixServerUrl);
async function timestampToEvent({ accessToken, roomId, ts, direction }) {
async function timestampToEvent({ accessToken, roomId, ts, direction, abortSignal }) {
assert(accessToken);
assert(roomId);
assert(ts);
assert(direction);
// TODO: Handle `fromCausalEventId` -> `org.matrix.msc3999.event_id`: See MSC3999
// (https://github.com/matrix-org/matrix-spec-proposals/pull/3999)
const timestampToEventEndpoint = urlJoin(
matrixServerUrl,
@ -24,6 +26,7 @@ async function timestampToEvent({ accessToken, roomId, ts, direction }) {
);
const { data: timestampToEventResData } = await fetchEndpointAsJson(timestampToEventEndpoint, {
accessToken,
abortSignal,
});
return {

@ -1,6 +1,6 @@
'use strict';
const StatusError = require('../lib/status-error');
const StatusError = require('./errors/status-error');
function parseViaServersFromUserInput(rawViaServers) {
// `rawViaServers` could be an array, a single string, or undefined. Turn it into an

@ -3,7 +3,10 @@
const assert = require('assert');
const urlJoin = require('url-join');
const asyncHandler = require('../lib/express-async-handler');
const RouteTimeoutAbortError = require('../lib/errors/route-timeout-abort-error');
const UserClosedConnectionAbortError = require('../lib/errors/user-closed-connection-abort-error');
const { getSerializableSpans, getActiveTraceId } = require('../tracing/tracing-middleware');
const { SemanticAttributes } = require('@opentelemetry/semantic-conventions');
const sanitizeHtml = require('../lib/sanitize-html');
const renderPageHtml = require('../hydrogen-render/render-page-html');
@ -16,16 +19,29 @@ assert(requestTimeoutMs);
// Based off of the `connect-timeout` middleware,
// https://github.com/expressjs/timeout/blob/f2f520f335f2f2ae255d4778e908e8d38e3a4e68/index.js
async function timeoutMiddleware(req, res, next) {
req.abortController = new AbortController();
req.abortSignal = req.abortController.signal;
const timeoutId = setTimeout(() => {
// Signal to downstream middlewares/routes that they should stop processing/fetching
// things since we timed out (downstream consumers need to respect `req.abortSignal`)
req.abortController.abort(
new RouteTimeoutAbortError(
`Timed out after ${requestTimeoutMs}ms while trying to respond to route ${req.originalUrl}`
)
);
const traceId = getActiveTraceId();
const serializableSpans = getSerializableSpans();
let humanReadableSpans;
if (serializableSpans.length > 0) {
humanReadableSpans = serializableSpans.map((serializableSpan) => {
const method = serializableSpan.attributes['http.method'];
const url = serializableSpan.attributes['http.url'];
const statusCode = serializableSpan.attributes['http.status_code'];
const method = serializableSpan.attributes[SemanticAttributes.HTTP_METHOD];
const url =
serializableSpan.attributes[SemanticAttributes.HTTP_TARGET] ||
serializableSpan.attributes[SemanticAttributes.HTTP_URL];
const statusCode = serializableSpan.attributes[SemanticAttributes.HTTP_STATUS_CODE];
let durationString = `request is still running (${
Date.now() - serializableSpan.startTimeInMs
@ -93,9 +109,22 @@ async function timeoutMiddleware(req, res, next) {
}, requestTimeoutMs);
res.on('finish', function () {
// Clear the timeout if the response finishes naturally
clearTimeout(timeoutId);
});
req.on('close', function () {
// Signal to downstream middlewares/routes that they should stop processing/fetching
// things since the user closed the connection before we sent a response (downstream
// consumers need to respect `req.abortSignal`)
//
// This is a bit adjacent to "timeouts" but fits easily enough here (this could be a
// separate middleware).
req.abortController.abort(
new UserClosedConnectionAbortError(`User closed connection before we could respond`)
);
});
next();
}

@ -5,6 +5,7 @@ const express = require('express');
const cors = require('cors');
const asyncHandler = require('../lib/express-async-handler');
const timeoutMiddleware = require('../middleware/timeout-middleware');
const { handleTracingMiddleware } = require('../tracing/tracing-middleware');
const getVersionTags = require('../lib/get-version-tags');
const preventClickjackingMiddleware = require('../middleware/prevent-clickjacking-middleware');
@ -41,10 +42,14 @@ function installRoutes(app) {
// Our own archive app styles and scripts
app.use('/assets', express.static(path.join(__dirname, '../../dist/assets')));
app.use('/', require('./room-directory-routes'));
app.use('/', timeoutMiddleware, require('./room-directory-routes'));
// For room aliases (/r) or room ID's (/roomid)
app.use('/:entityDescriptor(r|roomid)/:roomIdOrAliasDirty', require('./room-routes'));
app.use(
'/:entityDescriptor(r|roomid)/:roomIdOrAliasDirty',
timeoutMiddleware,
require('./room-routes')
);
// Since everything after the hash (`#`) won't make it to the server, let's serve a 404
// page that will potentially redirect them to the correct place if they tried

@ -6,6 +6,8 @@ const urlJoin = require('url-join');
const express = require('express');
const asyncHandler = require('../lib/express-async-handler');
const RouteTimeoutAbortError = require('../lib/errors/route-timeout-abort-error');
const UserClosedConnectionAbortError = require('../lib/errors/user-closed-connection-abort-error');
const identifyRoute = require('../middleware/identify-route-middleware');
const fetchPublicRooms = require('../lib/matrix-utils/fetch-public-rooms');
const renderHydrogenVmRenderScriptToPageHtml = require('../hydrogen-render/render-hydrogen-vm-render-script-to-page-html');
@ -54,10 +56,18 @@ router.get(
searchTerm,
paginationToken,
limit,
abortSignal: req.abortSignal,
}
));
} catch (err) {
roomFetchError = err;
if (err instanceof RouteTimeoutAbortError || err instanceof UserClosedConnectionAbortError) {
// Throw an error so we stop processing and assembling the page after we abort
throw err;
} else {
// Otherwise, this will be the error we will display on the page for the user to
// explain why we failed to fetch the rooms they wanted.
roomFetchError = err;
}
}
// We index the room directory unless the config says we shouldn't index anything
@ -98,6 +108,7 @@ router.get(
matrixServerName,
},
},
abortSignal: req.abortSignal,
});
setHeadersToPreloadAssets(res, pageOptions);

@ -5,9 +5,8 @@ const path = require('path');
const urlJoin = require('url-join');
const express = require('express');
const asyncHandler = require('../lib/express-async-handler');
const StatusError = require('../lib/status-error');
const StatusError = require('../lib/errors/status-error');
const timeoutMiddleware = require('../middleware/timeout-middleware');
const redirectToCorrectArchiveUrlIfBadSigil = require('../middleware/redirect-to-correct-archive-url-if-bad-sigil-middleware');
const identifyRoute = require('../middleware/identify-route-middleware');
@ -179,11 +178,10 @@ router.get(
// We have to wait for the room join to happen first before we can fetch
// any of the additional room info or messages.
const roomId = await ensureRoomJoined(
matrixAccessToken,
roomIdOrAlias,
parseViaServersFromUserInput(req.query.via)
);
const roomId = await ensureRoomJoined(matrixAccessToken, roomIdOrAlias, {
viaServers: parseViaServersFromUserInput(req.query.via),
abortSignal: req.abortSignal,
});
// Find the closest day to the current time with messages
const { originServerTs } = await timestampToEvent({
@ -191,6 +189,7 @@ router.get(
roomId,
ts: dateBeforeJoin,
direction: DIRECTION.backward,
abortSignal: req.abortSignal,
});
if (!originServerTs) {
throw new StatusError(404, 'Unable to find day with history');
@ -252,7 +251,10 @@ router.get(
// We have to wait for the room join to happen first before we can use the jump to
// date endpoint (or any other Matrix endpoint)
const viaServers = parseViaServersFromUserInput(req.query.via);
const roomId = await ensureRoomJoined(matrixAccessToken, roomIdOrAlias, viaServers);
const roomId = await ensureRoomJoined(matrixAccessToken, roomIdOrAlias, {
viaServers,
abortSignal: req.abortSignal,
});
let ts;
let fromCausalEventId;
@ -305,9 +307,14 @@ router.get(
// currently just have this set in case some server has this implemented in
// the future but there currently is no implementation (as of 2023-04-17) and
// we can't have passing tests without a server implementation first.
'org.matrix.msc3999.event_id': fromCausalEventId,
//
// TODO: This isn't implemented yet
fromCausalEventId,
abortSignal: req.abortSignal,
}),
removeMe_fetchRoomCreateEventId(matrixAccessToken, roomId, {
abortSignal: req.abortSignal,
}),
removeMe_fetchRoomCreateEventId(matrixAccessToken, roomId),
]);
// Without MSC3999, we currently only detect one kind of loop where the
@ -438,6 +445,7 @@ router.get(
eventId: eventIdForClosestEvent,
dir: DIRECTION.forward,
limit: archiveMessageLimit,
abortSignal: req.abortSignal,
});
if (!messageResData.chunk?.length) {
@ -571,7 +579,9 @@ router.get(
predecessorRoomId,
predecessorLastKnownEventId,
predecessorViaServers,
} = await fetchPredecessorInfo(matrixAccessToken, roomId);
} = await fetchPredecessorInfo(matrixAccessToken, roomId, {
abortSignal: req.abortSignal,
});
if (!predecessorRoomId) {
throw new StatusError(
@ -582,11 +592,16 @@ router.get(
// We have to join the predecessor room before we can fetch the successor info
// (this could be our first time seeing the room)
await ensureRoomJoined(matrixAccessToken, predecessorRoomId, viaServers);
await ensureRoomJoined(matrixAccessToken, predecessorRoomId, {
viaServers,
abortSignal: req.abortSignal,
});
const {
successorRoomId: successorRoomIdForPredecessor,
successorSetTs: successorSetTsForPredecessor,
} = await fetchSuccessorInfo(matrixAccessToken, predecessorRoomId);
} = await fetchSuccessorInfo(matrixAccessToken, predecessorRoomId, {
abortSignal: req.abortSignal,
});
let tombstoneEventId;
if (!predecessorLastKnownEventId) {
@ -602,6 +617,7 @@ router.get(
roomId: predecessorRoomId,
ts: successorSetTsForPredecessor,
direction: DIRECTION.backward,
abortSignal: req.abortSignal,
}));
}
@ -664,7 +680,9 @@ router.get(
);
return;
} else if (dir === DIRECTION.forward) {
const { successorRoomId } = await fetchSuccessorInfo(matrixAccessToken, roomId);
const { successorRoomId } = await fetchSuccessorInfo(matrixAccessToken, roomId, {
abortSignal: req.abortSignal,
});
if (successorRoomId) {
// Jump to the successor room and continue at the first event of the room
res.redirect(
@ -731,7 +749,6 @@ router.get(
// https://github.com/pillarjs/path-to-regexp/issues/287
'/date/:yyyy(\\d{4})/:mm(\\d{2})/:dd(\\d{2}):time(T\\d\\d?:\\d\\d?((:\\d\\d?)?))?',
identifyRoute('app-archive-room-date'),
timeoutMiddleware,
// eslint-disable-next-line max-statements, complexity
asyncHandler(async function (req, res) {
const nowTs = Date.now();
@ -777,12 +794,15 @@ router.get(
// great way to get it (see
// https://github.com/matrix-org/matrix-public-archive/issues/50).
const viaServers = parseViaServersFromUserInput(req.query.via);
const roomId = await ensureRoomJoined(matrixAccessToken, roomIdOrAlias, viaServers);
const roomId = await ensureRoomJoined(matrixAccessToken, roomIdOrAlias, {
viaServers,
abortSignal: req.abortSignal,
});
// Do these in parallel to avoid the extra time in sequential round-trips
// (we want to display the archive page faster)
const [roomData, { events, stateEventMap }] = await Promise.all([
fetchRoomData(matrixAccessToken, roomId),
fetchRoomData(matrixAccessToken, roomId, { abortSignal: req.abortSignal }),
// We over-fetch messages outside of the range of the given day so that we
// can display messages from surrounding days (currently only from days
// before) so that the quiet rooms don't feel as desolate and broken.
@ -800,6 +820,7 @@ router.get(
// (for example) are from the same day, let's redirect to a smaller hour range
// to display.
limit: archiveMessageLimit + 1,
abortSignal: req.abortSignal,
}),
]);
@ -903,6 +924,7 @@ router.get(
matrixServerUrl: matrixServerUrl,
},
},
abortSignal: req.abortSignal,
});
setHeadersToPreloadAssets(res, pageOptions);

@ -23,20 +23,20 @@ buildClient({
});
const nodeArgs = [];
if (process.argv.inspectNode) {
if (process.argv.includes('--inspectNode')) {
nodeArgs.push('--inspect');
}
if (process.argv.traceWarningsNode) {
if (process.argv.includes('--traceWarningsNode')) {
nodeArgs.push('--trace-warnings');
}
// Pass through some args
const args = [];
if (process.argv.tracing) {
if (process.argv.includes('--tracing')) {
args.push('--tracing');
}
if (process.argv.logOutputFromChildProcesses) {
if (process.argv.includes('--logOutputFromChildProcesses')) {
args.push('--logOutputFromChildProcesses');
}

@ -4,8 +4,14 @@ const {
hrTimeToMilliseconds,
//hrTimeToMicroseconds
} = require('@opentelemetry/core');
const SAFE_ATTRIBUTES = ['http.method', 'http.url', 'http.status_code', 'http.target'];
const { SemanticAttributes } = require('@opentelemetry/semantic-conventions');
const SAFE_ATTRIBUTES = [
SemanticAttributes.HTTP_METHOD,
SemanticAttributes.HTTP_URL,
SemanticAttributes.HTTP_TARGET,
SemanticAttributes.HTTP_STATUS_CODE,
];
// Convert a `Span` object to a plain old JavaScript object with only the info
// we care about and that is safe to share. We want something we can JSON

@ -54,7 +54,14 @@ function getSerializableSpans() {
// We only care about showing the external API HTTP requests to the user
const filteredSpans = spans.filter((span) => {
return span.instrumentationLibrary.name === '@opentelemetry/instrumentation-http';
return [
// `http`/`https` requests
'@opentelemetry/instrumentation-http',
// Native `fetch`
'opentelemetry-instrumentation-node-18-fetch',
// This will get `tcp.connect` calls which `fetch` does but not the full request lifecycle
//'@opentelemetry/instrumentation-net',
].includes(span.instrumentationLibrary.name);
});
const serializableSpans = filteredSpans.map((span) => serializeSpan(span));

@ -3,6 +3,7 @@
const assert = require('assert');
const { registerInstrumentations } = require('@opentelemetry/instrumentation');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { FetchInstrumentation } = require('opentelemetry-instrumentation-fetch-node');
const { diag, DiagConsoleLogger, DiagLogLevel } = require('@opentelemetry/api');
const { JaegerExporter } = require('@opentelemetry/exporter-jaeger');
const {
@ -92,6 +93,11 @@ function startTracing() {
},
},
}),
// We have to instrument `undici` to cover the native `fetch` API built-in to
// Node.js. We're using `opentelemetry-instrumentation-fetch-node` because there
// is no official instrumentation and `opentelemetry-instrumentation-undici`
// doesn't seem to work.
new FetchInstrumentation({}),
],
});

@ -11,7 +11,7 @@ const { parseHTML } = require('linkedom');
const { readFile } = require('fs').promises;
const chalk = require('chalk');
const RethrownError = require('../server/lib/rethrown-error');
const RethrownError = require('../server/lib/errors/rethrown-error');
const MatrixPublicArchiveURLCreator = require('matrix-public-archive-shared/lib/url-creator');
const { fetchEndpointAsText, fetchEndpointAsJson } = require('../server/lib/fetch-endpoint');
const config = require('../server/lib/config');

Loading…
Cancel
Save