Split paginate_room_events storage function
This commit is contained in:
parent
966686c845
commit
06c0d0ed08
|
@ -738,17 +738,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
def has_room_changed_since(self, room_id, stream_id):
|
||||
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
|
||||
|
||||
def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None,
|
||||
direction='b', limit=-1, event_filter=None):
|
||||
"""Returns list of events before or after a given token.
|
||||
|
||||
class StreamStore(StreamWorkerStore):
|
||||
def get_room_max_stream_ordering(self):
|
||||
return self._stream_id_gen.get_current_token()
|
||||
Args:
|
||||
txn
|
||||
room_id (str)
|
||||
from_key (str): The token used to stream from
|
||||
to_key (str|None): A token which if given limits the results to
|
||||
only those before
|
||||
direction(char): Either 'b' or 'f' to indicate whether we are
|
||||
paginating forwards or backwards from `from_key`.
|
||||
limit (int): The maximum number of events to return. Zero or less
|
||||
means no limit.
|
||||
event_filter (Filter|None): If provided filters the events to
|
||||
those that match the filter.
|
||||
|
||||
def get_room_min_stream_ordering(self):
|
||||
return self._backfill_id_gen.get_current_token()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def paginate_room_events(self, room_id, from_key, to_key=None,
|
||||
direction='b', limit=-1, event_filter=None):
|
||||
Returns:
|
||||
tuple[list[dict], str]: Returns the results as a list of dicts and
|
||||
a token that points to the end of the result set. The dicts have
|
||||
the keys "event_id", "toplogical_ordering" and "stream_orderign".
|
||||
"""
|
||||
# Tokens really represent positions between elements, but we use
|
||||
# the convention of pointing to the event before the gap. Hence
|
||||
# we have a bit of asymmetry when it comes to equalities.
|
||||
|
@ -795,29 +806,54 @@ class StreamStore(StreamWorkerStore):
|
|||
"limit": limit_str
|
||||
}
|
||||
|
||||
def f(txn):
|
||||
txn.execute(sql, args)
|
||||
txn.execute(sql, args)
|
||||
|
||||
rows = self.cursor_to_dict(txn)
|
||||
rows = self.cursor_to_dict(txn)
|
||||
|
||||
if rows:
|
||||
topo = rows[-1]["topological_ordering"]
|
||||
toke = rows[-1]["stream_ordering"]
|
||||
if direction == 'b':
|
||||
# Tokens are positions between events.
|
||||
# This token points *after* the last event in the chunk.
|
||||
# We need it to point to the event before it in the chunk
|
||||
# when we are going backwards so we subtract one from the
|
||||
# stream part.
|
||||
toke -= 1
|
||||
next_token = str(RoomStreamToken(topo, toke))
|
||||
else:
|
||||
# TODO (erikj): We should work out what to do here instead.
|
||||
next_token = to_key if to_key else from_key
|
||||
if rows:
|
||||
topo = rows[-1]["topological_ordering"]
|
||||
toke = rows[-1]["stream_ordering"]
|
||||
if direction == 'b':
|
||||
# Tokens are positions between events.
|
||||
# This token points *after* the last event in the chunk.
|
||||
# We need it to point to the event before it in the chunk
|
||||
# when we are going backwards so we subtract one from the
|
||||
# stream part.
|
||||
toke -= 1
|
||||
next_token = str(RoomStreamToken(topo, toke))
|
||||
else:
|
||||
# TODO (erikj): We should work out what to do here instead.
|
||||
next_token = to_key if to_key else from_key
|
||||
|
||||
return rows, next_token,
|
||||
return rows, next_token,
|
||||
|
||||
rows, token = yield self.runInteraction("paginate_room_events", f)
|
||||
@defer.inlineCallbacks
|
||||
def paginate_room_events(self, room_id, from_key, to_key=None,
|
||||
direction='b', limit=-1, event_filter=None):
|
||||
"""Returns list of events before or after a given token.
|
||||
|
||||
Args:
|
||||
room_id (str)
|
||||
from_key (str): The token used to stream from
|
||||
to_key (str|None): A token which if given limits the results to
|
||||
only those before
|
||||
direction(char): Either 'b' or 'f' to indicate whether we are
|
||||
paginating forwards or backwards from `from_key`.
|
||||
limit (int): The maximum number of events to return. Zero or less
|
||||
means no limit.
|
||||
event_filter (Filter|None): If provided filters the events to
|
||||
those that match the filter.
|
||||
|
||||
Returns:
|
||||
tuple[list[dict], str]: Returns the results as a list of dicts and
|
||||
a token that points to the end of the result set. The dicts have
|
||||
the keys "event_id", "toplogical_ordering" and "stream_orderign".
|
||||
"""
|
||||
|
||||
rows, token = yield self.runInteraction(
|
||||
"paginate_room_events", self.paginate_room_events_txn,
|
||||
room_id, from_key, to_key, direction, limit, event_filter,
|
||||
)
|
||||
|
||||
events = yield self._get_events(
|
||||
[r["event_id"] for r in rows],
|
||||
|
@ -827,3 +863,11 @@ class StreamStore(StreamWorkerStore):
|
|||
self._set_before_and_after(events, rows)
|
||||
|
||||
defer.returnValue((events, token))
|
||||
|
||||
|
||||
class StreamStore(StreamWorkerStore):
|
||||
def get_room_max_stream_ordering(self):
|
||||
return self._stream_id_gen.get_current_token()
|
||||
|
||||
def get_room_min_stream_ordering(self):
|
||||
return self._backfill_id_gen.get_current_token()
|
||||
|
|
Loading…
Reference in New Issue