Merge pull request #408 from matrix-org/markjh/distributor_facade
Wrap calls to distributor.fire in appropriately named functions
This commit is contained in:
commit
9fbd504b4e
|
@ -28,6 +28,18 @@ import random
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def started_user_eventstream(distributor, user):
|
||||||
|
return distributor.fire("started_user_eventstream", user)
|
||||||
|
|
||||||
|
|
||||||
|
def stopped_user_eventstream(distributor, user):
|
||||||
|
return distributor.fire("stopped_user_eventstream", user)
|
||||||
|
|
||||||
|
|
||||||
|
def user_joined_room(distributor, user, room_id):
|
||||||
|
return distributor.fire("user_joined_room", user, room_id)
|
||||||
|
|
||||||
|
|
||||||
class EventStreamHandler(BaseHandler):
|
class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -66,7 +78,7 @@ class EventStreamHandler(BaseHandler):
|
||||||
except:
|
except:
|
||||||
logger.exception("Failed to cancel event timer")
|
logger.exception("Failed to cancel event timer")
|
||||||
else:
|
else:
|
||||||
yield self.distributor.fire("started_user_eventstream", user)
|
yield started_user_eventstream(self.distributor, user)
|
||||||
|
|
||||||
self._streams_per_user[user] += 1
|
self._streams_per_user[user] += 1
|
||||||
|
|
||||||
|
@ -89,7 +101,7 @@ class EventStreamHandler(BaseHandler):
|
||||||
|
|
||||||
self._stop_timer_per_user.pop(user, None)
|
self._stop_timer_per_user.pop(user, None)
|
||||||
|
|
||||||
return self.distributor.fire("stopped_user_eventstream", user)
|
return stopped_user_eventstream(self.distributor, user)
|
||||||
|
|
||||||
logger.debug("Scheduling _later: for %s", user)
|
logger.debug("Scheduling _later: for %s", user)
|
||||||
self._stop_timer_per_user[user] = (
|
self._stop_timer_per_user[user] = (
|
||||||
|
@ -120,9 +132,7 @@ class EventStreamHandler(BaseHandler):
|
||||||
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
|
timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
|
||||||
|
|
||||||
if is_guest:
|
if is_guest:
|
||||||
yield self.distributor.fire(
|
yield user_joined_room(self.distributor, auth_user, room_id)
|
||||||
"user_joined_room", user=auth_user, room_id=room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
events, tokens = yield self.notifier.get_events_for(
|
events, tokens = yield self.notifier.get_events_for(
|
||||||
auth_user, pagin_config, timeout,
|
auth_user, pagin_config, timeout,
|
||||||
|
|
|
@ -44,6 +44,10 @@ import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def user_joined_room(distributor, user, room_id):
|
||||||
|
return distributor.fire("user_joined_room", user, room_id)
|
||||||
|
|
||||||
|
|
||||||
class FederationHandler(BaseHandler):
|
class FederationHandler(BaseHandler):
|
||||||
"""Handles events that originated from federation.
|
"""Handles events that originated from federation.
|
||||||
Responsible for:
|
Responsible for:
|
||||||
|
@ -60,10 +64,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
|
||||||
self.distributor.observe(
|
self.distributor.observe("user_joined_room", self.user_joined_room)
|
||||||
"user_joined_room",
|
|
||||||
self._on_user_joined
|
|
||||||
)
|
|
||||||
|
|
||||||
self.waiting_for_join_list = {}
|
self.waiting_for_join_list = {}
|
||||||
|
|
||||||
|
@ -234,9 +235,7 @@ class FederationHandler(BaseHandler):
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
user = UserID.from_string(event.state_key)
|
user = UserID.from_string(event.state_key)
|
||||||
yield self.distributor.fire(
|
yield user_joined_room(self.distributor, user, event.room_id)
|
||||||
"user_joined_room", user=user, room_id=event.room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _filter_events_for_server(self, server_name, room_id, events):
|
def _filter_events_for_server(self, server_name, room_id, events):
|
||||||
|
@ -733,9 +732,7 @@ class FederationHandler(BaseHandler):
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
if event.content["membership"] == Membership.JOIN:
|
if event.content["membership"] == Membership.JOIN:
|
||||||
user = UserID.from_string(event.state_key)
|
user = UserID.from_string(event.state_key)
|
||||||
yield self.distributor.fire(
|
yield user_joined_room(self.distributor, user, event.room_id)
|
||||||
"user_joined_room", user=user, room_id=event.room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
new_pdu = event
|
new_pdu = event
|
||||||
|
|
||||||
|
@ -1082,7 +1079,7 @@ class FederationHandler(BaseHandler):
|
||||||
return self.store.get_min_depth(context)
|
return self.store.get_min_depth(context)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def _on_user_joined(self, user, room_id):
|
def user_joined_room(self, user, room_id):
|
||||||
waiters = self.waiting_for_join_list.get(
|
waiters = self.waiting_for_join_list.get(
|
||||||
(user.to_string(), room_id),
|
(user.to_string(), room_id),
|
||||||
[]
|
[]
|
||||||
|
|
|
@ -31,6 +31,10 @@ import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def collect_presencelike_data(distributor, user, content):
|
||||||
|
return distributor.fire("changed_presencelike_data", user, content)
|
||||||
|
|
||||||
|
|
||||||
class MessageHandler(BaseHandler):
|
class MessageHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -195,10 +199,8 @@ class MessageHandler(BaseHandler):
|
||||||
if membership == Membership.JOIN:
|
if membership == Membership.JOIN:
|
||||||
joinee = UserID.from_string(builder.state_key)
|
joinee = UserID.from_string(builder.state_key)
|
||||||
# If event doesn't include a display name, add one.
|
# If event doesn't include a display name, add one.
|
||||||
yield self.distributor.fire(
|
yield collect_presencelike_data(
|
||||||
"collect_presencelike_data",
|
self.distributor, joinee, builder.content
|
||||||
joinee,
|
|
||||||
builder.content
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if token_id is not None:
|
if token_id is not None:
|
||||||
|
|
|
@ -62,6 +62,14 @@ def partitionbool(l, func):
|
||||||
return ret.get(True, []), ret.get(False, [])
|
return ret.get(True, []), ret.get(False, [])
|
||||||
|
|
||||||
|
|
||||||
|
def user_presence_changed(distributor, user, statuscache):
|
||||||
|
return distributor.fire("user_presence_changed", user, statuscache)
|
||||||
|
|
||||||
|
|
||||||
|
def collect_presencelike_data(distributor, user, content):
|
||||||
|
return distributor.fire("collect_presencelike_data", user, content)
|
||||||
|
|
||||||
|
|
||||||
class PresenceHandler(BaseHandler):
|
class PresenceHandler(BaseHandler):
|
||||||
|
|
||||||
STATE_LEVELS = {
|
STATE_LEVELS = {
|
||||||
|
@ -361,9 +369,7 @@ class PresenceHandler(BaseHandler):
|
||||||
yield self.store.set_presence_state(
|
yield self.store.set_presence_state(
|
||||||
target_user.localpart, state_to_store
|
target_user.localpart, state_to_store
|
||||||
)
|
)
|
||||||
yield self.distributor.fire(
|
yield collect_presencelike_data(self.distributor, target_user, state)
|
||||||
"collect_presencelike_data", target_user, state
|
|
||||||
)
|
|
||||||
|
|
||||||
if now_level > was_level:
|
if now_level > was_level:
|
||||||
state["last_active"] = self.clock.time_msec()
|
state["last_active"] = self.clock.time_msec()
|
||||||
|
@ -878,7 +884,7 @@ class PresenceHandler(BaseHandler):
|
||||||
room_ids=room_ids,
|
room_ids=room_ids,
|
||||||
statuscache=statuscache,
|
statuscache=statuscache,
|
||||||
)
|
)
|
||||||
yield self.distributor.fire("user_presence_changed", user, statuscache)
|
yield user_presence_changed(self.distributor, user, statuscache)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def incoming_presence(self, origin, content):
|
def incoming_presence(self, origin, content):
|
||||||
|
@ -1116,9 +1122,7 @@ class PresenceHandler(BaseHandler):
|
||||||
self._user_cachemap[user].get_state()["last_active"]
|
self._user_cachemap[user].get_state()["last_active"]
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.distributor.fire(
|
yield collect_presencelike_data(self.distributor, user, state)
|
||||||
"collect_presencelike_data", user, state
|
|
||||||
)
|
|
||||||
|
|
||||||
if "last_active" in state:
|
if "last_active" in state:
|
||||||
state = dict(state)
|
state = dict(state)
|
||||||
|
|
|
@ -28,6 +28,14 @@ import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def changed_presencelike_data(distributor, user, state):
|
||||||
|
return distributor.fire("changed_presencelike_data", user, state)
|
||||||
|
|
||||||
|
|
||||||
|
def collect_presencelike_data(distributor, user, content):
|
||||||
|
return distributor.fire("collect_presencelike_data", user, content)
|
||||||
|
|
||||||
|
|
||||||
class ProfileHandler(BaseHandler):
|
class ProfileHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -95,11 +103,9 @@ class ProfileHandler(BaseHandler):
|
||||||
target_user.localpart, new_displayname
|
target_user.localpart, new_displayname
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.distributor.fire(
|
yield changed_presencelike_data(self.distributor, target_user, {
|
||||||
"changed_presencelike_data", target_user, {
|
|
||||||
"displayname": new_displayname,
|
"displayname": new_displayname,
|
||||||
}
|
})
|
||||||
)
|
|
||||||
|
|
||||||
yield self._update_join_states(target_user)
|
yield self._update_join_states(target_user)
|
||||||
|
|
||||||
|
@ -144,11 +150,9 @@ class ProfileHandler(BaseHandler):
|
||||||
target_user.localpart, new_avatar_url
|
target_user.localpart, new_avatar_url
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.distributor.fire(
|
yield changed_presencelike_data(self.distributor, target_user, {
|
||||||
"changed_presencelike_data", target_user, {
|
|
||||||
"avatar_url": new_avatar_url,
|
"avatar_url": new_avatar_url,
|
||||||
}
|
})
|
||||||
)
|
|
||||||
|
|
||||||
yield self._update_join_states(target_user)
|
yield self._update_join_states(target_user)
|
||||||
|
|
||||||
|
@ -208,9 +212,7 @@ class ProfileHandler(BaseHandler):
|
||||||
"membership": Membership.JOIN,
|
"membership": Membership.JOIN,
|
||||||
}
|
}
|
||||||
|
|
||||||
yield self.distributor.fire(
|
yield collect_presencelike_data(self.distributor, user, content)
|
||||||
"collect_presencelike_data", user, content
|
|
||||||
)
|
|
||||||
|
|
||||||
msg_handler = self.hs.get_handlers().message_handler
|
msg_handler = self.hs.get_handlers().message_handler
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -31,6 +31,10 @@ import urllib
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def registered_user(distributor, user):
|
||||||
|
return distributor.fire("registered_user", user)
|
||||||
|
|
||||||
|
|
||||||
class RegistrationHandler(BaseHandler):
|
class RegistrationHandler(BaseHandler):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
|
@ -98,7 +102,7 @@ class RegistrationHandler(BaseHandler):
|
||||||
password_hash=password_hash
|
password_hash=password_hash
|
||||||
)
|
)
|
||||||
|
|
||||||
yield self.distributor.fire("registered_user", user)
|
yield registered_user(self.distributor, user)
|
||||||
else:
|
else:
|
||||||
# autogen a random user ID
|
# autogen a random user ID
|
||||||
attempts = 0
|
attempts = 0
|
||||||
|
@ -117,7 +121,7 @@ class RegistrationHandler(BaseHandler):
|
||||||
token=token,
|
token=token,
|
||||||
password_hash=password_hash)
|
password_hash=password_hash)
|
||||||
|
|
||||||
self.distributor.fire("registered_user", user)
|
yield registered_user(self.distributor, user)
|
||||||
except SynapseError:
|
except SynapseError:
|
||||||
# if user id is taken, just generate another
|
# if user id is taken, just generate another
|
||||||
user_id = None
|
user_id = None
|
||||||
|
@ -167,7 +171,7 @@ class RegistrationHandler(BaseHandler):
|
||||||
token=token,
|
token=token,
|
||||||
password_hash=""
|
password_hash=""
|
||||||
)
|
)
|
||||||
self.distributor.fire("registered_user", user)
|
registered_user(self.distributor, user)
|
||||||
defer.returnValue((user_id, token))
|
defer.returnValue((user_id, token))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -215,7 +219,7 @@ class RegistrationHandler(BaseHandler):
|
||||||
token=token,
|
token=token,
|
||||||
password_hash=None
|
password_hash=None
|
||||||
)
|
)
|
||||||
yield self.distributor.fire("registered_user", user)
|
yield registered_user(self.distributor, user)
|
||||||
except Exception, e:
|
except Exception, e:
|
||||||
yield self.store.add_access_token_to_user(user_id, token)
|
yield self.store.add_access_token_to_user(user_id, token)
|
||||||
# Ignore Registration errors
|
# Ignore Registration errors
|
||||||
|
|
|
@ -41,6 +41,18 @@ logger = logging.getLogger(__name__)
|
||||||
id_server_scheme = "https://"
|
id_server_scheme = "https://"
|
||||||
|
|
||||||
|
|
||||||
|
def collect_presencelike_data(distributor, user, content):
|
||||||
|
return distributor.fire("collect_presencelike_data", user, content)
|
||||||
|
|
||||||
|
|
||||||
|
def user_left_room(distributor, user, room_id):
|
||||||
|
return distributor.fire("user_left_room", user=user, room_id=room_id)
|
||||||
|
|
||||||
|
|
||||||
|
def user_joined_room(distributor, user, room_id):
|
||||||
|
return distributor.fire("user_joined_room", user=user, room_id=room_id)
|
||||||
|
|
||||||
|
|
||||||
class RoomCreationHandler(BaseHandler):
|
class RoomCreationHandler(BaseHandler):
|
||||||
|
|
||||||
PRESETS_DICT = {
|
PRESETS_DICT = {
|
||||||
|
@ -438,9 +450,7 @@ class RoomMemberHandler(BaseHandler):
|
||||||
|
|
||||||
if prev_state and prev_state.membership == Membership.JOIN:
|
if prev_state and prev_state.membership == Membership.JOIN:
|
||||||
user = UserID.from_string(event.user_id)
|
user = UserID.from_string(event.user_id)
|
||||||
self.distributor.fire(
|
user_left_room(self.distributor, user, event.room_id)
|
||||||
"user_left_room", user=user, room_id=event.room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
defer.returnValue({"room_id": room_id})
|
defer.returnValue({"room_id": room_id})
|
||||||
|
|
||||||
|
@ -458,9 +468,7 @@ class RoomMemberHandler(BaseHandler):
|
||||||
raise SynapseError(404, "No known servers")
|
raise SynapseError(404, "No known servers")
|
||||||
|
|
||||||
# If event doesn't include a display name, add one.
|
# If event doesn't include a display name, add one.
|
||||||
yield self.distributor.fire(
|
yield collect_presencelike_data(self.distributor, joinee, content)
|
||||||
"collect_presencelike_data", joinee, content
|
|
||||||
)
|
|
||||||
|
|
||||||
content.update({"membership": Membership.JOIN})
|
content.update({"membership": Membership.JOIN})
|
||||||
builder = self.event_builder_factory.new({
|
builder = self.event_builder_factory.new({
|
||||||
|
@ -518,9 +526,7 @@ class RoomMemberHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
user = UserID.from_string(event.user_id)
|
user = UserID.from_string(event.user_id)
|
||||||
yield self.distributor.fire(
|
yield user_joined_room(self.distributor, user, room_id)
|
||||||
"user_joined_room", user=user, room_id=room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_inviter(self, event):
|
def get_inviter(self, event):
|
||||||
|
|
Loading…
Reference in New Issue