Merge pull request #3582 from matrix-org/erikj/fixup_stateless

Fix missing attributes on workers.
This commit is contained in:
Erik Johnston 2018-07-23 16:44:42 +01:00 committed by GitHub
commit a646bdc670
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 15 additions and 8 deletions

1
changelog.d/3582.misc Normal file
View File

@ -0,0 +1 @@
Lazily load state on master process when using workers to reduce DB consumption

View File

@ -110,7 +110,8 @@ class EventContext(object):
return context return context
def serialize(self, event): @defer.inlineCallbacks
def serialize(self, event, store):
"""Converts self to a type that can be serialized as JSON, and then """Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize` deserialized by `deserialize`
@ -126,11 +127,12 @@ class EventContext(object):
# the prev_state_ids, so if we're a state event we include the event # the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state. # id that we replaced in the state.
if event.is_state(): if event.is_state():
prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) prev_state_ids = yield self.get_prev_state_ids(store)
prev_state_id = prev_state_ids.get((event.type, event.state_key))
else: else:
prev_state_id = None prev_state_id = None
return { defer.returnValue({
"prev_state_id": prev_state_id, "prev_state_id": prev_state_id,
"event_type": event.type, "event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None, "event_state_key": event.state_key if event.is_state() else None,
@ -140,7 +142,7 @@ class EventContext(object):
"delta_ids": _encode_state_dict(self.delta_ids), "delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events, "prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None "app_service_id": self.app_service.id if self.app_service else None
} })
@staticmethod @staticmethod
def deserialize(store, input): def deserialize(store, input):

View File

@ -807,8 +807,9 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master. # If we're a worker we need to hit out to the master.
if self.config.worker_app: if self.config.worker_app:
yield send_event_to_master( yield send_event_to_master(
self.hs.get_clock(), clock=self.hs.get_clock(),
self.http_client, store=self.store,
client=self.http_client,
host=self.config.worker_replication_host, host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port, port=self.config.worker_replication_http_port,
requester=requester, requester=requester,

View File

@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks @defer.inlineCallbacks
def send_event_to_master(clock, client, host, port, requester, event, context, def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users): ratelimit, extra_users):
"""Send event to be handled on the master """Send event to be handled on the master
Args: Args:
clock (synapse.util.Clock) clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient) client (SimpleHttpClient)
host (str): host of master host (str): host of master
port (int): port on master listening for HTTP replication port (int): port on master listening for HTTP replication
@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
host, port, event.event_id, host, port, event.event_id,
) )
serialized_context = yield context.serialize(event, store)
payload = { payload = {
"event": event.get_pdu_json(), "event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(), "internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason, "rejected_reason": event.rejected_reason,
"context": context.serialize(event), "context": serialized_context,
"requester": requester.serialize(), "requester": requester.serialize(),
"ratelimit": ratelimit, "ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users], "extra_users": [u.to_string() for u in extra_users],