Split out DB writes in federation handler
This will allow us to easily add an internal replication API to proxy these reqeusts to master, so that we can move federation APIs to workers.
This commit is contained in:
parent
3849f7f69f
commit
78a691d005
|
@ -444,7 +444,7 @@ class FederationHandler(BaseHandler):
|
|||
yield self._handle_new_events(origin, event_infos)
|
||||
|
||||
try:
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
context = yield self._handle_new_event(
|
||||
origin,
|
||||
event,
|
||||
state=state,
|
||||
|
@ -469,17 +469,6 @@ class FederationHandler(BaseHandler):
|
|||
except StoreError:
|
||||
logger.exception("Failed to store room.")
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.membership == Membership.JOIN:
|
||||
# Only fire user_joined_room if the user has acutally
|
||||
|
@ -501,7 +490,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
if newly_joined:
|
||||
user = UserID.from_string(event.state_key)
|
||||
yield user_joined_room(self.distributor, user, event.room_id)
|
||||
yield self.user_joined_room(user, event.room_id)
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
|
@ -942,7 +931,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
self.room_queues[room_id] = []
|
||||
|
||||
yield self.store.clean_room_for_join(room_id)
|
||||
yield self._clean_room_for_join(room_id)
|
||||
|
||||
handled_events = set()
|
||||
|
||||
|
@ -981,15 +970,10 @@ class FederationHandler(BaseHandler):
|
|||
# FIXME
|
||||
pass
|
||||
|
||||
event_stream_id, max_stream_id = yield self._persist_auth_tree(
|
||||
yield self._persist_auth_tree(
|
||||
origin, auth_chain, state, event
|
||||
)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[joinee]
|
||||
)
|
||||
|
||||
logger.debug("Finished joining %s to %s", joinee, room_id)
|
||||
finally:
|
||||
room_queue = self.room_queues[room_id]
|
||||
|
@ -1084,7 +1068,7 @@ class FederationHandler(BaseHandler):
|
|||
# would introduce the danger of backwards-compatibility problems.
|
||||
event.internal_metadata.send_on_behalf_of = origin
|
||||
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
context = yield self._handle_new_event(
|
||||
origin, event
|
||||
)
|
||||
|
||||
|
@ -1094,20 +1078,10 @@ class FederationHandler(BaseHandler):
|
|||
event.signatures,
|
||||
)
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
|
||||
if event.type == EventTypes.Member:
|
||||
if event.content["membership"] == Membership.JOIN:
|
||||
user = UserID.from_string(event.state_key)
|
||||
yield user_joined_room(self.distributor, user, event.room_id)
|
||||
yield self.user_joined_room(user, event.room_id)
|
||||
|
||||
prev_state_ids = yield context.get_prev_state_ids(self.store)
|
||||
|
||||
|
@ -1176,17 +1150,7 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
|
||||
context = yield self.state_handler.compute_event_context(event)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
)
|
||||
|
||||
target_user = UserID.from_string(event.state_key)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[target_user],
|
||||
)
|
||||
yield self._persist_events([(event, context)])
|
||||
|
||||
defer.returnValue(event)
|
||||
|
||||
|
@ -1217,17 +1181,7 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
|
||||
context = yield self.state_handler.compute_event_context(event)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
)
|
||||
|
||||
target_user = UserID.from_string(event.state_key)
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=[target_user],
|
||||
)
|
||||
yield self._persist_events([(event, context)])
|
||||
|
||||
defer.returnValue(event)
|
||||
|
||||
|
@ -1318,7 +1272,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
event.internal_metadata.outlier = False
|
||||
|
||||
context, event_stream_id, max_stream_id = yield self._handle_new_event(
|
||||
yield self._handle_new_event(
|
||||
origin, event
|
||||
)
|
||||
|
||||
|
@ -1328,16 +1282,6 @@ class FederationHandler(BaseHandler):
|
|||
event.signatures,
|
||||
)
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id, extra_users=extra_users
|
||||
)
|
||||
|
||||
defer.returnValue(None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -1472,9 +1416,8 @@ class FederationHandler(BaseHandler):
|
|||
event, context
|
||||
)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event,
|
||||
context=context,
|
||||
yield self._persist_events(
|
||||
[(event, context)],
|
||||
backfilled=backfilled,
|
||||
)
|
||||
except: # noqa: E722, as we reraise the exception this is fine.
|
||||
|
@ -1487,15 +1430,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
six.reraise(tp, value, tb)
|
||||
|
||||
if not backfilled:
|
||||
# this intentionally does not yield: we don't care about the result
|
||||
# and don't need to wait for it.
|
||||
logcontext.run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
defer.returnValue((context, event_stream_id, max_stream_id))
|
||||
defer.returnValue(context)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_new_events(self, origin, event_infos, backfilled=False):
|
||||
|
@ -1517,7 +1452,7 @@ class FederationHandler(BaseHandler):
|
|||
], consumeErrors=True,
|
||||
))
|
||||
|
||||
yield self.store.persist_events(
|
||||
yield self._persist_events(
|
||||
[
|
||||
(ev_info["event"], context)
|
||||
for ev_info, context in zip(event_infos, contexts)
|
||||
|
@ -1605,7 +1540,7 @@ class FederationHandler(BaseHandler):
|
|||
raise
|
||||
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
||||
|
||||
yield self.store.persist_events(
|
||||
yield self._persist_events(
|
||||
[
|
||||
(e, events_to_context[e.event_id])
|
||||
for e in itertools.chain(auth_events, state)
|
||||
|
@ -1616,12 +1551,10 @@ class FederationHandler(BaseHandler):
|
|||
event, old_state=state
|
||||
)
|
||||
|
||||
event_stream_id, max_stream_id = yield self.store.persist_event(
|
||||
event, new_event_context,
|
||||
yield self._persist_events(
|
||||
[(event, new_event_context)],
|
||||
)
|
||||
|
||||
defer.returnValue((event_stream_id, max_stream_id))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _prep_event(self, origin, event, state=None, auth_events=None):
|
||||
"""
|
||||
|
@ -2347,3 +2280,69 @@ class FederationHandler(BaseHandler):
|
|||
)
|
||||
if "valid" not in response or not response["valid"]:
|
||||
raise AuthError(403, "Third party certificate was invalid")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _persist_events(self, event_and_contexts, backfilled=False):
|
||||
"""Persists events and tells the notifier/pushers about them, if
|
||||
necessary.
|
||||
|
||||
Args:
|
||||
event_and_contexts(list[tuple[FrozenEvent, EventContext]])
|
||||
backfilled (bool): Whether these events are a result of
|
||||
backfilling or not
|
||||
|
||||
Returns:
|
||||
Deferred
|
||||
"""
|
||||
max_stream_id = yield self.store.persist_events(
|
||||
event_and_contexts,
|
||||
backfilled=backfilled,
|
||||
)
|
||||
|
||||
if not backfilled: # Never notify for backfilled events
|
||||
for event, _ in event_and_contexts:
|
||||
self._notify_persisted_event(event, max_stream_id)
|
||||
|
||||
def _notify_persisted_event(self, event, max_stream_id):
|
||||
"""Checks to see if notifier/pushers should be notified about the
|
||||
event or not.
|
||||
|
||||
Args:
|
||||
event (FrozenEvent)
|
||||
max_stream_id (int): The max_stream_id returned by persist_events
|
||||
"""
|
||||
|
||||
extra_users = []
|
||||
if event.type == EventTypes.Member:
|
||||
target_user_id = event.state_key
|
||||
|
||||
# We notify for memberships if its an invite for one of our
|
||||
# users
|
||||
if event.internal_metadata.is_outlier():
|
||||
if event.membership != Membership.INVITE:
|
||||
if not self.is_mine_id(target_user_id):
|
||||
return
|
||||
|
||||
target_user = UserID.from_string(target_user_id)
|
||||
extra_users.append(target_user)
|
||||
elif event.internal_metadata.is_outlier():
|
||||
return
|
||||
|
||||
event_stream_id = event.internal_metadata.stream_ordering
|
||||
self.notifier.on_new_room_event(
|
||||
event, event_stream_id, max_stream_id,
|
||||
extra_users=extra_users
|
||||
)
|
||||
|
||||
logcontext.run_in_background(
|
||||
self.pusher_pool.on_new_notifications,
|
||||
event_stream_id, max_stream_id,
|
||||
)
|
||||
|
||||
def _clean_room_for_join(self, room_id):
|
||||
return self.store.clean_room_for_join(room_id)
|
||||
|
||||
def user_joined_room(self, user, room_id):
|
||||
"""Called when a new user has joined the room
|
||||
"""
|
||||
return user_joined_room(self.distributor, user, room_id)
|
||||
|
|
|
@ -231,12 +231,18 @@ class EventsStore(EventsWorkerStore):
|
|||
|
||||
self._state_resolution_handler = hs.get_state_resolution_handler()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def persist_events(self, events_and_contexts, backfilled=False):
|
||||
"""
|
||||
Write events to the database
|
||||
Args:
|
||||
events_and_contexts: list of tuples of (event, context)
|
||||
backfilled: ?
|
||||
backfilled (bool): Whether the results are retrieved from federation
|
||||
via backfill or not. Used to determine if they're "new" events
|
||||
which might update the current state etc.
|
||||
|
||||
Returns:
|
||||
Deferred[int]: he stream ordering of the latest persisted event
|
||||
"""
|
||||
partitioned = {}
|
||||
for event, ctx in events_and_contexts:
|
||||
|
@ -253,10 +259,14 @@ class EventsStore(EventsWorkerStore):
|
|||
for room_id in partitioned:
|
||||
self._maybe_start_persisting(room_id)
|
||||
|
||||
return make_deferred_yieldable(
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(deferreds, consumeErrors=True)
|
||||
)
|
||||
|
||||
max_persisted_id = yield self._stream_id_gen.get_current_token()
|
||||
|
||||
defer.returnValue(max_persisted_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def persist_event(self, event, context, backfilled=False):
|
||||
|
|
Loading…
Reference in New Issue