Fix /initialSync error due to unhashable `RoomStreamToken` (#10827)
The deprecated /initialSync endpoint maintains a cache of responses, using parameter values as part of the cache key. When a `from` or `to` parameter is specified, it gets converted into a `StreamToken`, which contains a `RoomStreamToken` and forms part of the cache key. `RoomStreamToken`s need to be made hashable for this to work.
This commit is contained in:
parent
52913d56a5
commit
9391de3f37
|
@ -0,0 +1 @@
|
|||
Fix error in deprecated `/initialSync` endpoint when using the undocumented `from` and `to` parameters.
|
|
@ -39,6 +39,8 @@ import logging
|
|||
from collections import namedtuple
|
||||
from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
|
||||
|
||||
from frozendict import frozendict
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.filtering import Filter
|
||||
|
@ -379,7 +381,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
|
|||
if p > min_pos
|
||||
}
|
||||
|
||||
return RoomStreamToken(None, min_pos, positions)
|
||||
return RoomStreamToken(None, min_pos, frozendict(positions))
|
||||
|
||||
async def get_room_events_stream_for_rooms(
|
||||
self,
|
||||
|
|
|
@ -30,6 +30,7 @@ from typing import (
|
|||
)
|
||||
|
||||
import attr
|
||||
from frozendict import frozendict
|
||||
from signedjson.key import decode_verify_key_bytes
|
||||
from unpaddedbase64 import decode_base64
|
||||
from zope.interface import Interface
|
||||
|
@ -457,6 +458,9 @@ class RoomStreamToken:
|
|||
|
||||
Note: The `RoomStreamToken` cannot have both a topological part and an
|
||||
instance map.
|
||||
|
||||
For caching purposes, `RoomStreamToken`s and by extension, all their
|
||||
attributes, must be hashable.
|
||||
"""
|
||||
|
||||
topological = attr.ib(
|
||||
|
@ -466,12 +470,12 @@ class RoomStreamToken:
|
|||
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
|
||||
|
||||
instance_map = attr.ib(
|
||||
type=Dict[str, int],
|
||||
factory=dict,
|
||||
type="frozendict[str, int]",
|
||||
factory=frozendict,
|
||||
validator=attr.validators.deep_mapping(
|
||||
key_validator=attr.validators.instance_of(str),
|
||||
value_validator=attr.validators.instance_of(int),
|
||||
mapping_validator=attr.validators.instance_of(dict),
|
||||
mapping_validator=attr.validators.instance_of(frozendict),
|
||||
),
|
||||
)
|
||||
|
||||
|
@ -507,7 +511,7 @@ class RoomStreamToken:
|
|||
return cls(
|
||||
topological=None,
|
||||
stream=stream,
|
||||
instance_map=instance_map,
|
||||
instance_map=frozendict(instance_map),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
@ -540,7 +544,7 @@ class RoomStreamToken:
|
|||
for instance in set(self.instance_map).union(other.instance_map)
|
||||
}
|
||||
|
||||
return RoomStreamToken(None, max_stream, instance_map)
|
||||
return RoomStreamToken(None, max_stream, frozendict(instance_map))
|
||||
|
||||
def as_historical_tuple(self) -> Tuple[int, int]:
|
||||
"""Returns a tuple of `(topological, stream)` for historical tokens.
|
||||
|
@ -593,6 +597,12 @@ class RoomStreamToken:
|
|||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class StreamToken:
|
||||
"""A collection of positions within multiple streams.
|
||||
|
||||
For caching purposes, `StreamToken`s and by extension, all their attributes,
|
||||
must be hashable.
|
||||
"""
|
||||
|
||||
room_key = attr.ib(
|
||||
type=RoomStreamToken, validator=attr.validators.instance_of(RoomStreamToken)
|
||||
)
|
||||
|
|
Loading…
Reference in New Issue