diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 48548f8c40..9989b76591 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -154,7 +154,8 @@ def serialize_event(e, time_now_ms, as_client_event=True, if "redacted_because" in e.unsigned: d["unsigned"]["redacted_because"] = serialize_event( - e.unsigned["redacted_because"], time_now_ms + e.unsigned["redacted_because"], time_now_ms, + event_format=event_format ) if token_id is not None: diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 87b4d381c7..6a2339f2eb 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -17,7 +17,7 @@ from synapse.appservice.scheduler import AppServiceScheduler from synapse.appservice.api import ApplicationServiceApi from .register import RegistrationHandler from .room import ( - RoomCreationHandler, RoomMemberHandler, RoomListHandler + RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler, ) from .message import MessageHandler from .events import EventStreamHandler, EventHandler @@ -70,3 +70,4 @@ class Handlers(object): self.auth_handler = AuthHandler(hs) self.identity_handler = IdentityHandler(hs) self.search_handler = SearchHandler(hs) + self.room_context_handler = RoomContextHandler(hs) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 60f9fa58b0..36878a6c20 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -33,6 +33,7 @@ from collections import OrderedDict from unpaddedbase64 import decode_base64 import logging +import math import string logger = logging.getLogger(__name__) @@ -747,6 +748,60 @@ class RoomListHandler(BaseHandler): defer.returnValue({"start": "START", "end": "END", "chunk": chunk}) +class RoomContextHandler(BaseHandler): + @defer.inlineCallbacks + def get_event_context(self, user, room_id, event_id, limit): + """Retrieves events, pagination tokens and state around a given event + in a room. + + Args: + user (UserID) + room_id (str) + event_id (str) + limit (int): The maximum number of events to return in total + (excluding state). + + Returns: + dict + """ + before_limit = math.floor(limit/2.) + after_limit = limit - before_limit + + now_token = yield self.hs.get_event_sources().get_current_token() + + results = yield self.store.get_events_around( + room_id, event_id, before_limit, after_limit + ) + + results["events_before"] = yield self._filter_events_for_client( + user.to_string(), results["events_before"] + ) + + results["events_after"] = yield self._filter_events_for_client( + user.to_string(), results["events_after"] + ) + + if results["events_after"]: + last_event_id = results["events_after"][-1].event_id + else: + last_event_id = event_id + + state = yield self.store.get_state_for_events( + [last_event_id], None + ) + results["state"] = state[last_event_id].values() + + results["start"] = now_token.copy_and_replace( + "room_key", results["start"] + ).to_string() + + results["end"] = now_token.copy_and_replace( + "room_key", results["end"] + ).to_string() + + defer.returnValue(results) + + class RoomEventSource(object): def __init__(self, hs): self.store = hs.get_datastore() diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index bbe82b1425..2718e9482e 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -51,6 +51,17 @@ class SearchHandler(BaseHandler): "content.body", "content.name", "content.topic", ]) filter_dict = content["search_categories"]["room_events"].get("filter", {}) + event_context = content["search_categories"]["room_events"].get( + "event_context", None + ) + + if event_context is not None: + before_limit = int(event_context.get( + "before_limit", 5 + )) + after_limit = int(event_context.get( + "after_limit", 5 + )) except KeyError: raise SynapseError(400, "Invalid search query") @@ -76,14 +87,57 @@ class SearchHandler(BaseHandler): user.to_string(), filtered_events ) + allowed_events.sort(key=lambda e: -rank_map[e.event_id]) + allowed_events = allowed_events[:search_filter.limit()] + + if event_context is not None: + now_token = yield self.hs.get_event_sources().get_current_token() + + contexts = {} + for event in allowed_events: + res = yield self.store.get_events_around( + event.room_id, event.event_id, before_limit, after_limit + ) + + res["events_before"] = yield self._filter_events_for_client( + user.to_string(), res["events_before"] + ) + + res["events_after"] = yield self._filter_events_for_client( + user.to_string(), res["events_after"] + ) + + res["start"] = now_token.copy_and_replace( + "room_key", res["start"] + ).to_string() + + res["end"] = now_token.copy_and_replace( + "room_key", res["end"] + ).to_string() + + contexts[event.event_id] = res + else: + contexts = {} + # TODO: Add a limit time_now = self.clock.time_msec() + for context in contexts.values(): + context["events_before"] = [ + serialize_event(e, time_now) + for e in context["events_before"] + ] + context["events_after"] = [ + serialize_event(e, time_now) + for e in context["events_after"] + ] + results = { e.event_id: { "rank": rank_map[e.event_id], - "result": serialize_event(e, time_now) + "result": serialize_event(e, time_now), + "context": contexts.get(e.event_id, {}), } for e in allowed_events } diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index b8e2c81969..4c5a2353b2 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -113,15 +113,20 @@ class SyncHandler(BaseHandler): self.clock = hs.get_clock() @defer.inlineCallbacks - def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0): + def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, + full_state=False): """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. Returns: A Deferred SyncResult. """ - if timeout == 0 or since_token is None: - result = yield self.current_sync_for_user(sync_config, since_token) + + if timeout == 0 or since_token is None or full_state: + # we are going to return immediately, so don't bother calling + # notifier.wait_for_events. + result = yield self.current_sync_for_user(sync_config, since_token, + full_state=full_state) defer.returnValue(result) else: def current_sync_callback(before_token, after_token): @@ -146,19 +151,24 @@ class SyncHandler(BaseHandler): ) defer.returnValue(result) - def current_sync_for_user(self, sync_config, since_token=None): + def current_sync_for_user(self, sync_config, since_token=None, + full_state=False): """Get the sync for client needed to match what the server has now. Returns: A Deferred SyncResult. """ - if since_token is None: - return self.initial_sync(sync_config) + if since_token is None or full_state: + return self.full_state_sync(sync_config, since_token) else: return self.incremental_sync_with_gap(sync_config, since_token) @defer.inlineCallbacks - def initial_sync(self, sync_config): - """Get a sync for a client which is starting without any state + def full_state_sync(self, sync_config, timeline_since_token): + """Get a sync for a client which is starting without any state. + + If a 'message_since_token' is given, only timeline events which have + happened since that token will be returned. + Returns: A Deferred SyncResult. """ @@ -192,8 +202,12 @@ class SyncHandler(BaseHandler): archived = [] for event in room_list: if event.membership == Membership.JOIN: - room_sync = yield self.initial_sync_for_joined_room( - event.room_id, sync_config, now_token, typing_by_room + room_sync = yield self.full_state_sync_for_joined_room( + room_id=event.room_id, + sync_config=sync_config, + now_token=now_token, + timeline_since_token=timeline_since_token, + typing_by_room=typing_by_room ) joined.append(room_sync) elif event.membership == Membership.INVITE: @@ -206,11 +220,12 @@ class SyncHandler(BaseHandler): leave_token = now_token.copy_and_replace( "room_key", "s%d" % (event.stream_ordering,) ) - room_sync = yield self.initial_sync_for_archived_room( + room_sync = yield self.full_state_sync_for_archived_room( sync_config=sync_config, room_id=event.room_id, leave_event_id=event.event_id, leave_token=leave_token, + timeline_since_token=timeline_since_token, ) archived.append(room_sync) @@ -223,15 +238,16 @@ class SyncHandler(BaseHandler): )) @defer.inlineCallbacks - def initial_sync_for_joined_room(self, room_id, sync_config, now_token, - typing_by_room): + def full_state_sync_for_joined_room(self, room_id, sync_config, + now_token, timeline_since_token, + typing_by_room): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( - room_id, sync_config, now_token, + room_id, sync_config, now_token, since_token=timeline_since_token ) current_state = yield self.state_handler.get_current_state( @@ -278,15 +294,16 @@ class SyncHandler(BaseHandler): defer.returnValue((now_token, typing_by_room)) @defer.inlineCallbacks - def initial_sync_for_archived_room(self, room_id, sync_config, - leave_event_id, leave_token): + def full_state_sync_for_archived_room(self, room_id, sync_config, + leave_event_id, leave_token, + timeline_since_token): """Sync a room for a client which is starting without any state Returns: A Deferred JoinedSyncResult. """ batch = yield self.load_filtered_recents( - room_id, sync_config, leave_token, + room_id, sync_config, leave_token, since_token=timeline_since_token ) leave_state = yield self.store.get_state_for_events( @@ -370,7 +387,7 @@ class SyncHandler(BaseHandler): else: prev_batch = now_token - state = yield self.check_joined_room( + state, limited = yield self.check_joined_room( sync_config, room_id, state ) @@ -379,7 +396,7 @@ class SyncHandler(BaseHandler): timeline=TimelineBatch( events=recents, prev_batch=prev_batch, - limited=False, + limited=limited, ), state=state, ephemeral=typing_by_room.get(room_id, []) @@ -503,7 +520,7 @@ class SyncHandler(BaseHandler): current_state=current_state_events, ) - state_events_delta = yield self.check_joined_room( + state_events_delta, _ = yield self.check_joined_room( sync_config, room_id, state_events_delta ) @@ -610,6 +627,7 @@ class SyncHandler(BaseHandler): @defer.inlineCallbacks def check_joined_room(self, sync_config, room_id, state_delta): joined = False + limited = False for event in state_delta: if ( event.type == EventTypes.Member @@ -621,5 +639,6 @@ class SyncHandler(BaseHandler): if joined: res = yield self.state_handler.get_current_state(room_id) state_delta = res.values() + limited = True - defer.returnValue(state_delta) + defer.returnValue((state_delta, limited)) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 4cee1c1599..2dcaee86cd 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -397,6 +397,41 @@ class RoomTriggerBackfill(ClientV1RestServlet): defer.returnValue((200, res)) +class RoomEventContext(ClientV1RestServlet): + PATTERN = client_path_pattern( + "/rooms/(?P[^/]*)/context/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(RoomEventContext, self).__init__(hs) + self.clock = hs.get_clock() + + @defer.inlineCallbacks + def on_GET(self, request, room_id, event_id): + user, _ = yield self.auth.get_user_by_req(request) + + limit = int(request.args.get("limit", [10])[0]) + + results = yield self.handlers.room_context_handler.get_event_context( + user, room_id, event_id, limit, + ) + + time_now = self.clock.time_msec() + results["events_before"] = [ + serialize_event(event, time_now) for event in results["events_before"] + ] + results["events_after"] = [ + serialize_event(event, time_now) for event in results["events_after"] + ] + results["state"] = [ + serialize_event(event, time_now) for event in results["state"] + ] + + logger.info("Responding with %r", results) + + defer.returnValue((200, results)) + + # TODO: Needs unit testing class RoomMembershipRestServlet(ClientV1RestServlet): @@ -628,3 +663,4 @@ def register_servlets(hs, http_server): RoomRedactEventRestServlet(hs).register(http_server) RoomTypingRestServlet(hs).register(http_server) SearchRestServlet(hs).register(http_server) + RoomEventContext(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 6c4f2b7cd4..1840eef775 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.http.servlet import ( - RestServlet, parse_string, parse_integer + RestServlet, parse_string, parse_integer, parse_boolean ) from synapse.handlers.sync import SyncConfig from synapse.types import StreamToken @@ -90,6 +90,7 @@ class SyncRestServlet(RestServlet): allowed_values=self.ALLOWED_PRESENCE ) filter_id = parse_string(request, "filter", default=None) + full_state = parse_boolean(request, "full_state", default=False) logger.info( "/sync: user=%r, timeout=%r, since=%r," @@ -120,7 +121,8 @@ class SyncRestServlet(RestServlet): try: sync_result = yield self.sync_handler.wait_for_sync_for_user( - sync_config, since_token=since_token, timeout=timeout + sync_config, since_token=since_token, timeout=timeout, + full_state=full_state ) finally: if set_presence == "online": diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3cab06fdef..15d4c2bf68 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -23,7 +23,7 @@ paginate bacwards. This is implemented by keeping two ordering columns: stream_ordering and topological_ordering. Stream ordering is basically insertion/received order -(except for events from backfill requests). The topolgical_ordering is a +(except for events from backfill requests). The topological_ordering is a weak ordering of events based on the pdu graph. This means that we have to have two different types of tokens, depending on @@ -436,3 +436,138 @@ class StreamStore(SQLBaseStore): internal = event.internal_metadata internal.before = str(RoomStreamToken(topo, stream - 1)) internal.after = str(RoomStreamToken(topo, stream)) + + @defer.inlineCallbacks + def get_events_around(self, room_id, event_id, before_limit, after_limit): + """Retrieve events and pagination tokens around a given event in a + room. + + Args: + room_id (str) + event_id (str) + before_limit (int) + after_limit (int) + + Returns: + dict + """ + + results = yield self.runInteraction( + "get_events_around", self._get_events_around_txn, + room_id, event_id, before_limit, after_limit + ) + + events_before = yield self._get_events( + [e for e in results["before"]["event_ids"]], + get_prev_content=True + ) + + events_after = yield self._get_events( + [e for e in results["after"]["event_ids"]], + get_prev_content=True + ) + + defer.returnValue({ + "events_before": events_before, + "events_after": events_after, + "start": results["before"]["token"], + "end": results["after"]["token"], + }) + + def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit): + """Retrieves event_ids and pagination tokens around a given event in a + room. + + Args: + room_id (str) + event_id (str) + before_limit (int) + after_limit (int) + + Returns: + dict + """ + + results = self._simple_select_one_txn( + txn, + "events", + keyvalues={ + "event_id": event_id, + "room_id": room_id, + }, + retcols=["stream_ordering", "topological_ordering"], + ) + + stream_ordering = results["stream_ordering"] + topological_ordering = results["topological_ordering"] + + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND (topological_ordering < ?" + " OR (topological_ordering = ? AND stream_ordering < ?))" + " ORDER BY topological_ordering DESC, stream_ordering DESC" + " LIMIT ?" + ) + + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND (topological_ordering > ?" + " OR (topological_ordering = ? AND stream_ordering > ?))" + " ORDER BY topological_ordering ASC, stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute( + query_before, + ( + room_id, topological_ordering, topological_ordering, + stream_ordering, before_limit, + ) + ) + + rows = self.cursor_to_dict(txn) + events_before = [r["event_id"] for r in rows] + + if rows: + start_token = str(RoomStreamToken( + rows[0]["topological_ordering"], + rows[0]["stream_ordering"] - 1, + )) + else: + start_token = str(RoomStreamToken( + topological_ordering, + stream_ordering - 1, + )) + + txn.execute( + query_after, + ( + room_id, topological_ordering, topological_ordering, + stream_ordering, after_limit, + ) + ) + + rows = self.cursor_to_dict(txn) + events_after = [r["event_id"] for r in rows] + + if rows: + end_token = str(RoomStreamToken( + rows[-1]["topological_ordering"], + rows[-1]["stream_ordering"], + )) + else: + end_token = str(RoomStreamToken( + topological_ordering, + stream_ordering, + )) + + return { + "before": { + "event_ids": events_before, + "token": start_token, + }, + "after": { + "event_ids": events_after, + "token": end_token, + }, + } diff --git a/synapse/types.py b/synapse/types.py index 84631d177d..28344d8b36 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -47,7 +47,7 @@ class DomainSpecificString( @classmethod def from_string(cls, s): """Parse the string given by 's' into a structure object.""" - if s[0] != cls.SIGIL: + if len(s) < 1 or s[0] != cls.SIGIL: raise SynapseError(400, "Expected %s string to start with '%s'" % ( cls.__name__, cls.SIGIL, )) diff --git a/tests/test_types.py b/tests/test_types.py index b29a8415b1..495cd20f02 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -15,13 +15,14 @@ from tests import unittest +from synapse.api.errors import SynapseError from synapse.server import BaseHomeServer from synapse.types import UserID, RoomAlias mock_homeserver = BaseHomeServer(hostname="my.domain") -class UserIDTestCase(unittest.TestCase): +class UserIDTestCase(unittest.TestCase): def test_parse(self): user = UserID.from_string("@1234abcd:my.domain") @@ -29,6 +30,11 @@ class UserIDTestCase(unittest.TestCase): self.assertEquals("my.domain", user.domain) self.assertEquals(True, mock_homeserver.is_mine(user)) + def test_pase_empty(self): + with self.assertRaises(SynapseError): + UserID.from_string("") + + def test_build(self): user = UserID("5678efgh", "my.domain") @@ -44,7 +50,6 @@ class UserIDTestCase(unittest.TestCase): class RoomAliasTestCase(unittest.TestCase): - def test_parse(self): room = RoomAlias.from_string("#channel:my.domain") diff --git a/tox.ini b/tox.ini index a69948484f..01b23e6bd9 100644 --- a/tox.ini +++ b/tox.ini @@ -19,6 +19,7 @@ commands = check-manifest [testenv:pep8] +skip_install = True basepython = python2.7 deps = flake8