Start of `retryFnIfNotJoined`

This commit is contained in:
Eric Eastwood 2023-06-22 20:13:42 -05:00
parent 2845ac7ff0
commit 79a468b81f
5 changed files with 201 additions and 25 deletions

View File

@ -0,0 +1,94 @@
'use strict';
const assert = require('assert');
const { HTTPResponseError } = require('../fetch-endpoint');
const ensureRoomJoined = require('./ensure-room-joined');
const JOIN_STATES = {
unknown: 'unknown',
joining: 'joining',
joined: 'joined',
failed: 'failed',
};
const joinStateValues = Object.values(JOIN_STATES);
// Optimistically use the Matrix API assuming you're already joined to the room or
// accessing a `world_readable` room that doesn't require joining. If we see a 403
// Forbidden, then try joining the room and retrying the API call.
//
// Usage: Call this once to first create a helper utility that will retry a given
// function appropriately.
function createRetryFnIfNotJoined(
accessToken,
roomIdOrAlias,
{ viaServers = new Set(), abortSignal } = {}
) {
assert(accessToken);
assert(roomIdOrAlias);
// We use a `Set` to ensure that we don't have duplicate servers in the list
assert(viaServers instanceof Set);
let joinState = JOIN_STATES.unknown;
let joinPromise = null;
return async function retryFnIfNotJoined(fn) {
assert(
joinStateValues.includes(joinState),
`Unexpected internal join state when using createRetryFnIfNotJoined(...) (joinState=${joinState}). ` +
`This is a bug in the Matrix Public Archive. Please report`
);
if (joinState === JOIN_STATES.joining) {
// Wait for the join to finish before trying
await joinPromise;
} else if (joinState === JOIN_STATES.failed) {
// If we failed to join the room, then there is no way any other call is going
// to succeed so just immediately return an error. We return `joinPromise`
// which will resolve to the join error that occured
return joinPromise;
}
try {
return await Promise.resolve(fn());
} catch (errFromFn) {
const isForbiddenError =
errFromFn instanceof HTTPResponseError && errFromFn.response.status === 403;
// If we're in the middle of joining, try again
if (joinState === JOIN_STATES.joining) {
return await retryFnIfNotJoined(fn);
}
// Try joining the room if we see a 403 Forbidden error as we may just not
// be part of the room yet. We can't distinguish between a room that has
// banned us vs a room we haven't joined yet so we just try joining the
// room in any case.
else if (
isForbiddenError &&
// Only try joining if we haven't tried joining yet
joinState === JOIN_STATES.unknown
) {
joinState = JOIN_STATES.joining;
joinPromise = ensureRoomJoined(accessToken, roomIdOrAlias, {
viaServers,
abortSignal,
});
try {
await joinPromise;
joinState = JOIN_STATES.joined;
console.log('retryAfterJoin');
return await retryFnIfNotJoined(fn);
} catch (err) {
console.log('FAILED retryAfterJoin');
joinState = JOIN_STATES.failed;
throw err;
}
}
throw errFromFn;
}
};
}
module.exports = createRetryFnIfNotJoined;

View File

@ -21,6 +21,8 @@ async function ensureRoomJoined(
roomIdOrAlias, roomIdOrAlias,
{ viaServers = new Set(), abortSignal } = {} { viaServers = new Set(), abortSignal } = {}
) { ) {
assert(accessToken);
assert(roomIdOrAlias);
// We use a `Set` to ensure that we don't have duplicate servers in the list // We use a `Set` to ensure that we don't have duplicate servers in the list
assert(viaServers instanceof Set); assert(viaServers instanceof Set);

View File

@ -0,0 +1,33 @@
'use strict';
const assert = require('assert');
const urlJoin = require('url-join');
const { fetchEndpointAsJson } = require('../fetch-endpoint');
const { traceFunction } = require('../../tracing/trace-utilities');
const config = require('../config');
const matrixServerUrl = config.get('matrixServerUrl');
assert(matrixServerUrl);
async function resolveRoomAlias({ accessToken, roomAlias, abortSignal }) {
assert(accessToken);
assert(roomAlias);
// GET /_matrix/client/r0/directory/room/{roomAlias} -> { room_id, servers }
const resolveRoomAliasEndpoint = urlJoin(
matrixServerUrl,
`_matrix/client/r0/directory/room/${encodeURIComponent(roomAlias)}`
);
const { data: resolveRoomAliasResData } = await fetchEndpointAsJson(resolveRoomAliasEndpoint, {
accessToken,
abortSignal,
});
return {
roomId: resolveRoomAliasResData.room_id,
viaServers: new Set(resolveRoomAliasResData.servers || []),
};
}
module.exports = traceFunction(resolveRoomAlias);

View File

@ -0,0 +1,40 @@
'use strict';
const {
VALID_ENTITY_DESCRIPTOR_TO_SIGIL_MAP,
} = require('matrix-public-archive-shared/lib/reference-values');
const resolveRoomAlias = require('./resolve-room-alias');
// Given a room ID or alias, return the room ID and the set of servers we should try to
// join from. Does not attempt to join the room.
async function resolveRoomIdOrAlias({
accessToken,
roomIdOrAlias,
viaServers = new Set(),
abortSignal,
} = {}) {
const isRoomId = roomIdOrAlias.startsWith(VALID_ENTITY_DESCRIPTOR_TO_SIGIL_MAP.roomid);
const isRoomAlias = roomIdOrAlias.startsWith(VALID_ENTITY_DESCRIPTOR_TO_SIGIL_MAP.r);
if (isRoomId) {
const roomId = roomIdOrAlias;
return { roomId, viaServers };
} else if (isRoomAlias) {
const roomAlias = roomIdOrAlias;
const { roomId, viaServers: moreViaServers } = await resolveRoomAlias({
accessToken,
roomAlias,
abortSignal: abortSignal,
});
return { roomId, viaServers: new Set([...viaServers, ...moreViaServers]) };
}
throw new Error(
`resolveRoomIdOrAlias: Unknown roomIdOrAlias=${roomIdOrAlias} does not start with valid sigil (${Object.values(
VALID_ENTITY_DESCRIPTOR_TO_SIGIL_MAP
)})`
);
}
module.exports = resolveRoomIdOrAlias;

View File

@ -19,6 +19,8 @@ const {
} = require('../lib/matrix-utils/fetch-room-data'); } = require('../lib/matrix-utils/fetch-room-data');
const fetchEventsFromTimestampBackwards = require('../lib/matrix-utils/fetch-events-from-timestamp-backwards'); const fetchEventsFromTimestampBackwards = require('../lib/matrix-utils/fetch-events-from-timestamp-backwards');
const ensureRoomJoined = require('../lib/matrix-utils/ensure-room-joined'); const ensureRoomJoined = require('../lib/matrix-utils/ensure-room-joined');
const createRetryFnIfNotJoined = require('../lib/matrix-utils/create-retry-fn-if-not-joined');
const resolveRoomIdOrAlias = require('../lib/matrix-utils/resolve-room-id-or-alias');
const timestampToEvent = require('../lib/matrix-utils/timestamp-to-event'); const timestampToEvent = require('../lib/matrix-utils/timestamp-to-event');
const { removeMe_fetchRoomCreateEventId } = require('../lib/matrix-utils/fetch-room-data'); const { removeMe_fetchRoomCreateEventId } = require('../lib/matrix-utils/fetch-room-data');
const getMessagesResponseFromEventId = require('../lib/matrix-utils/get-messages-response-from-event-id'); const getMessagesResponseFromEventId = require('../lib/matrix-utils/get-messages-response-from-event-id');
@ -788,17 +790,18 @@ router.get(
); );
} }
// We have to wait for the room join to happen first before we can fetch // Resolve the room ID without joining the room (optimistically assume that we're
// any of the additional room info or messages. // already joined)
// let viaServers = parseViaServersFromUserInput(req.query.via);
// XXX: It would be better if we just tried fetching first and assume that we are let roomId;
// already joined and only join after we see a 403 Forbidden error (we should do ({ roomId, viaServers } = await resolveRoomIdOrAlias({
// this for all places we `ensureRoomJoined`). But we need the `roomId` for use with accessToken: matrixAccessToken,
// the various Matrix API's anyway and `/join/{roomIdOrAlias}` -> `{ room_id }` is a roomIdOrAlias,
// great way to get it (see viaServers,
// https://github.com/matrix-org/matrix-public-archive/issues/50). abortSignal: req.abortSignal,
const viaServers = parseViaServersFromUserInput(req.query.via); }));
const roomId = await ensureRoomJoined(matrixAccessToken, roomIdOrAlias, {
const retryFnIfNotJoined = createRetryFnIfNotJoined(matrixAccessToken, roomIdOrAlias, {
viaServers, viaServers,
abortSignal: req.abortSignal, abortSignal: req.abortSignal,
}); });
@ -806,26 +809,30 @@ router.get(
// Do these in parallel to avoid the extra time in sequential round-trips // Do these in parallel to avoid the extra time in sequential round-trips
// (we want to display the archive page faster) // (we want to display the archive page faster)
const [roomData, { events, stateEventMap }] = await Promise.all([ const [roomData, { events, stateEventMap }] = await Promise.all([
fetchRoomData(matrixAccessToken, roomId, { abortSignal: req.abortSignal }), retryFnIfNotJoined(() =>
fetchRoomData(matrixAccessToken, roomId, { abortSignal: req.abortSignal })
),
// We over-fetch messages outside of the range of the given day so that we // We over-fetch messages outside of the range of the given day so that we
// can display messages from surrounding days (currently only from days // can display messages from surrounding days (currently only from days
// before) so that the quiet rooms don't feel as desolate and broken. // before) so that the quiet rooms don't feel as desolate and broken.
// //
// When given a bare date like `2022/11/16`, we want to paginate from the end of that // When given a bare date like `2022/11/16`, we want to paginate from the end of that
// day backwards. This is why we use the `toTimestamp` here and fetch backwards. // day backwards. This is why we use the `toTimestamp` here and fetch backwards.
fetchEventsFromTimestampBackwards({ retryFnIfNotJoined(() =>
accessToken: matrixAccessToken, fetchEventsFromTimestampBackwards({
roomId, accessToken: matrixAccessToken,
ts: toTimestamp, roomId,
// We fetch one more than the `archiveMessageLimit` so that we can see if there ts: toTimestamp,
// are too many messages from the given day. If we have over the // We fetch one more than the `archiveMessageLimit` so that we can see if there
// `archiveMessageLimit` number of messages fetching from the given day, it's // are too many messages from the given day. If we have over the
// acceptable to have them be from surrounding days. But if all 500 messages // `archiveMessageLimit` number of messages fetching from the given day, it's
// (for example) are from the same day, let's redirect to a smaller hour range // acceptable to have them be from surrounding days. But if all 500 messages
// to display. // (for example) are from the same day, let's redirect to a smaller hour range
limit: archiveMessageLimit + 1, // to display.
abortSignal: req.abortSignal, limit: archiveMessageLimit + 1,
}), abortSignal: req.abortSignal,
})
),
]); ]);
// Only `world_readable` or `shared` rooms that are `public` are viewable in the archive // Only `world_readable` or `shared` rooms that are `public` are viewable in the archive