Refactor event ordering check to events store
This commit is contained in:
parent
131485ef66
commit
73880268ef
|
@ -42,41 +42,17 @@ class ReadMarkerHandler(BaseHandler):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Get ordering for existing read marker
|
# Get ordering for existing read marker
|
||||||
with (yield self.read_marker_linearizer.queue(room_id + "_" + user_id)):
|
with (yield self.read_marker_linearizer.queue((room_id, user_id))):
|
||||||
account_data = yield self.store.get_account_data_for_room(user_id, room_id)
|
account_data = yield self.store.get_account_data_for_room(user_id, room_id)
|
||||||
existing_read_marker = account_data["m.read_marker"]
|
existing_read_marker = account_data["m.read_marker"]
|
||||||
|
|
||||||
should_update = True
|
should_update = True
|
||||||
|
|
||||||
res = yield self.store._simple_select_one(
|
|
||||||
table="events",
|
|
||||||
retcols=["topological_ordering", "stream_ordering"],
|
|
||||||
keyvalues={"event_id": event_id},
|
|
||||||
allow_none=True
|
|
||||||
)
|
|
||||||
|
|
||||||
if not res:
|
|
||||||
raise SynapseError(404, 'Event does not exist')
|
|
||||||
|
|
||||||
if existing_read_marker:
|
if existing_read_marker:
|
||||||
new_to = int(res["topological_ordering"])
|
should_update = yield self.store.is_event_after(
|
||||||
new_so = int(res["stream_ordering"])
|
existing_read_marker['marker'],
|
||||||
|
event_id
|
||||||
# Get ordering for existing read marker
|
|
||||||
res = yield self.store._simple_select_one(
|
|
||||||
table="events",
|
|
||||||
retcols=["topological_ordering", "stream_ordering"],
|
|
||||||
keyvalues={"event_id": existing_read_marker['marker']},
|
|
||||||
allow_none=True
|
|
||||||
)
|
)
|
||||||
existing_to = int(res["topological_ordering"]) if res else None
|
|
||||||
existing_so = int(res["stream_ordering"]) if res else None
|
|
||||||
|
|
||||||
# Prevent updating if the existing marker is ahead in the stream
|
|
||||||
if existing_to > new_to:
|
|
||||||
should_update = False
|
|
||||||
elif existing_to == new_to and existing_so >= new_so:
|
|
||||||
should_update = False
|
|
||||||
|
|
||||||
if should_update:
|
if should_update:
|
||||||
content = {
|
content = {
|
||||||
|
|
|
@ -2159,6 +2159,34 @@ class EventsStore(SQLBaseStore):
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def is_event_after(self, event_id1, event_id2):
|
||||||
|
is_after = True
|
||||||
|
|
||||||
|
to_1, so_1 = yield self._get_event_ordering(event_id1)
|
||||||
|
to_2, so_2 = yield self._get_event_ordering(event_id2)
|
||||||
|
|
||||||
|
# Prevent updating if the existing marker is ahead in the stream
|
||||||
|
if to_1 > to_2:
|
||||||
|
is_after = False
|
||||||
|
elif to_1 == to_2 and so_1 >= so_2:
|
||||||
|
is_after = False
|
||||||
|
|
||||||
|
defer.returnValue(is_after)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _get_event_ordering(self, event_id):
|
||||||
|
res = yield self._simple_select_one(
|
||||||
|
table="events",
|
||||||
|
retcols=["topological_ordering", "stream_ordering"],
|
||||||
|
keyvalues={"event_id": event_id},
|
||||||
|
allow_none=True
|
||||||
|
)
|
||||||
|
|
||||||
|
if not res:
|
||||||
|
raise SynapseError(404, "Could not find event %s" % (event_id,))
|
||||||
|
|
||||||
|
defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))
|
||||||
|
|
||||||
AllNewEventsResult = namedtuple("AllNewEventsResult", [
|
AllNewEventsResult = namedtuple("AllNewEventsResult", [
|
||||||
"new_forward_events", "new_backfill_events",
|
"new_forward_events", "new_backfill_events",
|
||||||
|
|
Loading…
Reference in New Issue