From 6188512b180aaf9d04ba73015b59313cb221bd62 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 21 May 2018 17:39:06 +0100
Subject: [PATCH 01/10] Add chunk ID to pagination token

---
 synapse/storage/stream.py | 108 ++++++++++++++++++++++++--------------
 synapse/types.py          |  19 +++++--
 2 files changed, 84 insertions(+), 43 deletions(-)

diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index fb463c525a..387120090a 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -62,24 +62,25 @@ _TOPOLOGICAL_TOKEN = "topological"
 
 # Used as return values for pagination APIs
 _EventDictReturn = namedtuple("_EventDictReturn", (
-    "event_id", "topological_ordering", "stream_ordering",
+    "event_id", "chunk_id", "topological_ordering", "stream_ordering",
 ))
 
 
 def lower_bound(token, engine, inclusive=False):
     inclusive = "=" if inclusive else ""
-    if token.topological is None:
+    if token.chunk is None:
         return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
         if isinstance(engine, PostgresEngine):
             # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
             # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
             # use the later form when running against postgres.
-            return "((%d,%d) <%s (%s,%s))" % (
-                token.topological, token.stream, inclusive,
+            return "(chunk_id = %d AND (%d,%d) <%s (%s,%s))" % (
+                token.chunk, token.topological, token.stream, inclusive,
                 "topological_ordering", "stream_ordering",
             )
-        return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
+        return "(chunk_id = %d AND (%d < %s OR (%d = %s AND %d <%s %s)))" % (
+            token.chunk,
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
             token.stream, inclusive, "stream_ordering",
         )
 
 
 def upper_bound(token, engine, inclusive=True):
     inclusive = "=" if inclusive else ""
-    if token.topological is None:
+    if token.chunk is None:
         return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
     else:
         if isinstance(engine, PostgresEngine):
             # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
             # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
             # use the later form when running against postgres.
-            return "((%d,%d) >%s (%s,%s))" % (
-                token.topological, token.stream, inclusive,
+            return "(chunk_id = %d AND (%d,%d) >%s (%s,%s))" % (
+                token.chunk, token.topological, token.stream, inclusive,
                 "topological_ordering", "stream_ordering",
             )
-        return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
+        return "(chunk_id = %d AND (%d > %s OR (%d = %s AND %d >%s %s)))" % (
+            token.chunk,
             token.topological, "topological_ordering",
             token.topological, "topological_ordering",
             token.stream, inclusive, "stream_ordering",
@@ -275,7 +277,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             ) % (order,)
             txn.execute(sql, (room_id, from_id, to_id, limit))
 
-            rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+            rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
             return rows
 
         rows = yield self.runInteraction("get_room_events_stream_for_room", f)
@@ -325,7 +327,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
         txn.execute(sql, (user_id, from_id, to_id,))
 
-        rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
+        rows = [_EventDictReturn(row[0], None, None, row[1]) for row in txn]
 
         return rows
 
@@ -437,15 +439,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         `room_id` causes it to return the current room specific topological
         token.
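 
+        For example (a sketch with invented values; ``store`` stands for this
+        store class):
+
+            token = yield store.get_room_events_max_id()
+            # token == "s3452", a stream token
+            token = yield store.get_room_events_max_id(room_id)
+            # token == "c17~12~3452", a chunk token for the room's extremity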
""" - token = yield self.get_room_max_stream_ordering() if room_id is None: - defer.returnValue("s%d" % (token,)) + token = yield self.get_room_max_stream_ordering() + defer.returnValue(str(RoomStreamToken(None, None, token))) else: - topo = yield self.runInteraction( - "_get_max_topological_txn", self._get_max_topological_txn, + token = yield self.runInteraction( + "get_room_events_max_id", self._get_topological_token_for_room_txn, room_id, ) - defer.returnValue("t%d-%d" % (topo, token)) + if not token: + raise Exception("Server not in room") + defer.returnValue(str(token)) def get_stream_token_for_event(self, event_id): """The stream token for an event @@ -460,7 +464,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): table="events", keyvalues={"event_id": event_id}, retcol="stream_ordering", - ).addCallback(lambda row: "s%d" % (row,)) + ).addCallback(lambda row: str(RoomStreamToken(None, None, row))) def get_topological_token_for_event(self, event_id): """The stream token for an event @@ -469,16 +473,34 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Raises: StoreError if the event wasn't in the database. Returns: - A deferred "t%d-%d" topological token. + A deferred topological token. """ return self._simple_select_one( table="events", keyvalues={"event_id": event_id}, - retcols=("stream_ordering", "topological_ordering"), + retcols=("stream_ordering", "topological_ordering", "chunk_id"), desc="get_topological_token_for_event", - ).addCallback(lambda row: "t%d-%d" % ( - row["topological_ordering"], row["stream_ordering"],) - ) + ).addCallback(lambda row: str(RoomStreamToken( + row["chunk_id"], + row["topological_ordering"], + row["stream_ordering"], + ))) + + def _get_topological_token_for_room_txn(self, txn, room_id): + sql = """ + SELECT chunk_id, topological_ordering, stream_ordering + FROM events + NATURAL JOIN event_forward_extremities + WHERE room_id = ? + ORDER BY stream_ordering DESC + LIMIT 1 + """ + txn.execute(sql, (room_id,)) + row = txn.fetchone() + if row: + c, t, s = row + return RoomStreamToken(c, t, s) + return None def get_max_topological_token(self, room_id, stream_key): sql = ( @@ -515,18 +537,25 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): null topological_ordering. 
""" for event, row in zip(events, rows): + chunk = row.chunk_id + topo = row.topological_ordering stream = row.stream_ordering - if topo_order and row.topological_ordering: - topo = row.topological_ordering - else: - topo = None + internal = event.internal_metadata - internal.before = str(RoomStreamToken(topo, stream - 1)) - internal.after = str(RoomStreamToken(topo, stream)) - internal.order = ( - int(topo) if topo else 0, - int(stream), - ) + if topo_order and chunk: + internal.before = str(RoomStreamToken(chunk, topo, stream - 1)) + internal.after = str(RoomStreamToken(chunk, topo, stream)) + internal.order = ( + int(chunk) if chunk else 0, + int(topo) if topo else 0, + int(stream), + ) + else: + internal.before = str(RoomStreamToken(None, None, stream - 1)) + internal.after = str(RoomStreamToken(None, None, stream)) + internal.order = ( + 0, 0, int(stream), + ) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): @@ -586,17 +615,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "event_id": event_id, "room_id": room_id, }, - retcols=["stream_ordering", "topological_ordering"], + retcols=["stream_ordering", "topological_ordering", "chunk_id"], ) # Paginating backwards includes the event at the token, but paginating # forward doesn't. before_token = RoomStreamToken( - results["topological_ordering"] - 1, - results["stream_ordering"], + results["chunk_id"], + results["topological_ordering"], + results["stream_ordering"] - 1, ) after_token = RoomStreamToken( + results["chunk_id"], results["topological_ordering"], results["stream_ordering"], ) @@ -728,7 +759,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): args.append(int(limit)) sql = ( - "SELECT event_id, topological_ordering, stream_ordering" + "SELECT event_id, chunk_id, topological_ordering, stream_ordering" " FROM events" " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," @@ -743,6 +774,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn] if rows: + chunk = rows[-1].chunk_id topo = rows[-1].topological_ordering toke = rows[-1].stream_ordering if direction == 'b': @@ -752,12 +784,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # when we are going backwards so we subtract one from the # stream part. toke -= 1 - next_token = RoomStreamToken(topo, toke) + next_token = RoomStreamToken(chunk, topo, toke) else: # TODO (erikj): We should work out what to do here instead. next_token = to_token if to_token else from_token - return rows, str(next_token), + return rows, str(next_token), iterated_chunks, @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, diff --git a/synapse/types.py b/synapse/types.py index cc7c182a78..27f204ce79 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -306,7 +306,7 @@ StreamToken.START = StreamToken( ) -class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): +class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")): """Tokens are positions between events. The token "s1" comes after event 1. 
            s0    s1
            |     |
        [0] V [1] V [2]
 
     Tokens can either be a point in the live event stream or a cursor going
     through historic events.
 
     When traversing the live event stream events are ordered by when they
     arrived at the homeserver.
 
     When traversing historic events the events are ordered by their depth in
     the event graph "topological_ordering" and then by when they arrived at
     the homeserver "stream_ordering".
 
     Live tokens start with an "s" followed by the "stream_ordering" id of the
     event it comes after. Historic tokens start with a "t" followed by the
     "topological_ordering" id of the event it comes after, followed by "-",
     followed by the "stream_ordering" id of the event it comes after.
     """
     __slots__ = []
 
     @classmethod
@@ -334,10 +334,17 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
-            if string[0] == 't':
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
+            if string[0] == 't':  # For backwards compat with older tokens.
                 parts = string[1:].split('-', 1)
-                return cls(topological=int(parts[0]), stream=int(parts[1]))
+                return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1]))
+            if string[0] == 'c':
+                parts = string[1:].split('~', 2)
+                return cls(
+                    chunk=int(parts[0]),
+                    topological=int(parts[1]),
+                    stream=int(parts[2]),
+                )
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))
@@ -346,12 +353,14 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")):
     def parse_stream_token(cls, string):
         try:
             if string[0] == 's':
-                return cls(topological=None, stream=int(string[1:]))
+                return cls(chunk=None, topological=None, stream=int(string[1:]))
         except Exception:
             pass
         raise SynapseError(400, "Invalid token %r" % (string,))
 
     def __str__(self):
+        if self.chunk is not None:
+            return "c%d~%d~%d" % (self.chunk, self.topological, self.stream)
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         else:
             return "s%d" % (self.stream,)

From bf599cdba1e708a182469e84754d6e1f063f1c7e Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 21 May 2018 17:40:10 +0100
Subject: [PATCH 02/10] Use calculated topological ordering when persisting
 events

---
 synapse/storage/events.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5194c4a48d..d38f65b4e6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1020,7 +1020,7 @@ class EventsStore(EventsWorkerStore):
                 }
             )
 
-            chunk_id, _ = self._insert_into_chunk_txn(
+            chunk_id, topo = self._insert_into_chunk_txn(
                 txn, event.room_id, event.event_id,
                 [eid for eid, _ in event.prev_events],
             )
@@ -1032,6 +1032,7 @@ class EventsStore(EventsWorkerStore):
                 updatevalues={
                     "outlier": False,
                     "chunk_id": chunk_id,
+                    "topological_ordering": topo,
                 },
             )
 
@@ -1117,9 +1118,9 @@ class EventsStore(EventsWorkerStore):
 
         for event, _ in events_and_contexts:
             if event.internal_metadata.is_outlier():
-                chunk_id, _topo = None, 0
+                chunk_id, topo = None, 0
             else:
-                chunk_id, _topo = self._insert_into_chunk_txn(
+                chunk_id, topo = self._insert_into_chunk_txn(
                     txn, event.room_id, event.event_id,
                     [eid for eid, _ in event.prev_events],
                 )
@@ -1130,7 +1131,7 @@ class EventsStore(EventsWorkerStore):
                 values={
                     "stream_ordering": event.internal_metadata.stream_ordering,
                     "chunk_id": chunk_id,
-                    "topological_ordering": event.depth,
+                    "topological_ordering": topo,
                     "depth": event.depth,
                     "event_id": event.event_id,
                     "room_id": event.room_id,

From b671e577595215ef78f385c4abfdaa6ab46dc189 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 21 May 2018 17:41:10 +0100
Subject: [PATCH 03/10] Implement pagination using chunks

---
 synapse/handlers/message.py |   2 +-
 synapse/handlers/room.py    |   2 +-
 synapse/storage/stream.py   | 123 +++++++++++++++++++++++++++++++-----
 3 files changed, 109 insertions(+), 18 deletions(-)

diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 81cff0870e..c350c93c7e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -235,7 +235,7 @@ class MessageHandler(BaseHandler):
                     room_id, max_topo
                 )
 
-            events, next_key = yield self.store.paginate_room_events(
+            events, next_key, extremities = yield self.store.paginate_room_events(
                room_id=room_id,
                from_key=source_config.from_key,
                to_key=source_config.to_key,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b5850db42f..d627b6db13 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -534,7 +534,7 @@ class RoomEventSource(object):
 
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
-        events, next_key = yield self.store.paginate_room_events(
+        events, next_key, _ = yield self.store.paginate_room_events(
             room_id=key,
             from_key=config.from_key,
             to_key=config.to_key,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 387120090a..34a5f7e3e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -41,6 +41,7 @@ from synapse.storage.events import EventsWorkerStore
 from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.chunk_ordered_table import ChunkDBOrderedListStore
 from synapse.storage.engines import PostgresEngine
 
 import abc
@@ -394,7 +395,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         end_token = RoomStreamToken.parse(end_token)
 
-        rows, token = yield self.runInteraction(
+        rows, token, _ = yield self.runInteraction(
             "get_recent_event_ids_for_room", self._paginate_room_events_txn,
             room_id, from_token=end_token, limit=limit,
         )
@@ -720,12 +721,19 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             those that match the filter.
 
         Returns:
-            Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
-            as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            Deferred[tuple[list[_EventDictReturn], str, list[int]]]: Returns
+            the results as a list of _EventDictReturn, a token that points to
+            the end of the result set, and a list of chunks iterated over.
         """
-        assert int(limit) >= 0
+        limit = int(limit)  # Sometimes we are passed a string from somewhere
+        assert limit >= 0
+
+        # There are two modes of fetching events: by stream order or by
+        # topological order. This is determined by whether the from_token is a
+        # stream or topological token. If stream then we can simply do a select
+        # ordered by stream_ordering column. If topological, then we need to
+        # fetch events from one chunk at a time until we hit the limit.
 
         # Tokens really represent positions between elements, but we use
         # the convention of pointing to the event before the gap. Hence
         # we have a bit of asymmetry when it comes to equalities.
@@ -756,7 +764,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             bounds += " AND " + filter_clause
             args.extend(filter_args)
 
-        args.append(int(limit))
+        args.append(limit)
 
         sql = (
             "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
@@ -771,7 +779,65 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         txn.execute(sql, args)
 
-        rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+        rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+        # If we are paginating topologically and we haven't hit the limit on
+        # number of events then we need to fetch events from the previous or
+        # next chunk.
+
+        iterated_chunks = []
+
+        chunk_id = None
+        if from_token.chunk:  # FIXME: may be topological but no chunk.
+            if rows:
+                chunk_id = rows[-1].chunk_id
+                iterated_chunks = [r.chunk_id for r in rows]
+            else:
+                chunk_id = from_token.chunk
+                iterated_chunks = [chunk_id]
+
+        table = ChunkDBOrderedListStore(
+            txn, room_id, self.clock,
+        )
+
+        if filter_clause:
+            filter_clause = "AND " + filter_clause
+
+        sql = (
+            "SELECT event_id, chunk_id, topological_ordering, stream_ordering"
+            " FROM events"
+            " WHERE outlier = ? AND room_id = ? AND chunk_id = ? %(filter_clause)s"
+            " ORDER BY topological_ordering %(order)s,"
+            " stream_ordering %(order)s LIMIT ?"
+        ) % {
+            "filter_clause": filter_clause,
+            "order": order,
+        }
+
+        while chunk_id and (limit <= 0 or len(rows) < limit):
+            if chunk_id not in iterated_chunks:
+                iterated_chunks.append(chunk_id)
+
+            if direction == 'b':
+                chunk_id = table.get_prev(chunk_id)
+            else:
+                chunk_id = table.get_next(chunk_id)
+
+            if chunk_id is None:
+                break
+
+            # Each pass fetches the next chunk's events, so the query must be
+            # scoped to the current chunk_id.
+            args = [False, room_id, chunk_id] + filter_args + [limit]
+            txn.execute(sql, args)
+            new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn]
+
+            if not new_rows:
+                break
+
+            rows.extend(new_rows)
+
+        if limit > 0:
+            rows = rows[:limit]
 
         if rows:
             chunk = rows[-1].chunk_id
@@ -809,18 +875,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             those that match the filter.
 
         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            tuple[list[dict], str, list[str]]: Returns the results as a list of
+            dicts, a token that points to the end of the result set, and a list
+            of backwards extremities. The dicts have the keys "event_id",
+            "topological_ordering" and "stream_ordering".
         """
         from_key = RoomStreamToken.parse(from_key)
         if to_key:
            to_key = RoomStreamToken.parse(to_key)
 
-        rows, token = yield self.runInteraction(
-            "paginate_room_events", self._paginate_room_events_txn,
-            room_id, from_key, to_key, direction, limit, event_filter,
+        def _do_paginate_room_events(txn):
+            rows, token, chunks = self._paginate_room_events_txn(
+                txn, room_id, from_key, to_key, direction, limit, event_filter,
+            )
+
+            # We now fetch the extremities by fetching the extremities for
+            # each chunk we iterated over.
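+            # For example (invented IDs): if the walk above iterated chunks
+            # [17, 16], we return the union of their
+            # chunk_backwards_extremities rows, deduplicated preserving order.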
+ extremities = [] + seen = set() + for chunk_id in chunks: + if chunk_id in seen: + continue + seen.add(chunk_id) + + event_ids = self._simple_select_onecol_txn( + txn, + table="chunk_backwards_extremities", + keyvalues={"chunk_id": chunk_id}, + retcol="event_id" + ) + + extremities.extend(e for e in event_ids if e not in extremities) + + return rows, token, extremities + + rows, token, extremities = yield self.runInteraction( + "paginate_room_events", _do_paginate_room_events, ) events = yield self._get_events( @@ -830,7 +921,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): self._set_before_and_after(events, rows) - defer.returnValue((events, token)) + defer.returnValue((events, token, extremities)) class StreamStore(StreamWorkerStore): From 47b36e9a02e0b1d56f39e6e981bfd0a51b4ea0fc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Jun 2018 11:19:57 +0100 Subject: [PATCH 04/10] Update docs for RoomStreamToken --- synapse/types.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/synapse/types.py b/synapse/types.py index 27f204ce79..1ff71aa4e2 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -306,7 +306,7 @@ StreamToken.START = StreamToken( ) -class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")): +class RoomStreamToken(namedtuple("_StreamToken", ("chunk", "topological", "stream"))): """Tokens are positions between events. The token "s1" comes after event 1. s0 s1 @@ -319,14 +319,18 @@ class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")): When traversing the live event stream events are ordered by when they arrived at the homeserver. - When traversing historic events the events are ordered by their depth in - the event graph "topological_ordering" and then by when they arrived at the - homeserver "stream_ordering". + When traversing historic events the events are ordered by the topological + ordering of the room graph. This is done using event chunks and the + `topological_ordering` column. - Live tokens start with an "s" followed by the "stream_ordering" id of the - event it comes after. Historic tokens start with a "t" followed by the - "topological_ordering" id of the event it comes after, followed by "-", - followed by the "stream_ordering" id of the event it comes after. + Live tokens start with an 's' and include the stream_ordering of the event + it comes after. Historic tokens start with a 'c' and include the chunk ID, + topological ordering and stream ordering of the event it comes after. + + (In previous versions, when chunks were not implemented, the historic tokens + started with 't' and included the topological and stream ordering. These + tokens can be roughly converted to the new format by looking up the chunk + and topological ordering of the event with the same stream ordering). 
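+
+    As an illustrative sketch (invented values), following the parse() and
+    __str__() logic below:
+
+        >>> str(RoomStreamToken(17, 12, 3452))
+        'c17~12~3452'
+        >>> str(RoomStreamToken(None, None, 3452))
+        's3452'
+        >>> str(RoomStreamToken.parse("t12-3452"))  # legacy form still parses
+        't12-3452'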
""" __slots__ = [] @@ -339,6 +343,8 @@ class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")): parts = string[1:].split('-', 1) return cls(chunk=None, topological=int(parts[0]), stream=int(parts[1])) if string[0] == 'c': + # We use '~' as both stream ordering and topological ordering + # can be negative, so we can't use '-' parts = string[1:].split('~', 2) return cls( chunk=int(parts[0]), @@ -360,6 +366,8 @@ class RoomStreamToken(namedtuple("_StreamToken", "chunk topological stream")): def __str__(self): if self.chunk is not None: + # We use '~' as both stream ordering and topological ordering + # can be negative, so we can't use '-' return "c%d~%d~%d" % (self.chunk, self.topological, self.stream) if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) From 80a877e9d9382dbc5980db57c7b3d9a5de57c121 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Jun 2018 11:31:16 +0100 Subject: [PATCH 05/10] Comment on stream vs topological vs depth ordering in schema --- synapse/storage/schema/full_schemas/16/im.sql | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql index ba5346806e..4faf8562ad 100644 --- a/synapse/storage/schema/full_schemas/16/im.sql +++ b/synapse/storage/schema/full_schemas/16/im.sql @@ -14,7 +14,12 @@ */ CREATE TABLE IF NOT EXISTS events( + -- Defines an ordering used to stream new events to clients. Events + -- fetched via backfill have negative values. stream_ordering INTEGER PRIMARY KEY, + -- Defines a topological ordering of events within a chunk + -- (The concept of a chunk was added in later schemas, this used to + -- be set to the same value as the `depth` field in an event) topological_ordering BIGINT NOT NULL, event_id TEXT NOT NULL, type TEXT NOT NULL, From 5bf4fa0fc47664bf8089ad36e70a2b2f746386eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Jun 2018 11:43:03 +0100 Subject: [PATCH 06/10] Don't drop topo ordering when there is no chunk_id --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 34a5f7e3e7..baf3715c28 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -543,7 +543,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): stream = row.stream_ordering internal = event.internal_metadata - if topo_order and chunk: + if topo_order: internal.before = str(RoomStreamToken(chunk, topo, stream - 1)) internal.after = str(RoomStreamToken(chunk, topo, stream)) internal.order = ( From 9e7cf48461ac773cb6661465ab9cf0cf7a190c8d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Jun 2018 11:51:11 +0100 Subject: [PATCH 07/10] Reuse stream_ordering attribute instead of order The internal metadata "order" attribute was only used in one place, which was equivalent to using the stream ordering anyway. --- synapse/handlers/room.py | 3 ++- synapse/storage/stream.py | 11 +++-------- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index d627b6db13..870dbd3799 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -514,7 +514,8 @@ class RoomEventSource(object): events = list(room_events) events.extend(e for evs, _ in room_to_events.values() for e in evs) - events.sort(key=lambda e: e.internal_metadata.order) + # Order by the stream ordering of the events. 
+ events.sort(key=lambda e: e.internal_metadata.stream_ordering) if limit: events[:] = events[:limit] diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index baf3715c28..d46672aa4a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -543,20 +543,15 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): stream = row.stream_ordering internal = event.internal_metadata + + internal.stream_ordering = stream + if topo_order: internal.before = str(RoomStreamToken(chunk, topo, stream - 1)) internal.after = str(RoomStreamToken(chunk, topo, stream)) - internal.order = ( - int(chunk) if chunk else 0, - int(topo) if topo else 0, - int(stream), - ) else: internal.before = str(RoomStreamToken(None, None, stream - 1)) internal.after = str(RoomStreamToken(None, None, stream)) - internal.order = ( - 0, 0, int(stream), - ) @defer.inlineCallbacks def get_events_around(self, room_id, event_id, before_limit, after_limit): From e7bb34b72a36b7a4889618c2d00e1f8b3d826a50 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Jun 2018 11:53:06 +0100 Subject: [PATCH 08/10] Use *row --- synapse/storage/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d46672aa4a..b3492d9109 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -774,7 +774,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn.execute(sql, args) - rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn] + rows = [_EventDictReturn(*row) for row in txn] # If we are paginating topologically and we haven't hit the limit on # number of events then we need to fetch events from the previous or @@ -824,7 +824,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): break txn.execute(sql, args) - new_rows = [_EventDictReturn(row[0], row[1], row[2], row[3]) for row in txn] + new_rows = [_EventDictReturn(*row) for row in txn] if not new_rows: break From 58aadd3dd4fc55899d049e41e789d2828dff668b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Jun 2018 11:54:24 +0100 Subject: [PATCH 09/10] Remove spurious break --- synapse/storage/stream.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b3492d9109..5062bf4e47 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -826,9 +826,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): txn.execute(sql, args) new_rows = [_EventDictReturn(*row) for row in txn] - if not new_rows: - break - rows.extend(new_rows) if limit > 0: From c33810d9ccd6cb954eaf2924ecdbdcc0a79e67af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 Jun 2018 11:55:08 +0100 Subject: [PATCH 10/10] Remove spurious conditional --- synapse/storage/stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 5062bf4e47..0d32a3a498 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -828,8 +828,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows.extend(new_rows) - if limit > 0: - rows = rows[:limit] + # We may have inserted more rows than necessary in the loop above + rows = rows[:limit] if rows: chunk = rows[-1].chunk_id
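
As a closing sketch of where the series lands: topological pagination now
walks the linked list of chunks until the limit is filled. The following is
a condensed illustration of `_paginate_room_events_txn` after patches 3 and
8-10, not the verbatim code: `fetch_chunk_events` stands in for the
per-chunk SELECT built in patch 3, and `table` is the
`ChunkDBOrderedListStore`.

    def paginate_topologically(txn, table, room_id, from_token, direction, limit):
        # First page: events from the token's own chunk, bounded by the token.
        rows = fetch_chunk_events(txn, room_id, from_token.chunk, limit)
        chunk_id = rows[-1].chunk_id if rows else from_token.chunk
        iterated_chunks = [r.chunk_id for r in rows] or [chunk_id]

        while chunk_id and len(rows) < limit:
            if chunk_id not in iterated_chunks:
                iterated_chunks.append(chunk_id)

            # Walk the chunk linked list kept by ChunkDBOrderedListStore.
            if direction == 'b':
                chunk_id = table.get_prev(chunk_id)
            else:
                chunk_id = table.get_next(chunk_id)
            if chunk_id is None:
                break

            rows.extend(fetch_chunk_events(txn, room_id, chunk_id, limit))

        # Patch 10: trim the overshoot from the last chunk unconditionally.
        return rows[:limit], iterated_chunks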