Add chunk ID to pagination token

This commit is contained in:
Erik Johnston 2018-05-21 17:39:06 +01:00
parent 13dbcafb9b
commit b65cc7defa
2 changed files with 84 additions and 43 deletions

View File

@ -62,24 +62,25 @@ _TOPOLOGICAL_TOKEN = "topological"
# Used as return values for pagination APIs # Used as return values for pagination APIs
_EventDictReturn = namedtuple("_EventDictReturn", ( _EventDictReturn = namedtuple("_EventDictReturn", (
"event_id", "topological_ordering", "stream_ordering", "event_id", "chunk_id", "topological_ordering", "stream_ordering",
)) ))
def lower_bound(token, engine, inclusive=False): def lower_bound(token, engine, inclusive=False):
inclusive = "=" if inclusive else "" inclusive = "=" if inclusive else ""
if token.topological is None: if token.chunk is None:
return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering") return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
else: else:
if isinstance(engine, PostgresEngine): if isinstance(engine, PostgresEngine):
# Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
# as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
# use the later form when running against postgres. # use the later form when running against postgres.
return "((%d,%d) <%s (%s,%s))" % ( return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % (
token.topological, token.stream, inclusive, token.chunk, token.topological, token.stream, inclusive,
"topological_ordering", "stream_ordering", "topological_ordering", "stream_ordering",
) )
return "(%d < %s OR (%d = %s AND %d <%s %s))" % ( return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % (
token.chunk,
token.topological, "topological_ordering", token.topological, "topological_ordering",
token.topological, "topological_ordering", token.topological, "topological_ordering",
token.stream, inclusive, "stream_ordering", token.stream, inclusive, "stream_ordering",
@ -88,18 +89,19 @@ def lower_bound(token, engine, inclusive=False):
def upper_bound(token, engine, inclusive=True): def upper_bound(token, engine, inclusive=True):
inclusive = "=" if inclusive else "" inclusive = "=" if inclusive else ""
if token.topological is None: if token.chunk is None:
return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering") return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
else: else:
if isinstance(engine, PostgresEngine): if isinstance(engine, PostgresEngine):
# Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
# as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
# use the later form when running against postgres. # use the later form when running against postgres.
return "((%d,%d) >%s (%s,%s))" % ( return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
token.topological, token.stream, inclusive, token.chunk, token.topological, token.stream, inclusive,
"topological_ordering", "stream_ordering", "topological_ordering", "stream_ordering",
) )
return "(%d > %s OR (%d = %s AND %d >%s %s))" % ( return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % (
token.chunk,
token.topological, "topological_ordering", token.topological, "topological_ordering",
token.topological, "topological_ordering", token.topological, "topological_ordering",
token.stream, inclusive, "stream_ordering", token.stream, inclusive, "stream_ordering",
@ -275,7 +277,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
) % (order,) ) % (order,)
txn.execute(sql, (room_id, from_id, to_id, limit)) txn.execute(sql, (room_id, from_id, to_id, limit))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
return rows return rows
rows = yield self.runInteraction("get_room_events_stream_for_room", f) rows = yield self.runInteraction("get_room_events_stream_for_room", f)
@ -325,7 +327,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
) )
txn.execute(sql, (user_id, from_id, to_id,)) txn.execute(sql, (user_id, from_id, to_id,))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
return rows return rows
@ -437,15 +439,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
`room_id` causes it to return the current room specific topological `room_id` causes it to return the current room specific topological
token. token.
""" """
token = yield self.get_room_max_stream_ordering()
if room_id is None: if room_id is None:
defer.returnValue("s%d" % (token,)) token = yield self.get_room_max_stream_ordering()
defer.returnValue(str(RoomStreamToken(None, None, token)))
else: else:
topo = yield self.runInteraction( token = yield self.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, "get_room_events_max_id", self._get_topological_token_for_room_txn,
room_id, room_id,
) )
defer.returnValue("t%d-%d" % (topo, token)) if not token:
raise Exception("Server not in room")
defer.returnValue(str(token))
def get_stream_token_for_event(self, event_id): def get_stream_token_for_event(self, event_id):
"""The stream token for an event """The stream token for an event
@ -460,7 +464,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
table="events", table="events",
keyvalues={"event_id": event_id}, keyvalues={"event_id": event_id},
retcol="stream_ordering", retcol="stream_ordering",
).addCallback(lambda row: "s%d" % (row,)) ).addCallback(lambda row: str(RoomStreamToken(None, None, row)))
def get_topological_token_for_event(self, event_id): def get_topological_token_for_event(self, event_id):
"""The stream token for an event """The stream token for an event
@ -469,16 +473,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
Raises: Raises:
StoreError if the event wasn't in the database. StoreError if the event wasn't in the database.
Returns: Returns:
A deferred "t%d-%d" topological token. A deferred topological token.
""" """
return self._simple_select_one( return self._simple_select_one(
table="events", table="events",
keyvalues={"event_id": event_id}, keyvalues={"event_id": event_id},
retcols=("stream_ordering", "topological_ordering"), retcols=("stream_ordering", "topological_ordering", "chunk_id"),
desc="get_topological_token_for_event", desc="get_topological_token_for_event",
).addCallback(lambda row: "t%d-%d" % ( ).addCallback(lambda row: str(RoomStreamToken(
row["topological_ordering"], row["stream_ordering"],) row["chunk_id"],
) row["topological_ordering"],
row["stream_ordering"],
)))
def _get_topological_token_for_room_txn(self, txn, room_id):
sql = """
SELECT chunk_id, topological_ordering, stream_ordering
FROM events
NATURAL JOIN event_forward_extremities
WHERE room_id = ?
ORDER BY stream_ordering DESC
LIMIT 1
"""
txn.execute(sql, (room_id,))
row = txn.fetchone()
if row:
c, t, s = row
return RoomStreamToken(c, t, s)
return None
def get_max_topological_token(self, room_id, stream_key): def get_max_topological_token(self, room_id, stream_key):
sql = ( sql = (
@ -515,18 +537,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
null topological_ordering. null topological_ordering.
""" """
for event, row in zip(events, rows): for event, row in zip(events, rows):
chunk = row.chunk_id
topo = row.topological_ordering
stream = row.stream_ordering stream = row.stream_ordering
if topo_order and row.topological_ordering:
topo = row.topological_ordering
else:
topo = None
internal = event.internal_metadata internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1)) if topo_order and chunk:
internal.after = str(RoomStreamToken(topo, stream)) internal.before = str(RoomStreamToken(chunk, topo, stream - 1))
internal.order = ( internal.after = str(RoomStreamToken(chunk, topo, stream))
int(topo) if topo else 0, internal.order = (
int(stream), int(chunk) if chunk else 0,
) int(topo) if topo else 0,
int(stream),
)
else:
internal.before = str(RoomStreamToken(None, None, stream - 1))
internal.after = str(RoomStreamToken(None, None, stream))
internal.order = (
0, 0, int(stream),
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_events_around(self, room_id, event_id, before_limit, after_limit): def get_events_around(self, room_id, event_id, before_limit, after_limit):
@ -586,17 +615,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"event_id": event_id, "event_id": event_id,
"room_id": room_id, "room_id": room_id,
}, },
retcols=["stream_ordering", "topological_ordering"], retcols=["stream_ordering", "topological_ordering", "chunk_id"],
) )
# Paginating backwards includes the event at the token, but paginating # Paginating backwards includes the event at the token, but paginating
# forward doesn't. # forward doesn't.
before_token = RoomStreamToken( before_token = RoomStreamToken(
results["topological_ordering"] - 1, results["chunk_id"],
results["stream_ordering"], results["topological_ordering"],
results["stream_ordering"] - 1,
) )
after_token = RoomStreamToken( after_token = RoomStreamToken(
results["chunk_id"],
results["topological_ordering"], results["topological_ordering"],
results["stream_ordering"], results["stream_ordering"],
) )
@ -728,7 +759,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
args.append(int(limit)) args.append(int(limit))
sql = ( sql = (
"SELECT event_id, topological_ordering, stream_ordering" "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
" FROM events" " FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s" " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s," " ORDER BY topological_ordering %(order)s,"
@ -743,6 +774,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
if rows: if rows:
chunk = rows[-1].chunk_id
topo = rows[-1].topological_ordering topo = rows[-1].topological_ordering
toke = rows[-1].stream_ordering toke = rows[-1].stream_ordering
if direction == 'b': if direction == 'b':
@ -752,12 +784,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# when we are going backwards so we subtract one from the # when we are going backwards so we subtract one from the
# stream part. # stream part.
toke -= 1 toke -= 1
next_token = RoomStreamToken(topo, toke) next_token = RoomStreamToken(chunk, topo, toke)
else: else:
# TODO (erikj): We should work out what to do here instead. # TODO (erikj): We should work out what to do here instead.
next_token = to_token if to_token else from_token next_token = to_token if to_token else from_token
return rows, str(next_token), return rows, str(next_token), iterated_chunks,
@defer.inlineCallbacks @defer.inlineCallbacks
def paginate_room_events(self, room_id, from_key, to_key=None, def paginate_room_events(self, room_id, from_key, to_key=None,

View File

@ -306,7 +306,7 @@ StreamToken.START = StreamToken(
) )
class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")):
"""Tokens are positions between events. The token "s1" comes after event 1. """Tokens are positions between events. The token "s1" comes after event 1.
s0 s1 s0 s1
@ -334,10 +334,17 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
def parse(cls, string): def parse(cls, string):
try: try:
if string[0] == 's': if string[0] == 's':
return cls(topological=None, stream=int(string[1:])) return cls(chunk=None, topological=None, stream=int(string[1:]))
if string[0] == 't': if string[0] == 't': # For backwards compat with older tokens.
parts = string[1:].split('-', 1) parts = string[1:].split('-', 1)
return cls(topological=int(parts[0]), stream=int(parts[1])) return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1]))
if string[0] == 'c':
parts = string[1:].split('~', 2)
return cls(
chunk=int(parts[0]),
topological=int(parts[1]),
stream=int(parts[2]),
)
except Exception: except Exception:
pass pass
raise SynapseError(400, "Invalid token %r" % (string,)) raise SynapseError(400, "Invalid token %r" % (string,))
@ -346,12 +353,14 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
def parse_stream_token(cls, string): def parse_stream_token(cls, string):
try: try:
if string[0] == 's': if string[0] == 's':
return cls(topological=None, stream=int(string[1:])) return cls(chunk=None, topological=None, stream=int(string[1:]))
except Exception: except Exception:
pass pass
raise SynapseError(400, "Invalid token %r" % (string,)) raise SynapseError(400, "Invalid token %r" % (string,))
def __str__(self): def __str__(self):
if self.chunk is not None:
return "c%d~%d~%d" % (self.chunk, self.topological, self.stream)
if self.topological is not None: if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream) return "t%d-%d" % (self.topological, self.stream)
else: else: