From 5e3b254dc810c5f17f635005253a977af65e3a53 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 11 May 2015 14:37:33 +0100 Subject: [PATCH 01/13] Use wait_for_events to implement 'get_events' --- synapse/notifier.py | 115 +++++++++++++------------------------------- 1 file changed, 33 insertions(+), 82 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 78eb28e4b2..e16a4608e9 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -305,14 +305,16 @@ class Notifier(object): ) @defer.inlineCallbacks - def wait_for_events(self, user, rooms, filter, timeout, callback): + def wait_for_events(self, user, rooms, timeout, callback, + from_token=StreamToken("s0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ deferred = defer.Deferred() - - from_token = StreamToken("s0", "0", "0") + appservice = yield self.hs.get_datastore().get_app_service_by_user_id( + user.to_string() + ) listener = [_NotificationListener( user=user, @@ -321,6 +323,7 @@ class Notifier(object): limit=1, timeout=timeout, deferred=deferred, + appservice=appservice, )] if timeout: @@ -363,65 +366,43 @@ class Notifier(object): defer.returnValue(result) + @defer.inlineCallbacks def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. """ - deferred = defer.Deferred() - - self._get_events( - deferred, user, rooms, pagination_config.from_token, - pagination_config.limit, timeout - ).addErrback(deferred.errback) - - return deferred - - @defer.inlineCallbacks - def _get_events(self, deferred, user, rooms, from_token, limit, timeout): + from_token = pagination_config.from_token if not from_token: from_token = yield self.event_sources.get_current_token() - appservice = yield self.hs.get_datastore().get_app_service_by_user_id( - user.to_string() - ) + limit = pagination_config.limit - listener = _NotificationListener( - user, - rooms, - from_token, - limit, - timeout, - deferred, - appservice=appservice - ) - - def _timeout_listener(): - # TODO (erikj): We should probably set to_token to the current - # max rather than reusing from_token. - # Remove the timer from the listener so we don't try to cancel it. - listener.timer = None - listener.notify( - self, - [], - listener.from_token, - listener.from_token, - ) - - if timeout: - self._register_with_keys(listener) - - yield self._check_for_updates(listener) - - if not timeout: - _timeout_listener() - else: - # Only add the timer if the listener hasn't been notified - if not listener.notified(): - listener.timer = self.clock.call_later( - timeout/1000.0, _timeout_listener + @defer.inlineCallbacks + def check_for_updates(): + events = [] + end_token = from_token + for name, source in self.event_sources.sources.items(): + keyname = "%s_key" % name + stuff, new_key = yield source.get_new_events_for_user( + user, getattr(from_token, keyname), limit, ) - return + events.extend(stuff) + end_token = from_token.copy_and_replace(keyname, new_key) + + if events: + defer.returnValue((events, (from_token, end_token))) + else: + defer.returnValue(None) + + result = yield self.wait_for_events( + user, rooms, timeout, check_for_updates, from_token=from_token + ) + + if result is None: + result = ([], (from_token, from_token)) + + defer.returnValue(result) @log_function def _register_with_keys(self, listener): @@ -436,36 +417,6 @@ class Notifier(object): listener.appservice, set() ).add(listener) - @defer.inlineCallbacks - @log_function - def _check_for_updates(self, listener): - # TODO (erikj): We need to think about limits across multiple sources - events = [] - - from_token = listener.from_token - limit = listener.limit - - # TODO (erikj): DeferredList? - for name, source in self.event_sources.sources.items(): - keyname = "%s_key" % name - - stuff, new_key = yield source.get_new_events_for_user( - listener.user, - getattr(from_token, keyname), - limit, - ) - - events.extend(stuff) - - from_token = from_token.copy_and_replace(keyname, new_key) - - end_token = from_token - - if events: - listener.notify(self, events, listener.from_token, end_token) - - defer.returnValue(listener) - def _user_joined_room(self, user, room_id): new_listeners = self.user_to_listeners.get(user, set()) From e269c511f61100bfd96bb0201db320aa6d59925c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 11 May 2015 15:01:51 +0100 Subject: [PATCH 02/13] Don't bother passing the events to the notifier since it isn't using them --- synapse/notifier.py | 113 +++++++------------------------------------- 1 file changed, 18 insertions(+), 95 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e16a4608e9..abe12b1434 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import run_on_reactor from synapse.types import StreamToken import synapse.metrics @@ -50,13 +51,9 @@ class _NotificationListener(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, from_token, limit, timeout, deferred, - appservice=None): + def __init__(self, user, rooms, deferred, appservice=None): self.user = user self.appservice = appservice - self.from_token = from_token - self.limit = limit - self.timeout = timeout self.deferred = deferred self.rooms = rooms self.timer = None @@ -64,17 +61,14 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self, notifier, events, start_token, end_token): + def notify(self, notifier): """ Inform whoever is listening about the new events. This will also remove this listener from all the indexes in the Notifier it knows about. """ - result = (events, (start_token, end_token)) - try: - self.deferred.callback(result) - notified_events_counter.inc_by(len(events)) + self.deferred.callback(None) except defer.AlreadyCalledError: pass @@ -161,6 +155,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + yield run_on_reactor() # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -168,8 +163,6 @@ class Notifier(object): room_id = event.room_id - room_source = self.event_sources.sources["room"] - room_listeners = self.room_to_listeners.get(room_id, set()) _discard_if_notified(room_listeners) @@ -200,34 +193,12 @@ class Notifier(object): logger.debug("on_new_room_event listeners %s", listeners) - # TODO (erikj): Can we make this more efficient by hitting the - # db once? - - @defer.inlineCallbacks - def notify(listener): - events, end_key = yield room_source.get_new_events_for_user( - listener.user, - listener.from_token.room_key, - listener.limit, - ) - - if events: - end_token = listener.from_token.copy_and_replace( - "room_key", end_key - ) - - listener.notify( - self, events, listener.from_token, end_token - ) - - def eb(failure): - logger.exception("Failed to notify listener", failure) - with PreserveLoggingContext(): - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function @@ -237,11 +208,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - # TODO(paul): This is horrible, having to manually list every event - # source here individually - presence_source = self.event_sources.sources["presence"] - typing_source = self.event_sources.sources["typing"] - + yield run_on_reactor() listeners = set() for user in users: @@ -258,51 +225,12 @@ class Notifier(object): listeners |= room_listeners - @defer.inlineCallbacks - def notify(listener): - presence_events, presence_end_key = ( - yield presence_source.get_new_events_for_user( - listener.user, - listener.from_token.presence_key, - listener.limit, - ) - ) - typing_events, typing_end_key = ( - yield typing_source.get_new_events_for_user( - listener.user, - listener.from_token.typing_key, - listener.limit, - ) - ) - - if presence_events or typing_events: - end_token = listener.from_token.copy_and_replace( - "presence_key", presence_end_key - ).copy_and_replace( - "typing_key", typing_end_key - ) - - listener.notify( - self, - presence_events + typing_events, - listener.from_token, - end_token - ) - - def eb(failure): - logger.error( - "Failed to notify listener", - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject()) - ) - with PreserveLoggingContext(): - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, @@ -319,9 +247,6 @@ class Notifier(object): listener = [_NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, appservice=appservice, )] @@ -338,7 +263,7 @@ class Notifier(object): def _timeout_listener(): timed_out[0] = True timer[0] = None - listener[0].notify(self, [], from_token, from_token) + listener[0].notify(self) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. @@ -350,10 +275,8 @@ class Notifier(object): listener[0] = _NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, + appservice=appservice, ) self._register_with_keys(listener[0]) result = yield callback() From 2551b6645d5d0855f72638d718ceaf365bbb5938 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 12 May 2015 11:54:18 +0100 Subject: [PATCH 03/13] Update the end_token correctly, otherwise the token doesn't advance and the client gets duplicate events --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index abe12b1434..ef7d15671f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -311,7 +311,7 @@ class Notifier(object): user, getattr(from_token, keyname), limit, ) events.extend(stuff) - end_token = from_token.copy_and_replace(keyname, new_key) + end_token = end_token.copy_and_replace(keyname, new_key) if events: defer.returnValue((events, (from_token, end_token))) From c37a6e151ff551bb4ec243d5d237761c2ed6b914 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 12:03:13 +0100 Subject: [PATCH 04/13] Make shared secret registration work again --- synapse/rest/client/v2_alpha/register.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 3640fb4a29..72dfb876c5 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -82,8 +82,10 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY] ] + result = None if service: is_application_server = True + params = body elif 'mac' in body: # Check registration-specific shared secret auth if 'username' not in body: @@ -92,6 +94,7 @@ class RegisterRestServlet(RestServlet): body['username'], body['mac'] ) is_using_shared_secret = True + params = body else: authed, result, params = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) @@ -118,7 +121,7 @@ class RegisterRestServlet(RestServlet): password=new_password ) - if LoginType.EMAIL_IDENTITY in result: + if result and LoginType.EMAIL_IDENTITY in result: threepid = result[LoginType.EMAIL_IDENTITY] for reqd in ['medium', 'address', 'validated_at']: From 0c894e1ebdb204cf0a0dce16ed819b9e5d9f3fc0 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 13:11:28 +0100 Subject: [PATCH 05/13] Throw error when creating room if alias contains whitespace #SYN-335 --- synapse/handlers/room.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index dac683616a..401cc677d1 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -26,6 +26,7 @@ from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event import logging +import string logger = logging.getLogger(__name__) @@ -50,6 +51,10 @@ class RoomCreationHandler(BaseHandler): self.ratelimit(user_id) if "room_alias_name" in config: + for wchar in string.whitespace: + if wchar in config["room_alias_name"]: + raise SynapseError(400, "Invalid characters in room alias") + room_alias = RoomAlias.create( config["room_alias_name"], self.hs.hostname, From 92e1c8983dcbc1b9e75ff71b06928fd51627f61a Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 13:21:55 +0100 Subject: [PATCH 06/13] Disallow whitespace in aliases here too --- synapse/handlers/directory.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index f76febee8f..e41a688836 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes from synapse.types import RoomAlias import logging +import string logger = logging.getLogger(__name__) @@ -40,6 +41,10 @@ class DirectoryHandler(BaseHandler): def _create_association(self, room_alias, room_id, servers=None): # general association creation for both human users and app services + for wchar in string.whitespace: + if wchar in room_alias.localpart: + raise SynapseError(400, "Invalid characters in room alias") + if not self.hs.is_mine(room_alias): raise SynapseError(400, "Room alias must be local") # TODO(erikj): Change this. From 67800f7626d07202b19da9090432aab4b0bc1aef Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 14:19:10 +0100 Subject: [PATCH 07/13] Treat setting your display name to the empty string as removing it (SYN-186). --- synapse/handlers/profile.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 71ff78ab23..799faffe53 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -88,6 +88,9 @@ class ProfileHandler(BaseHandler): if target_user != auth_user: raise AuthError(400, "Cannot set another user's displayname") + if new_displayname == '': + new_displayname = None + yield self.store.set_profile_displayname( target_user.localpart, new_displayname ) From c5d1b4986bbb5983054b64fdc3dd3c32e80e3c17 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 14:59:31 +0100 Subject: [PATCH 08/13] Remove unused arguments and doc PresenceHandler.push_update_to_clients --- synapse/handlers/presence.py | 20 ++++++--------- tests/handlers/test_presence.py | 22 ++++------------ tests/handlers/test_presencelike.py | 39 ++++++----------------------- 3 files changed, 21 insertions(+), 60 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1edab05492..0c246958ac 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -496,9 +496,7 @@ class PresenceHandler(BaseHandler): # We want to tell the person that just came online # presence state of people they are interested in? self.push_update_to_clients( - observed_user=target_user, users_to_push=[user], - statuscache=self._get_or_offline_usercache(target_user), ) deferreds = [] @@ -712,10 +710,7 @@ class PresenceHandler(BaseHandler): continue self.push_update_to_clients( - observed_user=user, - users_to_push=observers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=observers, room_ids=room_ids ) user_id = user.to_string() @@ -779,10 +774,7 @@ class PresenceHandler(BaseHandler): localusers = set(localusers) self.push_update_to_clients( - observed_user=observed_user, - users_to_push=localusers, - room_ids=room_ids, - statuscache=statuscache, + users_to_push=localusers, room_ids=room_ids ) remote_domains = set(remote_domains) @@ -807,8 +799,12 @@ class PresenceHandler(BaseHandler): defer.returnValue((localusers, remote_domains)) - def push_update_to_clients(self, observed_user, users_to_push=[], - room_ids=[], statuscache=None): + def push_update_to_clients(self, users_to_push=[], room_ids=[]): + """Notify clients of a new presence event. + Args: + users_to_push(list): List of users to notify. + room_ids(list): List of room_ids to notify. + """ with PreserveLoggingContext(): self.notifier.on_new_user_event( users_to_push, diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 70147b017e..ee773797e7 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -1097,12 +1097,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # apple should see both banana and clementine currently offline self.mock_update_client.assert_has_calls([ - call(users_to_push=[self.u_apple], - observed_user=self.u_banana, - statuscache=ANY), - call(users_to_push=[self.u_apple], - observed_user=self.u_clementine, - statuscache=ANY), + call(users_to_push=[self.u_apple]), + call(users_to_push=[self.u_apple]), ], any_order=True) # Gut-wrenching tests @@ -1121,13 +1117,8 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # apple and banana should now both see each other online self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple]), - observed_user=self.u_banana, - room_ids=[], - statuscache=ANY), - call(users_to_push=[self.u_banana], - observed_user=self.u_apple, - statuscache=ANY), + call(users_to_push=set([self.u_apple]), room_ids=[]), + call(users_to_push=[self.u_banana]), ], any_order=True) self.assertTrue("apple" in self.handler._local_pushmap) @@ -1143,10 +1134,7 @@ class PresencePollingTestCase(MockedDatastorePresenceTestCase): # banana should now be told apple is offline self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_banana, self.u_apple]), - observed_user=self.u_apple, - room_ids=[], - statuscache=ANY), + call(users_to_push=set([self.u_banana, self.u_apple]), room_ids=[]), ], any_order=True) self.assertFalse("banana" in self.handler._local_pushmap) diff --git a/tests/handlers/test_presencelike.py b/tests/handlers/test_presencelike.py index 977e832da7..1f2e66ac11 100644 --- a/tests/handlers/test_presencelike.py +++ b/tests/handlers/test_presencelike.py @@ -209,20 +209,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): ], presence) self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]), - room_ids=[], - observed_user=self.u_apple, - statuscache=ANY), # self-reflection + call( + users_to_push={self.u_apple, self.u_banana, self.u_clementine}, + room_ids=[] + ), ], any_order=True) - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({ - "presence": ONLINE, - "last_active": 1000000, # MockClock - "displayname": "Frank", - "avatar_url": "http://foo", - }, statuscache.state) - self.mock_update_client.reset_mock() self.datastore.set_profile_displayname.return_value = defer.succeed( @@ -232,21 +224,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.u_apple, "I am an Apple") self.mock_update_client.assert_has_calls([ - call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]), + call( + users_to_push={self.u_apple, self.u_banana, self.u_clementine}, room_ids=[], - observed_user=self.u_apple, - statuscache=ANY), # self-reflection + ), ], any_order=True) - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({ - "presence": ONLINE, - "last_active": 1000000, # MockClock - "displayname": "I am an Apple", - "avatar_url": "http://foo", - }, statuscache.state) - - @defer.inlineCallbacks def test_push_remote(self): self.presence_list = [ @@ -314,13 +297,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase): self.mock_update_client.assert_called_with( users_to_push=set([self.u_apple]), room_ids=[], - observed_user=self.u_potato, - statuscache=ANY) - - statuscache = self.mock_update_client.call_args[1]["statuscache"] - self.assertEquals({"presence": ONLINE, - "displayname": "Frank", - "avatar_url": "http://foo"}, statuscache.state) + ) state = yield self.handlers.presence_handler.get_state(self.u_potato, self.u_apple) From 6e1ad283cf671e0ff1b04856a2f711643900c436 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 14 May 2015 16:39:19 +0100 Subject: [PATCH 09/13] Support gzip encoding for client, client v2 and web client resources (SYN-176). --- synapse/app/homeserver.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index c227265190..dfb5314ff7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -32,9 +32,9 @@ from synapse.server import HomeServer from twisted.internet import reactor from twisted.application import service from twisted.enterprise import adbapi -from twisted.web.resource import Resource +from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File -from twisted.web.server import Site +from twisted.web.server import Site, GzipEncoderFactory from twisted.web.http import proxiedLogFormatter, combinedLogFormatter from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource @@ -69,16 +69,26 @@ import subprocess logger = logging.getLogger("synapse.app.homeserver") +class GzipFile(File): + def getChild(self, path, request): + child = File.getChild(self, path, request) + return EncodingResourceWrapper(child, [GzipEncoderFactory()]) + + +def gz_wrap(r): + return EncodingResourceWrapper(r, [GzipEncoderFactory()]) + + class SynapseHomeServer(HomeServer): def build_http_client(self): return MatrixFederationHttpClient(self) def build_resource_for_client(self): - return ClientV1RestResource(self) + return gz_wrap(ClientV1RestResource(self)) def build_resource_for_client_v2_alpha(self): - return ClientV2AlphaRestResource(self) + return gz_wrap(ClientV2AlphaRestResource(self)) def build_resource_for_federation(self): return JsonResource(self) @@ -87,9 +97,10 @@ class SynapseHomeServer(HomeServer): import syweb syweb_path = os.path.dirname(syweb.__file__) webclient_path = os.path.join(syweb_path, "webclient") - return File(webclient_path) # TODO configurable? + return GzipFile(webclient_path) # TODO configurable? def build_resource_for_static_content(self): + # This is old and should go away: not going to bother adding gzip return File("static") def build_resource_for_content_repo(self): From 47ec693e29ce61885b605191b97a69c1cbf7ab09 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 15:29:58 +0100 Subject: [PATCH 10/13] More doc-strings --- synapse/handlers/presence.py | 241 +++++++++++++++++++++++++++++------ 1 file changed, 202 insertions(+), 39 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 0c246958ac..23302242bd 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -317,6 +317,13 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def user_joined_room(self, user, room_id): + """Called via the distributor whenever a user joins a room. + Notifies the new member of the presence of the current members. + Notifies the current members of the room of the new member's presence. + Args: + user(UserID): The user who joined the room. + room_id(str): The room id the user joined. + """ if self.hs.is_mine(user): statuscache = self._get_or_make_usercache(user) @@ -344,6 +351,7 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def send_invite(self, observer_user, observed_user): + """Request the presence of a local or remote user for a local user""" if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -378,6 +386,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def invite_presence(self, observed_user, observer_user): + """Handles a m.presence_invite EDU. A remote or local user has + requested presence updates for a local user. If the invite is accepted + then allow the local or remote user to see the presence of the local + user. + + Args: + observed_user(UserID): The local user whose presence is requested. + observer_user(UserID): The remote or local user requesting presence. + + """ accept = yield self._should_accept_invite(observed_user, observer_user) if accept: @@ -404,6 +422,14 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def accept_presence(self, observed_user, observer_user): + """Handles a m.presence_accept EDU. Mark a presence invite from a + local or remote user as accepted in a local user's presence list. + Starts polling for presence updates from the local or remote user. + + Args: + observed_user(UserID): The user to update in the presence list. + observer_user(UserID): The owner of the presence list to update. + """ yield self.store.set_presence_list_accepted( observer_user.localpart, observed_user.to_string() ) @@ -414,6 +440,15 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def deny_presence(self, observed_user, observer_user): + """Handle a m.presence_deny EDU. Removes a local or remote user from a + local user's presence list. + + Args: + observed_user: The local or remote user to remove from the list. + observer_user: The local owner of the presence list. + Returns: + A Deferred. + """ yield self.store.del_presence_list( observer_user.localpart, observed_user.to_string() ) @@ -422,6 +457,15 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def drop(self, observed_user, observer_user): + """Remove a local or remote user from a local user's presence list and + unsubscribe the local user from updates that user. + + Args: + observed_user: The local or remote user to remove from the list. + observer_user: The local owner of the presence list. + Returns: + A Deferred. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -435,6 +479,16 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): + """Get the presence list for a local user. The retured list includes + the current presence state for each user listed. + + Args: + observer_user(UserID): The local user whose presence list to fetch. + accepted(bool or None): If not none then only include users who + have or have not accepted the presence invite request. + Returns: + A Deferred list of presence state events. + """ if not self.hs.is_mine(observer_user): raise SynapseError(400, "User is not hosted on this Home Server") @@ -456,6 +510,23 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def start_polling_presence(self, user, target_user=None, state=None): + """Subscribe a local user to presence updates from a local or remote + user. If no target_user is supplied then subscribe to all users stored + in the presence list for the local user. + + Additonally this pushes the current presence state of this user to all + target_users. That state can be provided directly or will be read from + the stored state for the local user. + + Also this attempts to notify the local user of the current state of + any local target users. + + Args: + user(UserID): The local user that whishes for presence updates. + target_user(UserID): The local or remote user whose updates are + wanted. + state(dict): Optional presence state for the local user. + """ logger.debug("Start polling for presence from %s", user) if target_user: @@ -513,6 +584,11 @@ class PresenceHandler(BaseHandler): yield defer.DeferredList(deferreds, consumeErrors=True) def _start_polling_local(self, user, target_user): + """Subscribe a local user to presence updates for a local user + Args: + user(UserId): The local user that wishes for updates. + target_user(UserId): The local users whose updates are wanted. + """ target_localpart = target_user.localpart if target_localpart not in self._local_pushmap: @@ -521,6 +597,16 @@ class PresenceHandler(BaseHandler): self._local_pushmap[target_localpart].add(user) def _start_polling_remote(self, user, domain, remoteusers): + """Subscribe a local user to presence updates for remote users on a + given domain. + Args: + user(UserID): The local user that wishes for updates. + domain(str): The remote server the local user wants updates from. + remoteusers(UserID): The remote users that local user wants to be + told about. + Returns: + A Deferred. + """ to_poll = set() for u in remoteusers: @@ -541,6 +627,16 @@ class PresenceHandler(BaseHandler): @log_function def stop_polling_presence(self, user, target_user=None): + """Unsubscribe a local user from presence updates from a local or + remote user. If no target user is supplied then unsubscribe the user + from all presence updates that the user had subscribed to. + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID or None): The user whose updates are no longer + wanted. + Returns: + A Deferred. + """ logger.debug("Stop polling for presence from %s", user) if not target_user or self.hs.is_mine(target_user): @@ -569,6 +665,12 @@ class PresenceHandler(BaseHandler): return defer.DeferredList(deferreds, consumeErrors=True) def _stop_polling_local(self, user, target_user): + """Unsubscribe a local user from presence updates from a local user on + this server. + Args: + user(UserID): The local user that no longer wishes for updates. + target_user(UserID): The user whose updates are no longer wanted. + """ for localpart in self._local_pushmap.keys(): if target_user and localpart != target_user.localpart: continue @@ -581,6 +683,16 @@ class PresenceHandler(BaseHandler): @log_function def _stop_polling_remote(self, user, domain, remoteusers): + """Unsubscribe a local user from presence updates from remote users on + a given domain. + Args: + user(UserID): The local user that no longer wishes for updates. + domain(str): The remote server to unsubscribe from. + remoteusers([UserID]): The users on that remote server that the + local user no longer wishes to be updated about. + Returns: + A Deferred. + """ to_unpoll = set() for u in remoteusers: @@ -602,6 +714,18 @@ class PresenceHandler(BaseHandler): @defer.inlineCallbacks @log_function def push_presence(self, user, statuscache): + """ + Notify local and remote users of a change in presence of a local user. + Pushes the update to local clients and remote domains that are directly + subscribed to the presence of the local user. + Also pushes that update to any local user or remote domain that shares + a room with the local user. + Args: + user(UserID): The local user whose presence was updated. + statuscache(UserPresenceCache): Cache of the user's presence state + Returns: + A Deferred. + """ assert(self.hs.is_mine(user)) logger.debug("Pushing presence update from %s", user) @@ -628,45 +752,23 @@ class PresenceHandler(BaseHandler): ) yield self.distributor.fire("user_presence_changed", user, statuscache) - @defer.inlineCallbacks - def _push_presence_remote(self, user, destination, state=None): - if state is None: - state = yield self.store.get_presence_state(user.localpart) - del state["mtime"] - state["presence"] = state.pop("state") - - if user in self._user_cachemap: - state["last_active"] = ( - self._user_cachemap[user].get_state()["last_active"] - ) - - yield self.distributor.fire( - "collect_presencelike_data", user, state - ) - - if "last_active" in state: - state = dict(state) - state["last_active_ago"] = int( - self.clock.time_msec() - state.pop("last_active") - ) - - user_state = { - "user_id": user.to_string(), - } - user_state.update(**state) - - yield self.federation.send_edu( - destination=destination, - edu_type="m.presence", - content={ - "push": [ - user_state, - ], - } - ) - @defer.inlineCallbacks def incoming_presence(self, origin, content): + """Handle an incoming m.presence EDU. + For each presence update in the "push" list update our local cache and + notify the appropriate local clients. Only clients that share a room + or are directly subscribed to the presence for a user should be + notified of the update. + For each subscription request in the "poll" list start pushing presence + updates to the remote server. + For unsubscribe request in the "unpoll" list stop pushing presence + updates to the remote server. + Args: + orgin(str): The source of this m.presence EDU. + content(dict): The content of this m.presence EDU. + Returns: + A Deferred. + """ deferreds = [] for push in content.get("push", []): @@ -765,6 +867,22 @@ class PresenceHandler(BaseHandler): def push_update_to_local_and_remote(self, observed_user, statuscache, users_to_push=[], room_ids=[], remote_domains=[]): + """Notify local clients and remote servers of a change in the presence + of a user. + Args: + observed_user(UserID): The user to push the presence state for. + statuscache(UserPresenceCache): The cache for the presence state to + push. + users_to_push([UserID]): A list of local and remote users to + notify. + room_ids([str]): Notify the local and remote occupants of these + rooms. + remote_domains([str]): A list of remote servers to notify in + addition to those implied by the users_to_push and the + room_ids. + Returns: + A Deferred. + """ localusers, remoteusers = partitionbool( users_to_push, @@ -802,8 +920,8 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, users_to_push=[], room_ids=[]): """Notify clients of a new presence event. Args: - users_to_push(list): List of users to notify. - room_ids(list): List of room_ids to notify. + users_to_push([UserID]): List of users to notify. + room_ids([str]): List of room_ids to notify. """ with PreserveLoggingContext(): self.notifier.on_new_user_event( @@ -811,6 +929,51 @@ class PresenceHandler(BaseHandler): room_ids, ) + @defer.inlineCallbacks + def _push_presence_remote(self, user, destination, state=None): + """Push a user's presence to a remote server. If a presence state event + that event is sent. Otherwise a new state event is constructed from the + stored presence state. + The last_active is replaced with last_active_ago in case the wallclock + time on the remote server is different to the time on this server. + Sends an EDU to the remote server with the current presence state. + Args: + user(UserID): The user to push the presence state for. + destination(str): The remote server to send state to. + state(dict): The state to push, or None to use the current stored + state. + Returns: + A Deferred. + """ + if state is None: + state = yield self.store.get_presence_state(user.localpart) + del state["mtime"] + state["presence"] = state.pop("state") + + if user in self._user_cachemap: + state["last_active"] = ( + self._user_cachemap[user].get_state()["last_active"] + ) + + yield self.distributor.fire( + "collect_presencelike_data", user, state + ) + + if "last_active" in state: + state = dict(state) + state["last_active_ago"] = int( + self.clock.time_msec() - state.pop("last_active") + ) + + user_state = {"user_id": user.to_string(), } + user_state.update(state) + + yield self.federation.send_edu( + destination=destination, + edu_type="m.presence", + content={"push": [user_state, ], } + ) + class PresenceEventSource(object): def __init__(self, hs): From 0a4330cd5d5e2230fb9e1ff4e24952829d03ef76 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 14 May 2015 17:48:12 +0100 Subject: [PATCH 11/13] Add some missed argument types, cleanup the whitespace a bit --- synapse/handlers/presence.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 23302242bd..9638faf4b9 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -394,7 +394,6 @@ class PresenceHandler(BaseHandler): Args: observed_user(UserID): The local user whose presence is requested. observer_user(UserID): The remote or local user requesting presence. - """ accept = yield self._should_accept_invite(observed_user, observer_user) @@ -444,8 +443,9 @@ class PresenceHandler(BaseHandler): local user's presence list. Args: - observed_user: The local or remote user to remove from the list. - observer_user: The local owner of the presence list. + observed_user(UserID): The local or remote user to remove from the + list. + observer_user(UserID): The local owner of the presence list. Returns: A Deferred. """ @@ -461,8 +461,9 @@ class PresenceHandler(BaseHandler): unsubscribe the local user from updates that user. Args: - observed_user: The local or remote user to remove from the list. - observer_user: The local owner of the presence list. + observed_user(UserId): The local or remote user to remove from the + list. + observer_user(UserId): The local owner of the presence list. Returns: A Deferred. """ @@ -585,6 +586,7 @@ class PresenceHandler(BaseHandler): def _start_polling_local(self, user, target_user): """Subscribe a local user to presence updates for a local user + Args: user(UserId): The local user that wishes for updates. target_user(UserId): The local users whose updates are wanted. @@ -598,7 +600,8 @@ class PresenceHandler(BaseHandler): def _start_polling_remote(self, user, domain, remoteusers): """Subscribe a local user to presence updates for remote users on a - given domain. + given remote domain. + Args: user(UserID): The local user that wishes for updates. domain(str): The remote server the local user wants updates from. @@ -630,6 +633,7 @@ class PresenceHandler(BaseHandler): """Unsubscribe a local user from presence updates from a local or remote user. If no target user is supplied then unsubscribe the user from all presence updates that the user had subscribed to. + Args: user(UserID): The local user that no longer wishes for updates. target_user(UserID or None): The user whose updates are no longer @@ -667,6 +671,7 @@ class PresenceHandler(BaseHandler): def _stop_polling_local(self, user, target_user): """Unsubscribe a local user from presence updates from a local user on this server. + Args: user(UserID): The local user that no longer wishes for updates. target_user(UserID): The user whose updates are no longer wanted. @@ -685,6 +690,7 @@ class PresenceHandler(BaseHandler): def _stop_polling_remote(self, user, domain, remoteusers): """Unsubscribe a local user from presence updates from remote users on a given domain. + Args: user(UserID): The local user that no longer wishes for updates. domain(str): The remote server to unsubscribe from. @@ -720,6 +726,7 @@ class PresenceHandler(BaseHandler): subscribed to the presence of the local user. Also pushes that update to any local user or remote domain that shares a room with the local user. + Args: user(UserID): The local user whose presence was updated. statuscache(UserPresenceCache): Cache of the user's presence state @@ -763,6 +770,7 @@ class PresenceHandler(BaseHandler): updates to the remote server. For unsubscribe request in the "unpoll" list stop pushing presence updates to the remote server. + Args: orgin(str): The source of this m.presence EDU. content(dict): The content of this m.presence EDU. @@ -869,6 +877,7 @@ class PresenceHandler(BaseHandler): remote_domains=[]): """Notify local clients and remote servers of a change in the presence of a user. + Args: observed_user(UserID): The user to push the presence state for. statuscache(UserPresenceCache): The cache for the presence state to @@ -919,6 +928,7 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, users_to_push=[], room_ids=[]): """Notify clients of a new presence event. + Args: users_to_push([UserID]): List of users to notify. room_ids([str]): List of room_ids to notify. @@ -937,6 +947,7 @@ class PresenceHandler(BaseHandler): The last_active is replaced with last_active_ago in case the wallclock time on the remote server is different to the time on this server. Sends an EDU to the remote server with the current presence state. + Args: user(UserID): The user to push the presence state for. destination(str): The remote server to send state to. From 415b158ce229d4f740bf577aca5cc3d5f73e1bf6 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 15 May 2015 11:09:47 +0100 Subject: [PATCH 12/13] More whitespace --- synapse/handlers/presence.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9638faf4b9..a01020e202 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -320,6 +320,7 @@ class PresenceHandler(BaseHandler): """Called via the distributor whenever a user joins a room. Notifies the new member of the presence of the current members. Notifies the current members of the room of the new member's presence. + Args: user(UserID): The user who joined the room. room_id(str): The room id the user joined. From 10f1bdb9a27abe34c8b022c32209a4abfc8cb443 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 May 2015 10:21:40 +0100 Subject: [PATCH 13/13] Move get_events functions to storage.events --- synapse/storage/_base.py | 155 -------------------------------------- synapse/storage/events.py | 132 ++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+), 155 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 81052409b7..ec80169c5b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,8 +15,6 @@ import logging from synapse.api.errors import StoreError -from synapse.events import FrozenEvent -from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache @@ -28,7 +26,6 @@ from twisted.internet import defer from collections import namedtuple, OrderedDict import functools -import simplejson as json import sys import time import threading @@ -867,158 +864,6 @@ class SQLBaseStore(object): return self.runInteraction("_simple_max_id", func) - def _get_events(self, event_ids, check_redacted=True, - get_prev_content=False): - return self.runInteraction( - "_get_events", self._get_events_txn, event_ids, - check_redacted=check_redacted, get_prev_content=get_prev_content, - ) - - def _get_events_txn(self, txn, event_ids, check_redacted=True, - get_prev_content=False): - if not event_ids: - return [] - - events = [ - self._get_event_txn( - txn, event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content - ) - for event_id in event_ids - ] - - return [e for e in events if e] - - def _invalidate_get_event_cache(self, event_id): - for check_redacted in (False, True): - for get_prev_content in (False, True): - self._get_event_cache.invalidate(event_id, check_redacted, - get_prev_content) - - def _get_event_txn(self, txn, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - try: - ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) - - if allow_rejected or not ret.rejected_reason: - return ret - else: - return None - except KeyError: - pass - finally: - start_time = update_counter("event_cache", start_time) - - sql = ( - "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " - "FROM event_json as e " - "LEFT JOIN redactions as r ON e.event_id = r.redacts " - "LEFT JOIN rejections as rej on rej.event_id = e.event_id " - "WHERE e.event_id = ? " - "LIMIT 1 " - ) - - txn.execute(sql, (event_id,)) - - res = txn.fetchone() - - if not res: - return None - - internal_metadata, js, redacted, rejected_reason = res - - start_time = update_counter("select_event", start_time) - - result = self._get_event_from_row_txn( - txn, internal_metadata, js, redacted, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - rejected_reason=rejected_reason, - ) - self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) - - if allow_rejected or not rejected_reason: - return result - else: - return None - - def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, - check_redacted=True, get_prev_content=False, - rejected_reason=None): - - start_time = time.time() * 1000 - - def update_counter(desc, last_time): - curr_time = self._get_event_counters.update(desc, last_time) - sql_getevents_timer.inc_by(curr_time - last_time, desc) - return curr_time - - d = json.loads(js) - start_time = update_counter("decode_json", start_time) - - internal_metadata = json.loads(internal_metadata) - start_time = update_counter("decode_internal", start_time) - - ev = FrozenEvent( - d, - internal_metadata_dict=internal_metadata, - rejected_reason=rejected_reason, - ) - start_time = update_counter("build_frozen_event", start_time) - - if check_redacted and redacted: - ev = prune_event(ev) - - ev.unsigned["redacted_by"] = redacted - # Get the redaction event. - - because = self._get_event_txn( - txn, - redacted, - check_redacted=False - ) - - if because: - ev.unsigned["redacted_because"] = because - start_time = update_counter("redact_event", start_time) - - if get_prev_content and "replaces_state" in ev.unsigned: - prev = self._get_event_txn( - txn, - ev.unsigned["replaces_state"], - get_prev_content=False, - ) - if prev: - ev.unsigned["prev_content"] = prev.get_dict()["content"] - start_time = update_counter("get_prev_content", start_time) - - return ev - - def _parse_events(self, rows): - return self.runInteraction( - "_parse_events", self._parse_events_txn, rows - ) - - def _parse_events_txn(self, txn, rows): - event_ids = [r["event_id"] for r in rows] - - return self._get_events_txn(txn, event_ids) - - def _has_been_redacted_txn(self, txn, event): - sql = "SELECT event_id FROM redactions WHERE redacts = ?" - txn.execute(sql, (event.event_id,)) - result = txn.fetchone() - return result[0] if result else None - def get_next_stream_id(self): with self._next_stream_id_lock: i = self._next_stream_id diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9242b0a84e..afdf0f7193 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,6 +17,9 @@ from _base import SQLBaseStore, _RollbackButIsFineException from twisted.internet import defer +from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + from synapse.util.logutils import log_function from synapse.api.constants import EventTypes from synapse.crypto.event_signing import compute_event_reference_hash @@ -26,6 +29,7 @@ from syutil.jsonutil import encode_canonical_json from contextlib import contextmanager import logging +import simplejson as json logger = logging.getLogger(__name__) @@ -393,3 +397,131 @@ class EventsStore(SQLBaseStore): return self.runInteraction( "have_events", f, ) + + def _get_events(self, event_ids, check_redacted=True, + get_prev_content=False): + return self.runInteraction( + "_get_events", self._get_events_txn, event_ids, + check_redacted=check_redacted, get_prev_content=get_prev_content, + ) + + def _get_events_txn(self, txn, event_ids, check_redacted=True, + get_prev_content=False): + if not event_ids: + return [] + + events = [ + self._get_event_txn( + txn, event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content + ) + for event_id in event_ids + ] + + return [e for e in events if e] + + def _invalidate_get_event_cache(self, event_id): + for check_redacted in (False, True): + for get_prev_content in (False, True): + self._get_event_cache.invalidate(event_id, check_redacted, + get_prev_content) + + def _get_event_txn(self, txn, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False): + + try: + ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content) + + if allow_rejected or not ret.rejected_reason: + return ret + else: + return None + except KeyError: + pass + + sql = ( + "SELECT e.internal_metadata, e.json, r.event_id, rej.reason " + "FROM event_json as e " + "LEFT JOIN redactions as r ON e.event_id = r.redacts " + "LEFT JOIN rejections as rej on rej.event_id = e.event_id " + "WHERE e.event_id = ? " + "LIMIT 1 " + ) + + txn.execute(sql, (event_id,)) + + res = txn.fetchone() + + if not res: + return None + + internal_metadata, js, redacted, rejected_reason = res + + result = self._get_event_from_row_txn( + txn, internal_metadata, js, redacted, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + rejected_reason=rejected_reason, + ) + self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result) + + if allow_rejected or not rejected_reason: + return result + else: + return None + + def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted, + check_redacted=True, get_prev_content=False, + rejected_reason=None): + + d = json.loads(js) + internal_metadata = json.loads(internal_metadata) + + ev = FrozenEvent( + d, + internal_metadata_dict=internal_metadata, + rejected_reason=rejected_reason, + ) + + if check_redacted and redacted: + ev = prune_event(ev) + + ev.unsigned["redacted_by"] = redacted + # Get the redaction event. + + because = self._get_event_txn( + txn, + redacted, + check_redacted=False + ) + + if because: + ev.unsigned["redacted_because"] = because + + if get_prev_content and "replaces_state" in ev.unsigned: + prev = self._get_event_txn( + txn, + ev.unsigned["replaces_state"], + get_prev_content=False, + ) + if prev: + ev.unsigned["prev_content"] = prev.get_dict()["content"] + + return ev + + def _parse_events(self, rows): + return self.runInteraction( + "_parse_events", self._parse_events_txn, rows + ) + + def _parse_events_txn(self, txn, rows): + event_ids = [r["event_id"] for r in rows] + + return self._get_events_txn(txn, event_ids) + + def _has_been_redacted_txn(self, txn, event): + sql = "SELECT event_id FROM redactions WHERE redacts = ?" + txn.execute(sql, (event.event_id,)) + result = txn.fetchone() + return result[0] if result else None