parent
0acb5010ec
commit
a0101fc021
|
@ -0,0 +1 @@
|
||||||
|
Fix bug when using workers where pagination requests failed if a remote server returned zero events from `/backfill`. Introduced in 1.35.0.
|
|
@ -22,6 +22,7 @@ from collections.abc import Container
|
||||||
from http import HTTPStatus
|
from http import HTTPStatus
|
||||||
from typing import (
|
from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
Collection,
|
||||||
Dict,
|
Dict,
|
||||||
Iterable,
|
Iterable,
|
||||||
List,
|
List,
|
||||||
|
@ -1364,11 +1365,12 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
event_infos.append(_NewEventInfo(event, None, auth))
|
event_infos.append(_NewEventInfo(event, None, auth))
|
||||||
|
|
||||||
await self._auth_and_persist_events(
|
if event_infos:
|
||||||
destination,
|
await self._auth_and_persist_events(
|
||||||
room_id,
|
destination,
|
||||||
event_infos,
|
room_id,
|
||||||
)
|
event_infos,
|
||||||
|
)
|
||||||
|
|
||||||
def _sanity_check_event(self, ev: EventBase) -> None:
|
def _sanity_check_event(self, ev: EventBase) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -2077,7 +2079,7 @@ class FederationHandler(BaseHandler):
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
event_infos: Iterable[_NewEventInfo],
|
event_infos: Collection[_NewEventInfo],
|
||||||
backfilled: bool = False,
|
backfilled: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Creates the appropriate contexts and persists events. The events
|
"""Creates the appropriate contexts and persists events. The events
|
||||||
|
@ -2088,6 +2090,9 @@ class FederationHandler(BaseHandler):
|
||||||
Notifies about the events where appropriate.
|
Notifies about the events where appropriate.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if not event_infos:
|
||||||
|
return
|
||||||
|
|
||||||
async def prep(ev_info: _NewEventInfo):
|
async def prep(ev_info: _NewEventInfo):
|
||||||
event = ev_info.event
|
event = ev_info.event
|
||||||
with nested_logging_context(suffix=event.event_id):
|
with nested_logging_context(suffix=event.event_id):
|
||||||
|
@ -2216,13 +2221,14 @@ class FederationHandler(BaseHandler):
|
||||||
raise
|
raise
|
||||||
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
if auth_events or state:
|
||||||
room_id,
|
await self.persist_events_and_notify(
|
||||||
[
|
room_id,
|
||||||
(e, events_to_context[e.event_id])
|
[
|
||||||
for e in itertools.chain(auth_events, state)
|
(e, events_to_context[e.event_id])
|
||||||
],
|
for e in itertools.chain(auth_events, state)
|
||||||
)
|
],
|
||||||
|
)
|
||||||
|
|
||||||
new_event_context = await self.state_handler.compute_event_context(
|
new_event_context = await self.state_handler.compute_event_context(
|
||||||
event, old_state=state
|
event, old_state=state
|
||||||
|
@ -3061,7 +3067,13 @@ class FederationHandler(BaseHandler):
|
||||||
the same room.
|
the same room.
|
||||||
backfilled: Whether these events are a result of
|
backfilled: Whether these events are a result of
|
||||||
backfilling or not
|
backfilling or not
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The stream ID after which all events have been persisted.
|
||||||
"""
|
"""
|
||||||
|
if not event_and_contexts:
|
||||||
|
return self.store.get_current_events_token()
|
||||||
|
|
||||||
instance = self.config.worker.events_shard_config.get_instance(room_id)
|
instance = self.config.worker.events_shard_config.get_instance(room_id)
|
||||||
if instance != self._instance_name:
|
if instance != self._instance_name:
|
||||||
# Limit the number of events sent over replication. We choose 200
|
# Limit the number of events sent over replication. We choose 200
|
||||||
|
|
Loading…
Reference in New Issue