Split event creation into a separate handler
This commit is contained in:
parent
a1beca0e25
commit
5ff3d23564
|
@ -47,21 +47,9 @@ class MessageHandler(BaseHandler):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.validator = EventValidator()
|
|
||||||
self.profile_handler = hs.get_profile_handler()
|
|
||||||
|
|
||||||
self.pagination_lock = ReadWriteLock()
|
self.pagination_lock = ReadWriteLock()
|
||||||
|
|
||||||
self.pusher_pool = hs.get_pusherpool()
|
|
||||||
|
|
||||||
# We arbitrarily limit concurrent event creation for a room to 5.
|
|
||||||
# This is to stop us from diverging history *too* much.
|
|
||||||
self.limiter = Limiter(max_count=5)
|
|
||||||
|
|
||||||
self.action_generator = hs.get_action_generator()
|
|
||||||
|
|
||||||
self.spam_checker = hs.get_spam_checker()
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def purge_history(self, room_id, event_id):
|
def purge_history(self, room_id, event_id):
|
||||||
event = yield self.store.get_event(event_id)
|
event = yield self.store.get_event(event_id)
|
||||||
|
@ -182,166 +170,6 @@ class MessageHandler(BaseHandler):
|
||||||
|
|
||||||
defer.returnValue(chunk)
|
defer.returnValue(chunk)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
|
||||||
prev_event_ids=None):
|
|
||||||
"""
|
|
||||||
Given a dict from a client, create a new event.
|
|
||||||
|
|
||||||
Creates an FrozenEvent object, filling out auth_events, prev_events,
|
|
||||||
etc.
|
|
||||||
|
|
||||||
Adds display names to Join membership events.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
requester
|
|
||||||
event_dict (dict): An entire event
|
|
||||||
token_id (str)
|
|
||||||
txn_id (str)
|
|
||||||
prev_event_ids (list): The prev event ids to use when creating the event
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Tuple of created event (FrozenEvent), Context
|
|
||||||
"""
|
|
||||||
builder = self.event_builder_factory.new(event_dict)
|
|
||||||
|
|
||||||
with (yield self.limiter.queue(builder.room_id)):
|
|
||||||
self.validator.validate_new(builder)
|
|
||||||
|
|
||||||
if builder.type == EventTypes.Member:
|
|
||||||
membership = builder.content.get("membership", None)
|
|
||||||
target = UserID.from_string(builder.state_key)
|
|
||||||
|
|
||||||
if membership in {Membership.JOIN, Membership.INVITE}:
|
|
||||||
# If event doesn't include a display name, add one.
|
|
||||||
profile = self.profile_handler
|
|
||||||
content = builder.content
|
|
||||||
|
|
||||||
try:
|
|
||||||
if "displayname" not in content:
|
|
||||||
content["displayname"] = yield profile.get_displayname(target)
|
|
||||||
if "avatar_url" not in content:
|
|
||||||
content["avatar_url"] = yield profile.get_avatar_url(target)
|
|
||||||
except Exception as e:
|
|
||||||
logger.info(
|
|
||||||
"Failed to get profile information for %r: %s",
|
|
||||||
target, e
|
|
||||||
)
|
|
||||||
|
|
||||||
if token_id is not None:
|
|
||||||
builder.internal_metadata.token_id = token_id
|
|
||||||
|
|
||||||
if txn_id is not None:
|
|
||||||
builder.internal_metadata.txn_id = txn_id
|
|
||||||
|
|
||||||
event, context = yield self._create_new_client_event(
|
|
||||||
builder=builder,
|
|
||||||
requester=requester,
|
|
||||||
prev_event_ids=prev_event_ids,
|
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue((event, context))
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def send_nonmember_event(self, requester, event, context, ratelimit=True):
|
|
||||||
"""
|
|
||||||
Persists and notifies local clients and federation of an event.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
event (FrozenEvent) the event to send.
|
|
||||||
context (Context) the context of the event.
|
|
||||||
ratelimit (bool): Whether to rate limit this send.
|
|
||||||
is_guest (bool): Whether the sender is a guest.
|
|
||||||
"""
|
|
||||||
if event.type == EventTypes.Member:
|
|
||||||
raise SynapseError(
|
|
||||||
500,
|
|
||||||
"Tried to send member event through non-member codepath"
|
|
||||||
)
|
|
||||||
|
|
||||||
# We check here if we are currently being rate limited, so that we
|
|
||||||
# don't do unnecessary work. We check again just before we actually
|
|
||||||
# send the event.
|
|
||||||
yield self.ratelimit(requester, update=False)
|
|
||||||
|
|
||||||
user = UserID.from_string(event.sender)
|
|
||||||
|
|
||||||
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
|
|
||||||
|
|
||||||
if event.is_state():
|
|
||||||
prev_state = yield self.deduplicate_state_event(event, context)
|
|
||||||
if prev_state is not None:
|
|
||||||
defer.returnValue(prev_state)
|
|
||||||
|
|
||||||
yield self.handle_new_client_event(
|
|
||||||
requester=requester,
|
|
||||||
event=event,
|
|
||||||
context=context,
|
|
||||||
ratelimit=ratelimit,
|
|
||||||
)
|
|
||||||
|
|
||||||
if event.type == EventTypes.Message:
|
|
||||||
presence = self.hs.get_presence_handler()
|
|
||||||
# We don't want to block sending messages on any presence code. This
|
|
||||||
# matters as sometimes presence code can take a while.
|
|
||||||
preserve_fn(presence.bump_presence_active_time)(user)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def deduplicate_state_event(self, event, context):
|
|
||||||
"""
|
|
||||||
Checks whether event is in the latest resolved state in context.
|
|
||||||
|
|
||||||
If so, returns the version of the event in context.
|
|
||||||
Otherwise, returns None.
|
|
||||||
"""
|
|
||||||
prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
|
|
||||||
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
|
|
||||||
if not prev_event:
|
|
||||||
return
|
|
||||||
|
|
||||||
if prev_event and event.user_id == prev_event.user_id:
|
|
||||||
prev_content = encode_canonical_json(prev_event.content)
|
|
||||||
next_content = encode_canonical_json(event.content)
|
|
||||||
if prev_content == next_content:
|
|
||||||
defer.returnValue(prev_event)
|
|
||||||
return
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def create_and_send_nonmember_event(
|
|
||||||
self,
|
|
||||||
requester,
|
|
||||||
event_dict,
|
|
||||||
ratelimit=True,
|
|
||||||
txn_id=None
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
Creates an event, then sends it.
|
|
||||||
|
|
||||||
See self.create_event and self.send_nonmember_event.
|
|
||||||
"""
|
|
||||||
event, context = yield self.create_event(
|
|
||||||
requester,
|
|
||||||
event_dict,
|
|
||||||
token_id=requester.access_token_id,
|
|
||||||
txn_id=txn_id
|
|
||||||
)
|
|
||||||
|
|
||||||
spam_error = self.spam_checker.check_event_for_spam(event)
|
|
||||||
if spam_error:
|
|
||||||
if not isinstance(spam_error, basestring):
|
|
||||||
spam_error = "Spam is not permitted here"
|
|
||||||
raise SynapseError(
|
|
||||||
403, spam_error, Codes.FORBIDDEN
|
|
||||||
)
|
|
||||||
|
|
||||||
yield self.send_nonmember_event(
|
|
||||||
requester,
|
|
||||||
event,
|
|
||||||
context,
|
|
||||||
ratelimit=ratelimit,
|
|
||||||
)
|
|
||||||
defer.returnValue(event)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_data(self, user_id=None, room_id=None,
|
def get_room_data(self, user_id=None, room_id=None,
|
||||||
event_type=None, state_key="", is_guest=False):
|
event_type=None, state_key="", is_guest=False):
|
||||||
|
@ -470,6 +298,194 @@ class MessageHandler(BaseHandler):
|
||||||
for user_id, profile in users_with_profile.iteritems()
|
for user_id, profile in users_with_profile.iteritems()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
|
class EventCreationHandler(object):
|
||||||
|
def __init__(self, hs):
|
||||||
|
self.hs = hs
|
||||||
|
self.auth = hs.get_auth()
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
self.state = hs.get_state_handler()
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
self.validator = EventValidator()
|
||||||
|
self.profile_handler = hs.get_profile_handler()
|
||||||
|
self.event_builder_factory = hs.get_event_builder_factory()
|
||||||
|
self.server_name = hs.hostname
|
||||||
|
self.ratelimiter = hs.get_ratelimiter()
|
||||||
|
self.notifier = hs.get_notifier()
|
||||||
|
|
||||||
|
# This is only used to get at ratelimit function, and maybe_kick_guest_users
|
||||||
|
self.base_handler = BaseHandler(hs)
|
||||||
|
|
||||||
|
self.pusher_pool = hs.get_pusherpool()
|
||||||
|
|
||||||
|
# We arbitrarily limit concurrent event creation for a room to 5.
|
||||||
|
# This is to stop us from diverging history *too* much.
|
||||||
|
self.limiter = Limiter(max_count=5)
|
||||||
|
|
||||||
|
self.action_generator = hs.get_action_generator()
|
||||||
|
|
||||||
|
self.spam_checker = hs.get_spam_checker()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
||||||
|
prev_event_ids=None):
|
||||||
|
"""
|
||||||
|
Given a dict from a client, create a new event.
|
||||||
|
|
||||||
|
Creates an FrozenEvent object, filling out auth_events, prev_events,
|
||||||
|
etc.
|
||||||
|
|
||||||
|
Adds display names to Join membership events.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
requester
|
||||||
|
event_dict (dict): An entire event
|
||||||
|
token_id (str)
|
||||||
|
txn_id (str)
|
||||||
|
prev_event_ids (list): The prev event ids to use when creating the event
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of created event (FrozenEvent), Context
|
||||||
|
"""
|
||||||
|
builder = self.event_builder_factory.new(event_dict)
|
||||||
|
|
||||||
|
with (yield self.limiter.queue(builder.room_id)):
|
||||||
|
self.validator.validate_new(builder)
|
||||||
|
|
||||||
|
if builder.type == EventTypes.Member:
|
||||||
|
membership = builder.content.get("membership", None)
|
||||||
|
target = UserID.from_string(builder.state_key)
|
||||||
|
|
||||||
|
if membership in {Membership.JOIN, Membership.INVITE}:
|
||||||
|
# If event doesn't include a display name, add one.
|
||||||
|
profile = self.profile_handler
|
||||||
|
content = builder.content
|
||||||
|
|
||||||
|
try:
|
||||||
|
if "displayname" not in content:
|
||||||
|
content["displayname"] = yield profile.get_displayname(target)
|
||||||
|
if "avatar_url" not in content:
|
||||||
|
content["avatar_url"] = yield profile.get_avatar_url(target)
|
||||||
|
except Exception as e:
|
||||||
|
logger.info(
|
||||||
|
"Failed to get profile information for %r: %s",
|
||||||
|
target, e
|
||||||
|
)
|
||||||
|
|
||||||
|
if token_id is not None:
|
||||||
|
builder.internal_metadata.token_id = token_id
|
||||||
|
|
||||||
|
if txn_id is not None:
|
||||||
|
builder.internal_metadata.txn_id = txn_id
|
||||||
|
|
||||||
|
event, context = yield self._create_new_client_event(
|
||||||
|
builder=builder,
|
||||||
|
requester=requester,
|
||||||
|
prev_event_ids=prev_event_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
defer.returnValue((event, context))
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def send_nonmember_event(self, requester, event, context, ratelimit=True):
|
||||||
|
"""
|
||||||
|
Persists and notifies local clients and federation of an event.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event (FrozenEvent) the event to send.
|
||||||
|
context (Context) the context of the event.
|
||||||
|
ratelimit (bool): Whether to rate limit this send.
|
||||||
|
is_guest (bool): Whether the sender is a guest.
|
||||||
|
"""
|
||||||
|
if event.type == EventTypes.Member:
|
||||||
|
raise SynapseError(
|
||||||
|
500,
|
||||||
|
"Tried to send member event through non-member codepath"
|
||||||
|
)
|
||||||
|
|
||||||
|
# We check here if we are currently being rate limited, so that we
|
||||||
|
# don't do unnecessary work. We check again just before we actually
|
||||||
|
# send the event.
|
||||||
|
yield self.base_handler.ratelimit(requester, update=False)
|
||||||
|
|
||||||
|
user = UserID.from_string(event.sender)
|
||||||
|
|
||||||
|
assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
|
||||||
|
|
||||||
|
if event.is_state():
|
||||||
|
prev_state = yield self.deduplicate_state_event(event, context)
|
||||||
|
if prev_state is not None:
|
||||||
|
defer.returnValue(prev_state)
|
||||||
|
|
||||||
|
yield self.handle_new_client_event(
|
||||||
|
requester=requester,
|
||||||
|
event=event,
|
||||||
|
context=context,
|
||||||
|
ratelimit=ratelimit,
|
||||||
|
)
|
||||||
|
|
||||||
|
if event.type == EventTypes.Message:
|
||||||
|
presence = self.hs.get_presence_handler()
|
||||||
|
# We don't want to block sending messages on any presence code. This
|
||||||
|
# matters as sometimes presence code can take a while.
|
||||||
|
preserve_fn(presence.bump_presence_active_time)(user)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def deduplicate_state_event(self, event, context):
|
||||||
|
"""
|
||||||
|
Checks whether event is in the latest resolved state in context.
|
||||||
|
|
||||||
|
If so, returns the version of the event in context.
|
||||||
|
Otherwise, returns None.
|
||||||
|
"""
|
||||||
|
prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
|
||||||
|
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
|
||||||
|
if not prev_event:
|
||||||
|
return
|
||||||
|
|
||||||
|
if prev_event and event.user_id == prev_event.user_id:
|
||||||
|
prev_content = encode_canonical_json(prev_event.content)
|
||||||
|
next_content = encode_canonical_json(event.content)
|
||||||
|
if prev_content == next_content:
|
||||||
|
defer.returnValue(prev_event)
|
||||||
|
return
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def create_and_send_nonmember_event(
|
||||||
|
self,
|
||||||
|
requester,
|
||||||
|
event_dict,
|
||||||
|
ratelimit=True,
|
||||||
|
txn_id=None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Creates an event, then sends it.
|
||||||
|
|
||||||
|
See self.create_event and self.send_nonmember_event.
|
||||||
|
"""
|
||||||
|
event, context = yield self.create_event(
|
||||||
|
requester,
|
||||||
|
event_dict,
|
||||||
|
token_id=requester.access_token_id,
|
||||||
|
txn_id=txn_id
|
||||||
|
)
|
||||||
|
|
||||||
|
spam_error = self.spam_checker.check_event_for_spam(event)
|
||||||
|
if spam_error:
|
||||||
|
if not isinstance(spam_error, basestring):
|
||||||
|
spam_error = "Spam is not permitted here"
|
||||||
|
raise SynapseError(
|
||||||
|
403, spam_error, Codes.FORBIDDEN
|
||||||
|
)
|
||||||
|
|
||||||
|
yield self.send_nonmember_event(
|
||||||
|
requester,
|
||||||
|
event,
|
||||||
|
context,
|
||||||
|
ratelimit=ratelimit,
|
||||||
|
)
|
||||||
|
defer.returnValue(event)
|
||||||
|
|
||||||
@measure_func("_create_new_client_event")
|
@measure_func("_create_new_client_event")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
|
def _create_new_client_event(self, builder, requester=None, prev_event_ids=None):
|
||||||
|
@ -509,9 +525,7 @@ class MessageHandler(BaseHandler):
|
||||||
builder.prev_events = prev_events
|
builder.prev_events = prev_events
|
||||||
builder.depth = depth
|
builder.depth = depth
|
||||||
|
|
||||||
state_handler = self.state_handler
|
context = yield self.state.compute_event_context(builder)
|
||||||
|
|
||||||
context = yield state_handler.compute_event_context(builder)
|
|
||||||
if requester:
|
if requester:
|
||||||
context.app_service = requester.app_service
|
context.app_service = requester.app_service
|
||||||
|
|
||||||
|
@ -551,7 +565,7 @@ class MessageHandler(BaseHandler):
|
||||||
# We now need to go and hit out to wherever we need to hit out to.
|
# We now need to go and hit out to wherever we need to hit out to.
|
||||||
|
|
||||||
if ratelimit:
|
if ratelimit:
|
||||||
yield self.ratelimit(requester)
|
yield self.base_handler.ratelimit(requester)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield self.auth.check_from_context(event, context)
|
yield self.auth.check_from_context(event, context)
|
||||||
|
@ -567,7 +581,7 @@ class MessageHandler(BaseHandler):
|
||||||
logger.exception("Failed to encode content: %r", event.content)
|
logger.exception("Failed to encode content: %r", event.content)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
yield self.maybe_kick_guest_users(event, context)
|
yield self.base_handler.maybe_kick_guest_users(event, context)
|
||||||
|
|
||||||
if event.type == EventTypes.CanonicalAlias:
|
if event.type == EventTypes.CanonicalAlias:
|
||||||
# Check the alias is acually valid (at this time at least)
|
# Check the alias is acually valid (at this time at least)
|
||||||
|
|
Loading…
Reference in New Issue