Minor typing fixes for `synapse/storage/persist_events.py` (#12069)
Signed-off-by: Sean Quah <seanq@element.io>
This commit is contained in:
parent
54e74cc15f
commit
f3fd8558cd
|
@ -0,0 +1 @@
|
||||||
|
Minor typing fixes.
|
|
@ -130,7 +130,7 @@ class PersistEventsStore:
|
||||||
*,
|
*,
|
||||||
current_state_for_room: Dict[str, StateMap[str]],
|
current_state_for_room: Dict[str, StateMap[str]],
|
||||||
state_delta_for_room: Dict[str, DeltaState],
|
state_delta_for_room: Dict[str, DeltaState],
|
||||||
new_forward_extremeties: Dict[str, List[str]],
|
new_forward_extremities: Dict[str, Set[str]],
|
||||||
use_negative_stream_ordering: bool = False,
|
use_negative_stream_ordering: bool = False,
|
||||||
inhibit_local_membership_updates: bool = False,
|
inhibit_local_membership_updates: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -143,7 +143,7 @@ class PersistEventsStore:
|
||||||
the room based on forward extremities
|
the room based on forward extremities
|
||||||
state_delta_for_room: Map from room_id to the delta to apply to
|
state_delta_for_room: Map from room_id to the delta to apply to
|
||||||
room state
|
room state
|
||||||
new_forward_extremities: Map from room_id to list of event IDs
|
new_forward_extremities: Map from room_id to set of event IDs
|
||||||
that are the new forward extremities of the room.
|
that are the new forward extremities of the room.
|
||||||
use_negative_stream_ordering: Whether to start stream_ordering on
|
use_negative_stream_ordering: Whether to start stream_ordering on
|
||||||
the negative side and decrement. This should be set as True
|
the negative side and decrement. This should be set as True
|
||||||
|
@ -193,7 +193,7 @@ class PersistEventsStore:
|
||||||
events_and_contexts=events_and_contexts,
|
events_and_contexts=events_and_contexts,
|
||||||
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
inhibit_local_membership_updates=inhibit_local_membership_updates,
|
||||||
state_delta_for_room=state_delta_for_room,
|
state_delta_for_room=state_delta_for_room,
|
||||||
new_forward_extremeties=new_forward_extremeties,
|
new_forward_extremities=new_forward_extremities,
|
||||||
)
|
)
|
||||||
persist_event_counter.inc(len(events_and_contexts))
|
persist_event_counter.inc(len(events_and_contexts))
|
||||||
|
|
||||||
|
@ -220,7 +220,7 @@ class PersistEventsStore:
|
||||||
for room_id, new_state in current_state_for_room.items():
|
for room_id, new_state in current_state_for_room.items():
|
||||||
self.store.get_current_state_ids.prefill((room_id,), new_state)
|
self.store.get_current_state_ids.prefill((room_id,), new_state)
|
||||||
|
|
||||||
for room_id, latest_event_ids in new_forward_extremeties.items():
|
for room_id, latest_event_ids in new_forward_extremities.items():
|
||||||
self.store.get_latest_event_ids_in_room.prefill(
|
self.store.get_latest_event_ids_in_room.prefill(
|
||||||
(room_id,), list(latest_event_ids)
|
(room_id,), list(latest_event_ids)
|
||||||
)
|
)
|
||||||
|
@ -334,8 +334,8 @@ class PersistEventsStore:
|
||||||
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
events_and_contexts: List[Tuple[EventBase, EventContext]],
|
||||||
inhibit_local_membership_updates: bool = False,
|
inhibit_local_membership_updates: bool = False,
|
||||||
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
|
state_delta_for_room: Optional[Dict[str, DeltaState]] = None,
|
||||||
new_forward_extremeties: Optional[Dict[str, List[str]]] = None,
|
new_forward_extremities: Optional[Dict[str, Set[str]]] = None,
|
||||||
):
|
) -> None:
|
||||||
"""Insert some number of room events into the necessary database tables.
|
"""Insert some number of room events into the necessary database tables.
|
||||||
|
|
||||||
Rejected events are only inserted into the events table, the events_json table,
|
Rejected events are only inserted into the events table, the events_json table,
|
||||||
|
@ -353,13 +353,13 @@ class PersistEventsStore:
|
||||||
from the database. This is useful when retrying due to
|
from the database. This is useful when retrying due to
|
||||||
IntegrityError.
|
IntegrityError.
|
||||||
state_delta_for_room: The current-state delta for each room.
|
state_delta_for_room: The current-state delta for each room.
|
||||||
new_forward_extremetie: The new forward extremities for each room.
|
new_forward_extremities: The new forward extremities for each room.
|
||||||
For each room, a list of the event ids which are the forward
|
For each room, a list of the event ids which are the forward
|
||||||
extremities.
|
extremities.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
state_delta_for_room = state_delta_for_room or {}
|
state_delta_for_room = state_delta_for_room or {}
|
||||||
new_forward_extremeties = new_forward_extremeties or {}
|
new_forward_extremities = new_forward_extremities or {}
|
||||||
|
|
||||||
all_events_and_contexts = events_and_contexts
|
all_events_and_contexts = events_and_contexts
|
||||||
|
|
||||||
|
@ -372,7 +372,7 @@ class PersistEventsStore:
|
||||||
|
|
||||||
self._update_forward_extremities_txn(
|
self._update_forward_extremities_txn(
|
||||||
txn,
|
txn,
|
||||||
new_forward_extremities=new_forward_extremeties,
|
new_forward_extremities=new_forward_extremities,
|
||||||
max_stream_order=max_stream_order,
|
max_stream_order=max_stream_order,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -1158,7 +1158,10 @@ class PersistEventsStore:
|
||||||
)
|
)
|
||||||
|
|
||||||
def _update_forward_extremities_txn(
|
def _update_forward_extremities_txn(
|
||||||
self, txn, new_forward_extremities, max_stream_order
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
new_forward_extremities: Dict[str, Set[str]],
|
||||||
|
max_stream_order: int,
|
||||||
):
|
):
|
||||||
for room_id in new_forward_extremities.keys():
|
for room_id in new_forward_extremities.keys():
|
||||||
self.db_pool.simple_delete_txn(
|
self.db_pool.simple_delete_txn(
|
||||||
|
|
|
@ -427,21 +427,21 @@ class EventsPersistenceStorage:
|
||||||
# NB: Assumes that we are only persisting events for one room
|
# NB: Assumes that we are only persisting events for one room
|
||||||
# at a time.
|
# at a time.
|
||||||
|
|
||||||
# map room_id->list[event_ids] giving the new forward
|
# map room_id->set[event_ids] giving the new forward
|
||||||
# extremities in each room
|
# extremities in each room
|
||||||
new_forward_extremeties = {}
|
new_forward_extremities: Dict[str, Set[str]] = {}
|
||||||
|
|
||||||
# map room_id->(type,state_key)->event_id tracking the full
|
# map room_id->(type,state_key)->event_id tracking the full
|
||||||
# state in each room after adding these events.
|
# state in each room after adding these events.
|
||||||
# This is simply used to prefill the get_current_state_ids
|
# This is simply used to prefill the get_current_state_ids
|
||||||
# cache
|
# cache
|
||||||
current_state_for_room = {}
|
current_state_for_room: Dict[str, StateMap[str]] = {}
|
||||||
|
|
||||||
# map room_id->(to_delete, to_insert) where to_delete is a list
|
# map room_id->(to_delete, to_insert) where to_delete is a list
|
||||||
# of type/state keys to remove from current state, and to_insert
|
# of type/state keys to remove from current state, and to_insert
|
||||||
# is a map (type,key)->event_id giving the state delta in each
|
# is a map (type,key)->event_id giving the state delta in each
|
||||||
# room
|
# room
|
||||||
state_delta_for_room = {}
|
state_delta_for_room: Dict[str, DeltaState] = {}
|
||||||
|
|
||||||
# Set of remote users which were in rooms the server has left. We
|
# Set of remote users which were in rooms the server has left. We
|
||||||
# should check if we still share any rooms and if not we mark their
|
# should check if we still share any rooms and if not we mark their
|
||||||
|
@ -460,14 +460,13 @@ class EventsPersistenceStorage:
|
||||||
)
|
)
|
||||||
|
|
||||||
for room_id, ev_ctx_rm in events_by_room.items():
|
for room_id, ev_ctx_rm in events_by_room.items():
|
||||||
latest_event_ids = (
|
latest_event_ids = set(
|
||||||
await self.main_store.get_latest_event_ids_in_room(room_id)
|
await self.main_store.get_latest_event_ids_in_room(room_id)
|
||||||
)
|
)
|
||||||
new_latest_event_ids = await self._calculate_new_extremities(
|
new_latest_event_ids = await self._calculate_new_extremities(
|
||||||
room_id, ev_ctx_rm, latest_event_ids
|
room_id, ev_ctx_rm, latest_event_ids
|
||||||
)
|
)
|
||||||
|
|
||||||
latest_event_ids = set(latest_event_ids)
|
|
||||||
if new_latest_event_ids == latest_event_ids:
|
if new_latest_event_ids == latest_event_ids:
|
||||||
# No change in extremities, so no change in state
|
# No change in extremities, so no change in state
|
||||||
continue
|
continue
|
||||||
|
@ -478,7 +477,7 @@ class EventsPersistenceStorage:
|
||||||
# extremities, so we'll `continue` above and skip this bit.)
|
# extremities, so we'll `continue` above and skip this bit.)
|
||||||
assert new_latest_event_ids, "No forward extremities left!"
|
assert new_latest_event_ids, "No forward extremities left!"
|
||||||
|
|
||||||
new_forward_extremeties[room_id] = new_latest_event_ids
|
new_forward_extremities[room_id] = new_latest_event_ids
|
||||||
|
|
||||||
len_1 = (
|
len_1 = (
|
||||||
len(latest_event_ids) == 1
|
len(latest_event_ids) == 1
|
||||||
|
@ -533,7 +532,7 @@ class EventsPersistenceStorage:
|
||||||
# extremities, so we'll `continue` above and skip this bit.)
|
# extremities, so we'll `continue` above and skip this bit.)
|
||||||
assert new_latest_event_ids, "No forward extremities left!"
|
assert new_latest_event_ids, "No forward extremities left!"
|
||||||
|
|
||||||
new_forward_extremeties[room_id] = new_latest_event_ids
|
new_forward_extremities[room_id] = new_latest_event_ids
|
||||||
|
|
||||||
# If either are not None then there has been a change,
|
# If either are not None then there has been a change,
|
||||||
# and we need to work out the delta (or use that
|
# and we need to work out the delta (or use that
|
||||||
|
@ -567,7 +566,7 @@ class EventsPersistenceStorage:
|
||||||
)
|
)
|
||||||
if not is_still_joined:
|
if not is_still_joined:
|
||||||
logger.info("Server no longer in room %s", room_id)
|
logger.info("Server no longer in room %s", room_id)
|
||||||
latest_event_ids = []
|
latest_event_ids = set()
|
||||||
current_state = {}
|
current_state = {}
|
||||||
delta.no_longer_in_room = True
|
delta.no_longer_in_room = True
|
||||||
|
|
||||||
|
@ -582,7 +581,7 @@ class EventsPersistenceStorage:
|
||||||
chunk,
|
chunk,
|
||||||
current_state_for_room=current_state_for_room,
|
current_state_for_room=current_state_for_room,
|
||||||
state_delta_for_room=state_delta_for_room,
|
state_delta_for_room=state_delta_for_room,
|
||||||
new_forward_extremeties=new_forward_extremeties,
|
new_forward_extremities=new_forward_extremities,
|
||||||
use_negative_stream_ordering=backfilled,
|
use_negative_stream_ordering=backfilled,
|
||||||
inhibit_local_membership_updates=backfilled,
|
inhibit_local_membership_updates=backfilled,
|
||||||
)
|
)
|
||||||
|
@ -596,7 +595,7 @@ class EventsPersistenceStorage:
|
||||||
room_id: str,
|
room_id: str,
|
||||||
event_contexts: List[Tuple[EventBase, EventContext]],
|
event_contexts: List[Tuple[EventBase, EventContext]],
|
||||||
latest_event_ids: Collection[str],
|
latest_event_ids: Collection[str],
|
||||||
):
|
) -> Set[str]:
|
||||||
"""Calculates the new forward extremities for a room given events to
|
"""Calculates the new forward extremities for a room given events to
|
||||||
persist.
|
persist.
|
||||||
|
|
||||||
|
@ -906,9 +905,9 @@ class EventsPersistenceStorage:
|
||||||
# Ideally we'd figure out a way of still being able to drop old
|
# Ideally we'd figure out a way of still being able to drop old
|
||||||
# dummy events that reference local events, but this is good enough
|
# dummy events that reference local events, but this is good enough
|
||||||
# as a first cut.
|
# as a first cut.
|
||||||
events_to_check = [event]
|
events_to_check: Collection[EventBase] = [event]
|
||||||
while events_to_check:
|
while events_to_check:
|
||||||
new_events = set()
|
new_events: Set[str] = set()
|
||||||
for event_to_check in events_to_check:
|
for event_to_check in events_to_check:
|
||||||
if self.is_mine_id(event_to_check.sender):
|
if self.is_mine_id(event_to_check.sender):
|
||||||
if event_to_check.type != EventTypes.Dummy:
|
if event_to_check.type != EventTypes.Dummy:
|
||||||
|
|
Loading…
Reference in New Issue