diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8e9c0e1960..135dd58c15 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,10 +23,11 @@ from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.state import StateStore +from synapse.storage.roommember import RoomMemberStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep @@ -60,6 +61,7 @@ class SlaveConfig(DatabaseConfig): self.soft_file_limit = config.get("soft_file_limit") self.daemonize = config.get("daemonize") self.pid_file = self.abspath(config.get("pid_file")) + self.public_baseurl = config["public_baseurl"] def default_config(self, server_name, **kwargs): pid_file = self.abspath("pusher.pid") @@ -98,7 +100,8 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, + SlavedAccountDataStore ): update_pusher_last_stream_ordering_and_success = ( DataStore.update_pusher_last_stream_ordering_and_success.__func__ @@ -128,16 +131,13 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - get_state_groups = ( - DataStore.get_state_groups.__func__ - ) - - _get_state_group_for_events = ( - StateStore.__dict__["_get_state_group_for_events"] - ) - - _get_state_group_for_event = ( - StateStore.__dict__["_get_state_group_for_event"] + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] ) @@ -219,6 +219,7 @@ class PusherServer(HomeServer): store = self.get_datastore() replication_url = self.config.replication_url pusher_pool = self.get_pusherpool() + clock = self.get_clock() def stop_pusher(user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) @@ -270,11 +271,21 @@ class PusherServer(HomeServer): min_stream_id, max_stream_id, affected_room_ids ) + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 while True: try: args = store.stream_positions() args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) yield store.process_replication(result) poke_pushers(result) except: diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 5d60c1efcf..2be294f52e 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -397,6 +397,7 @@ class Mailer(object): return "" serverAndMediaId = value[6:] + fragment = None if '#' in serverAndMediaId: (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1) fragment = "#" + fragment diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 635febb174..99cddf2518 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore): get_unread_event_push_actions_by_room_for_user = ( EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] ) + _get_state_group_for_events = ( + StateStore.__dict__["_get_state_group_for_events"] + ) + _get_state_group_for_event = ( + StateStore.__dict__["_get_state_group_for_event"] + ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -96,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + get_events_around = DataStore.get_events_around.__func__ + get_state_for_events = DataStore.get_state_for_events.__func__ + get_state_groups = DataStore.get_state_groups.__func__ _set_before_and_after = DataStore._set_before_and_after @@ -116,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore): DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ + _get_state_for_groups = DataStore._get_state_for_groups.__func__ + _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ + _get_events_around_txn = DataStore._get_events_around_txn.__func__ + _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ def stream_positions(self): result = super(SlavedEventStore, self).stream_positions()