Merge pull request #2033 from matrix-org/erikj/repl_speed
Don't send the full event json over replication
This commit is contained in:
commit
37a187bfab
|
@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState
|
||||||
from synapse.config._base import ConfigError
|
from synapse.config._base import ConfigError
|
||||||
from synapse.config.homeserver import HomeServerConfig
|
from synapse.config.homeserver import HomeServerConfig
|
||||||
from synapse.config.logger import setup_logging
|
from synapse.config.logger import setup_logging
|
||||||
from synapse.events import FrozenEvent
|
|
||||||
from synapse.handlers.presence import PresenceHandler
|
from synapse.handlers.presence import PresenceHandler
|
||||||
from synapse.http.site import SynapseSite
|
from synapse.http.site import SynapseSite
|
||||||
from synapse.http.server import JsonResource
|
from synapse.http.server import JsonResource
|
||||||
|
@ -411,11 +410,16 @@ class SynchrotronServer(HomeServer):
|
||||||
stream = result.get("events")
|
stream = result.get("events")
|
||||||
if stream:
|
if stream:
|
||||||
max_position = stream["position"]
|
max_position = stream["position"]
|
||||||
|
|
||||||
|
event_map = yield store.get_events([row[1] for row in stream["rows"]])
|
||||||
|
|
||||||
for row in stream["rows"]:
|
for row in stream["rows"]:
|
||||||
position = row[0]
|
position = row[0]
|
||||||
internal = json.loads(row[1])
|
event_id = row[1]
|
||||||
event_json = json.loads(row[2])
|
event = event_map.get(event_id, None)
|
||||||
event = FrozenEvent(event_json, internal_metadata_dict=internal)
|
if not event:
|
||||||
|
continue
|
||||||
|
|
||||||
extra_users = ()
|
extra_users = ()
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
extra_users = (event.state_key,)
|
extra_users = (event.state_key,)
|
||||||
|
|
|
@ -283,12 +283,12 @@ class ReplicationResource(Resource):
|
||||||
|
|
||||||
if request_events != upto_events_token:
|
if request_events != upto_events_token:
|
||||||
writer.write_header_and_rows("events", res.new_forward_events, (
|
writer.write_header_and_rows("events", res.new_forward_events, (
|
||||||
"position", "internal", "json", "state_group"
|
"position", "event_id", "room_id", "type", "state_key",
|
||||||
), position=upto_events_token)
|
), position=upto_events_token)
|
||||||
|
|
||||||
if request_backfill != upto_backfill_token:
|
if request_backfill != upto_backfill_token:
|
||||||
writer.write_header_and_rows("backfill", res.new_backfill_events, (
|
writer.write_header_and_rows("backfill", res.new_backfill_events, (
|
||||||
"position", "internal", "json", "state_group",
|
"position", "event_id", "room_id", "type", "state_key", "redacts",
|
||||||
), position=upto_backfill_token)
|
), position=upto_backfill_token)
|
||||||
|
|
||||||
writer.write_header_and_rows(
|
writer.write_header_and_rows(
|
||||||
|
|
|
@ -16,7 +16,6 @@ from ._base import BaseSlavedStore
|
||||||
from ._slaved_id_tracker import SlavedIdTracker
|
from ._slaved_id_tracker import SlavedIdTracker
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.events import FrozenEvent
|
|
||||||
from synapse.storage import DataStore
|
from synapse.storage import DataStore
|
||||||
from synapse.storage.roommember import RoomMemberStore
|
from synapse.storage.roommember import RoomMemberStore
|
||||||
from synapse.storage.event_federation import EventFederationStore
|
from synapse.storage.event_federation import EventFederationStore
|
||||||
|
@ -25,7 +24,6 @@ from synapse.storage.state import StateStore
|
||||||
from synapse.storage.stream import StreamStore
|
from synapse.storage.stream import StreamStore
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
|
||||||
import ujson as json
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
@ -242,46 +240,32 @@ class SlavedEventStore(BaseSlavedStore):
|
||||||
return super(SlavedEventStore, self).process_replication(result)
|
return super(SlavedEventStore, self).process_replication(result)
|
||||||
|
|
||||||
def _process_replication_row(self, row, backfilled):
|
def _process_replication_row(self, row, backfilled):
|
||||||
internal = json.loads(row[1])
|
stream_ordering = row[0] if not backfilled else -row[0]
|
||||||
event_json = json.loads(row[2])
|
|
||||||
event = FrozenEvent(event_json, internal_metadata_dict=internal)
|
|
||||||
self.invalidate_caches_for_event(
|
self.invalidate_caches_for_event(
|
||||||
event, backfilled,
|
stream_ordering, row[1], row[2], row[3], row[4], row[5],
|
||||||
|
backfilled=backfilled,
|
||||||
)
|
)
|
||||||
|
|
||||||
def invalidate_caches_for_event(self, event, backfilled):
|
def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
|
||||||
self._invalidate_get_event_cache(event.event_id)
|
etype, state_key, redacts, backfilled):
|
||||||
|
self._invalidate_get_event_cache(event_id)
|
||||||
|
|
||||||
self.get_latest_event_ids_in_room.invalidate((event.room_id,))
|
self.get_latest_event_ids_in_room.invalidate((room_id,))
|
||||||
|
|
||||||
self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
|
self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
|
||||||
(event.room_id,)
|
(room_id,)
|
||||||
)
|
)
|
||||||
|
|
||||||
if not backfilled:
|
if not backfilled:
|
||||||
self._events_stream_cache.entity_has_changed(
|
self._events_stream_cache.entity_has_changed(
|
||||||
event.room_id, event.internal_metadata.stream_ordering
|
room_id, stream_ordering
|
||||||
)
|
)
|
||||||
|
|
||||||
# self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
|
if redacts:
|
||||||
# (event.room_id,)
|
self._invalidate_get_event_cache(redacts)
|
||||||
# )
|
|
||||||
|
|
||||||
if event.type == EventTypes.Redaction:
|
if etype == EventTypes.Member:
|
||||||
self._invalidate_get_event_cache(event.redacts)
|
|
||||||
|
|
||||||
if event.type == EventTypes.Member:
|
|
||||||
self._membership_stream_cache.entity_has_changed(
|
self._membership_stream_cache.entity_has_changed(
|
||||||
event.state_key, event.internal_metadata.stream_ordering
|
state_key, stream_ordering
|
||||||
)
|
)
|
||||||
self.get_invited_rooms_for_user.invalidate((event.state_key,))
|
self.get_invited_rooms_for_user.invalidate((state_key,))
|
||||||
|
|
||||||
if not event.is_state():
|
|
||||||
return
|
|
||||||
|
|
||||||
if backfilled:
|
|
||||||
return
|
|
||||||
|
|
||||||
if (not event.internal_metadata.is_invite_from_remote()
|
|
||||||
and event.internal_metadata.is_outlier()):
|
|
||||||
return
|
|
||||||
|
|
|
@ -1773,14 +1773,13 @@ class EventsStore(SQLBaseStore):
|
||||||
|
|
||||||
def get_all_new_events_txn(txn):
|
def get_all_new_events_txn(txn):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
|
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
|
||||||
" FROM events as e"
|
" state_key, redacts"
|
||||||
" JOIN event_json as ej"
|
" FROM events AS e"
|
||||||
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
|
" LEFT JOIN redactions USING (event_id)"
|
||||||
" LEFT JOIN event_to_state_groups as eg"
|
" LEFT JOIN state_events USING (event_id)"
|
||||||
" ON e.event_id = eg.event_id"
|
" WHERE ? < stream_ordering AND stream_ordering <= ?"
|
||||||
" WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
|
" ORDER BY stream_ordering ASC"
|
||||||
" ORDER BY e.stream_ordering ASC"
|
|
||||||
" LIMIT ?"
|
" LIMIT ?"
|
||||||
)
|
)
|
||||||
if have_forward_events:
|
if have_forward_events:
|
||||||
|
@ -1806,15 +1805,13 @@ class EventsStore(SQLBaseStore):
|
||||||
forward_ex_outliers = []
|
forward_ex_outliers = []
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
|
"SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
|
||||||
" eg.state_group"
|
" state_key, redacts"
|
||||||
" FROM events as e"
|
" FROM events AS e"
|
||||||
" JOIN event_json as ej"
|
" LEFT JOIN redactions USING (event_id)"
|
||||||
" ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
|
" LEFT JOIN state_events USING (event_id)"
|
||||||
" LEFT JOIN event_to_state_groups as eg"
|
" WHERE ? > stream_ordering AND stream_ordering >= ?"
|
||||||
" ON e.event_id = eg.event_id"
|
" ORDER BY stream_ordering DESC"
|
||||||
" WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
|
|
||||||
" ORDER BY e.stream_ordering DESC"
|
|
||||||
" LIMIT ?"
|
" LIMIT ?"
|
||||||
)
|
)
|
||||||
if have_backfill_events:
|
if have_backfill_events:
|
||||||
|
|
|
@ -68,7 +68,7 @@ class ReplicationResourceCase(unittest.TestCase):
|
||||||
code, body = yield get
|
code, body = yield get
|
||||||
self.assertEquals(code, 200)
|
self.assertEquals(code, 200)
|
||||||
self.assertEquals(body["events"]["field_names"], [
|
self.assertEquals(body["events"]["field_names"], [
|
||||||
"position", "internal", "json", "state_group"
|
"position", "event_id", "room_id", "type", "state_key",
|
||||||
])
|
])
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
|
Loading…
Reference in New Issue