Convert EventStreamResult to attrs. (#11574)
This commit is contained in:
parent
17886d2603
commit
323151b787
|
@ -0,0 +1 @@
|
||||||
|
Convert `EventStreamResult` from a `namedtuple` to `attrs` to improve type hints.
|
|
@ -79,13 +79,14 @@ class EventStreamHandler:
|
||||||
# thundering herds on restart.
|
# thundering herds on restart.
|
||||||
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
|
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
|
||||||
|
|
||||||
events, tokens = await self.notifier.get_events_for(
|
stream_result = await self.notifier.get_events_for(
|
||||||
auth_user,
|
auth_user,
|
||||||
pagin_config,
|
pagin_config,
|
||||||
timeout,
|
timeout,
|
||||||
is_guest=is_guest,
|
is_guest=is_guest,
|
||||||
explicit_room_id=room_id,
|
explicit_room_id=room_id,
|
||||||
)
|
)
|
||||||
|
events = stream_result.events
|
||||||
|
|
||||||
time_now = self.clock.time_msec()
|
time_now = self.clock.time_msec()
|
||||||
|
|
||||||
|
@ -128,8 +129,8 @@ class EventStreamHandler:
|
||||||
|
|
||||||
chunk = {
|
chunk = {
|
||||||
"chunk": chunks,
|
"chunk": chunks,
|
||||||
"start": await tokens[0].to_string(self.store),
|
"start": await stream_result.start_token.to_string(self.store),
|
||||||
"end": await tokens[1].to_string(self.store),
|
"end": await stream_result.end_token.to_string(self.store),
|
||||||
}
|
}
|
||||||
|
|
||||||
return chunk
|
return chunk
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from collections import namedtuple
|
|
||||||
from typing import (
|
from typing import (
|
||||||
Awaitable,
|
Awaitable,
|
||||||
Callable,
|
Callable,
|
||||||
|
@ -44,7 +43,13 @@ from synapse.logging.opentracing import log_kv, start_active_span
|
||||||
from synapse.logging.utils import log_function
|
from synapse.logging.utils import log_function
|
||||||
from synapse.metrics import LaterGauge
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.streams.config import PaginationConfig
|
from synapse.streams.config import PaginationConfig
|
||||||
from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID
|
from synapse.types import (
|
||||||
|
JsonDict,
|
||||||
|
PersistedEventPosition,
|
||||||
|
RoomStreamToken,
|
||||||
|
StreamToken,
|
||||||
|
UserID,
|
||||||
|
)
|
||||||
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
@ -178,7 +183,12 @@ class _NotifierUserStream:
|
||||||
return _NotificationListener(self.notify_deferred.observe())
|
return _NotificationListener(self.notify_deferred.observe())
|
||||||
|
|
||||||
|
|
||||||
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class EventStreamResult:
|
||||||
|
events: List[Union[JsonDict, EventBase]]
|
||||||
|
start_token: StreamToken
|
||||||
|
end_token: StreamToken
|
||||||
|
|
||||||
def __bool__(self):
|
def __bool__(self):
|
||||||
return bool(self.events)
|
return bool(self.events)
|
||||||
|
|
||||||
|
@ -582,9 +592,12 @@ class Notifier:
|
||||||
before_token: StreamToken, after_token: StreamToken
|
before_token: StreamToken, after_token: StreamToken
|
||||||
) -> EventStreamResult:
|
) -> EventStreamResult:
|
||||||
if after_token == before_token:
|
if after_token == before_token:
|
||||||
return EventStreamResult([], (from_token, from_token))
|
return EventStreamResult([], from_token, from_token)
|
||||||
|
|
||||||
events: List[EventBase] = []
|
# The events fetched from each source are a JsonDict, EventBase, or
|
||||||
|
# UserPresenceState, but see below for UserPresenceState being
|
||||||
|
# converted to JsonDict.
|
||||||
|
events: List[Union[JsonDict, EventBase]] = []
|
||||||
end_token = from_token
|
end_token = from_token
|
||||||
|
|
||||||
for name, source in self.event_sources.sources.get_sources():
|
for name, source in self.event_sources.sources.get_sources():
|
||||||
|
@ -623,7 +636,7 @@ class Notifier:
|
||||||
events.extend(new_events)
|
events.extend(new_events)
|
||||||
end_token = end_token.copy_and_replace(keyname, new_key)
|
end_token = end_token.copy_and_replace(keyname, new_key)
|
||||||
|
|
||||||
return EventStreamResult(events, (from_token, end_token))
|
return EventStreamResult(events, from_token, end_token)
|
||||||
|
|
||||||
user_id_for_stream = user.to_string()
|
user_id_for_stream = user.to_string()
|
||||||
if is_peeking:
|
if is_peeking:
|
||||||
|
|
Loading…
Reference in New Issue