Add `/notifications` endpoint to workers (#16265)

This commit is contained in:
Erik Johnston 2023-09-07 10:26:07 +01:00 committed by GitHub
parent a83f75a37d
commit 8940d1b28e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 42 additions and 37 deletions

View File

@ -0,0 +1 @@
Allow `/notifications` endpoint to be routed to workers.

View File

@ -183,6 +183,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(r0|v3|unstable)/password_policy$", "^/_matrix/client/(r0|v3|unstable)/password_policy$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$", "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$",
"^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/capabilities$",
"^/_matrix/client/(r0|v3|unstable)/notifications$",
], ],
"shared_extra_conf": {}, "shared_extra_conf": {},
"worker_extra_conf": "", "worker_extra_conf": "",

View File

@ -246,6 +246,7 @@ information.
^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$) ^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$)
^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$ ^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$
^/_matrix/client/(r0|v3|unstable)/capabilities$ ^/_matrix/client/(r0|v3|unstable)/capabilities$
^/_matrix/client/(r0|v3|unstable)/notifications$
# Encryption requests # Encryption requests
^/_matrix/client/(r0|v3|unstable)/keys/query$ ^/_matrix/client/(r0|v3|unstable)/keys/query$

View File

@ -36,6 +36,8 @@ logger = logging.getLogger(__name__)
class NotificationsServlet(RestServlet): class NotificationsServlet(RestServlet):
PATTERNS = client_patterns("/notifications$") PATTERNS = client_patterns("/notifications$")
CATEGORY = "Client API requests"
def __init__(self, hs: "HomeServer"): def __init__(self, hs: "HomeServer"):
super().__init__() super().__init__()
self.store = hs.get_datastores().main self.store = hs.get_datastores().main

View File

@ -1740,42 +1740,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# We sleep to ensure that we don't overwhelm the DB. # We sleep to ensure that we don't overwhelm the DB.
await self._clock.sleep(1.0) await self._clock.sleep(1.0)
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)
self.db_pool.updates.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1",
)
# Add index to make deleting old push actions faster.
self.db_pool.updates.register_background_index_update(
"event_push_actions_stream_highlight_index",
index_name="event_push_actions_stream_highlight_index",
table="event_push_actions",
columns=["highlight", "stream_ordering"],
where_clause="highlight=0",
)
async def get_push_actions_for_user( async def get_push_actions_for_user(
self, self,
user_id: str, user_id: str,
@ -1834,6 +1798,42 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
] ]
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self.db_pool.updates.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)
self.db_pool.updates.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
where_clause="highlight=1",
)
# Add index to make deleting old push actions faster.
self.db_pool.updates.register_background_index_update(
"event_push_actions_stream_highlight_index",
index_name="event_push_actions_stream_highlight_index",
table="event_push_actions",
columns=["highlight", "stream_ordering"],
where_clause="highlight=0",
)
def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool: def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool:
for action in actions: for action in actions:
if not isinstance(action, dict): if not isinstance(action, dict):