Merge branch 'develop' of github.com:matrix-org/synapse into erikj/search

This commit is contained in:
Erik Johnston 2015-10-22 13:16:35 +01:00
commit 8a98f0dc5b
8 changed files with 329 additions and 27 deletions

View File

@ -38,8 +38,12 @@ for port in 8080 8081 8082; do
perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config
echo "report_stats: false" >> $DIR/etc/$port.config
if ! grep -F "full_twisted_stacktraces" -q $DIR/etc/$port.config; then
echo "full_twisted_stacktraces: true" >> $DIR/etc/$port.config
fi
if ! grep -F "report_stats" -q $DIR/etc/$port.config ; then
echo "report_stats: false" >> $DIR/etc/$port.config
fi
python -m synapse.app.homeserver \
--config-path "$DIR/etc/$port.config" \

View File

@ -224,8 +224,8 @@ class _Recoverer(object):
self.clock.call_later((2 ** self.backoff_counter), self.retry)
def _backoff(self):
# cap the backoff to be around 18h => (2^16) = 65536 secs
if self.backoff_counter < 16:
# cap the backoff to be around 8.5min => (2^9) = 512 secs
if self.backoff_counter < 9:
self.backoff_counter += 1
self.recover()

View File

@ -61,18 +61,37 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
return bool(self.timeline or self.state or self.ephemeral)
class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id",
"timeline",
"state",
])):
__slots__ = []
def __nonzero__(self):
"""Make the result appear empty if there are no updates. This is used
to tell if room needs to be part of the sync result.
"""
return bool(self.timeline or self.state)
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
"room_id",
"invite",
])):
__slots__ = []
def __nonzero__(self):
"""Invited rooms should always be reported to the client"""
return True
class SyncResult(collections.namedtuple("SyncResult", [
"next_batch", # Token for the next sync
"presence", # List of presence events for the user.
"joined", # JoinedSyncResult for each joined room.
"invited", # InvitedSyncResult for each invited room.
"archived", # ArchivedSyncResult for each archived room.
])):
__slots__ = []
@ -145,6 +164,10 @@ class SyncHandler(BaseHandler):
"""
now_token = yield self.event_sources.get_current_token()
now_token, typing_by_room = yield self.typing_by_room(
sync_config, now_token
)
presence_stream = self.event_sources.sources["presence"]
# TODO (mjark): This looks wrong, shouldn't we be getting the presence
# UP to the present rather than after the present?
@ -156,15 +179,21 @@ class SyncHandler(BaseHandler):
)
room_list = yield self.store.get_rooms_for_user_where_membership_is(
user_id=sync_config.user.to_string(),
membership_list=[Membership.INVITE, Membership.JOIN]
membership_list=(
Membership.INVITE,
Membership.JOIN,
Membership.LEAVE,
Membership.BAN
)
)
joined = []
invited = []
archived = []
for event in room_list:
if event.membership == Membership.JOIN:
room_sync = yield self.initial_sync_for_joined_room(
event.room_id, sync_config, now_token,
event.room_id, sync_config, now_token, typing_by_room
)
joined.append(room_sync)
elif event.membership == Membership.INVITE:
@ -173,16 +202,29 @@ class SyncHandler(BaseHandler):
room_id=event.room_id,
invite=invite,
))
elif event.membership in (Membership.LEAVE, Membership.BAN):
leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,)
)
room_sync = yield self.initial_sync_for_archived_room(
sync_config=sync_config,
room_id=event.room_id,
leave_event_id=event.event_id,
leave_token=leave_token,
)
archived.append(room_sync)
defer.returnValue(SyncResult(
presence=presence,
joined=joined,
invited=invited,
archived=archived,
next_batch=now_token,
))
@defer.inlineCallbacks
def initial_sync_for_joined_room(self, room_id, sync_config, now_token):
def initial_sync_for_joined_room(self, room_id, sync_config, now_token,
typing_by_room):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
@ -201,7 +243,60 @@ class SyncHandler(BaseHandler):
room_id=room_id,
timeline=batch,
state=current_state_events,
ephemeral=[],
ephemeral=typing_by_room.get(room_id, []),
))
@defer.inlineCallbacks
def typing_by_room(self, sync_config, now_token, since_token=None):
"""Get the typing events for each room the user is in
Args:
sync_config (SyncConfig): The flags, filters and user for the sync.
now_token (StreamToken): Where the server is currently up to.
since_token (StreamToken): Where the server was when the client
last synced.
Returns:
A tuple of the now StreamToken, updated to reflect the which typing
events are included, and a dict mapping from room_id to a list of
typing events for that room.
"""
typing_key = since_token.typing_key if since_token else "0"
typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events_for_user(
user=sync_config.user,
from_key=typing_key,
limit=sync_config.filter.ephemeral_limit(),
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
typing_by_room = {event["room_id"]: [event] for event in typing}
for event in typing:
event.pop("room_id")
logger.debug("Typing %r", typing_by_room)
defer.returnValue((now_token, typing_by_room))
@defer.inlineCallbacks
def initial_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token):
"""Sync a room for a client which is starting without any state
Returns:
A Deferred JoinedSyncResult.
"""
batch = yield self.load_filtered_recents(
room_id, sync_config, leave_token,
)
leave_state = yield self.store.get_state_for_events(
[leave_event_id], None
)
defer.returnValue(ArchivedSyncResult(
room_id=room_id,
timeline=batch,
state=leave_state[leave_event_id].values(),
))
@defer.inlineCallbacks
@ -221,18 +316,9 @@ class SyncHandler(BaseHandler):
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
typing_source = self.event_sources.sources["typing"]
typing, typing_key = yield typing_source.get_new_events_for_user(
user=sync_config.user,
from_key=since_token.typing_key,
limit=sync_config.filter.ephemeral_limit(),
now_token, typing_by_room = yield self.typing_by_room(
sync_config, now_token, since_token
)
now_token = now_token.copy_and_replace("typing_key", typing_key)
typing_by_room = {event["room_id"]: [event] for event in typing}
for event in typing:
event.pop("room_id")
logger.debug("Typing %r", typing_by_room)
rm_handler = self.hs.get_handlers().room_member_handler
app_service = yield self.store.get_app_service_by_user_id(
@ -257,18 +343,22 @@ class SyncHandler(BaseHandler):
)
joined = []
archived = []
if len(room_events) <= timeline_limit:
# There is no gap in any of the rooms. Therefore we can just
# partition the new events by room and return them.
invite_events = []
leave_events = []
events_by_room_id = {}
for event in room_events:
events_by_room_id.setdefault(event.room_id, []).append(event)
if event.room_id not in joined_room_ids:
if (event.type == EventTypes.Member
and event.membership == Membership.INVITE
and event.state_key == sync_config.user.to_string()):
invite_events.append(event)
if event.membership == Membership.INVITE:
invite_events.append(event)
elif event.membership in (Membership.LEAVE, Membership.BAN):
leave_events.append(event)
for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, [])
@ -296,11 +386,16 @@ class SyncHandler(BaseHandler):
)
if room_sync:
joined.append(room_sync)
else:
invite_events = yield self.store.get_invites_for_user(
sync_config.user.to_string()
)
leave_events = yield self.store.get_leave_and_ban_events_for_user(
sync_config.user.to_string()
)
for room_id in joined_room_ids:
room_sync = yield self.incremental_sync_with_gap_for_room(
room_id, sync_config, since_token, now_token,
@ -309,6 +404,12 @@ class SyncHandler(BaseHandler):
if room_sync:
joined.append(room_sync)
for leave_event in leave_events:
room_sync = yield self.incremental_sync_for_archived_room(
sync_config, leave_event, since_token
)
archived.append(room_sync)
invited = [
InvitedSyncResult(room_id=event.room_id, invite=event)
for event in invite_events
@ -318,6 +419,7 @@ class SyncHandler(BaseHandler):
presence=presence,
joined=joined,
invited=invited,
archived=archived,
next_batch=now_token,
))
@ -416,6 +518,55 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync)
@defer.inlineCallbacks
def incremental_sync_for_archived_room(self, sync_config, leave_event,
since_token):
""" Get the incremental delta needed to bring the client up to date for
the archived room.
Returns:
A Deferred ArchivedSyncResult
"""
stream_token = yield self.store.get_stream_token_for_event(
leave_event.event_id
)
leave_token = since_token.copy_and_replace("room_key", stream_token)
batch = yield self.load_filtered_recents(
leave_event.room_id, sync_config, leave_token, since_token,
)
logging.debug("Recents %r", batch)
# TODO(mjark): This seems racy since this isn't being passed a
# token to indicate what point in the stream this is
leave_state = yield self.store.get_state_for_events(
[leave_event.event_id], None
)
state_events_at_leave = leave_state[leave_event.event_id].values()
state_at_previous_sync = yield self.get_state_at_previous_sync(
leave_event.room_id, since_token=since_token
)
state_events_delta = yield self.compute_state_delta(
since_token=since_token,
previous_state=state_at_previous_sync,
current_state=state_events_at_leave,
)
room_sync = ArchivedSyncResult(
room_id=leave_event.room_id,
timeline=batch,
state=state_events_delta,
)
logging.debug("Room sync: %r", room_sync)
defer.returnValue(room_sync)
@defer.inlineCallbacks
def get_state_at_previous_sync(self, room_id, since_token):
""" Get the room state at the previous sync the client made.

View File

@ -101,6 +101,8 @@ class LoginRestServlet(ClientV1RestServlet):
user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
login_submission['medium'], login_submission['address']
)
if not user_id:
raise LoginError(403, "", errcode=Codes.FORBIDDEN)
else:
user_id = login_submission['user']

View File

@ -136,6 +136,10 @@ class SyncRestServlet(RestServlet):
sync_result.invited, filter, time_now, token_id
)
archived = self.encode_archived(
sync_result.archived, filter, time_now, token_id
)
response_content = {
"presence": self.encode_presence(
sync_result.presence, filter, time_now
@ -143,7 +147,7 @@ class SyncRestServlet(RestServlet):
"rooms": {
"joined": joined,
"invited": invited,
"archived": {},
"archived": archived,
},
"next_batch": sync_result.next_batch.to_string(),
}
@ -182,14 +186,20 @@ class SyncRestServlet(RestServlet):
return invited
def encode_archived(self, rooms, filter, time_now, token_id):
joined = {}
for room in rooms:
joined[room.room_id] = self.encode_room(
room, filter, time_now, token_id, joined=False
)
return joined
@staticmethod
def encode_room(room, filter, time_now, token_id):
def encode_room(room, filter, time_now, token_id, joined=True):
event_map = {}
state_events = filter.filter_room_state(room.state)
timeline_events = filter.filter_room_timeline(room.timeline.events)
ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
state_event_ids = []
timeline_event_ids = []
for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
@ -198,6 +208,8 @@ class SyncRestServlet(RestServlet):
)
state_event_ids.append(event.event_id)
timeline_events = filter.filter_room_timeline(room.timeline.events)
timeline_event_ids = []
for event in timeline_events:
# TODO(mjark): Respect formatting requirements in the filter.
event_map[event.event_id] = serialize_event(
@ -205,6 +217,7 @@ class SyncRestServlet(RestServlet):
event_format=format_event_for_client_v2_without_event_id,
)
timeline_event_ids.append(event.event_id)
result = {
"event_map": event_map,
"timeline": {
@ -213,8 +226,12 @@ class SyncRestServlet(RestServlet):
"limited": room.timeline.limited,
},
"state": {"events": state_event_ids},
"ephemeral": {"events": ephemeral_events},
}
if joined:
ephemeral_events = filter.filter_room_ephemeral(room.ephemeral)
result["ephemeral"] = {"events": ephemeral_events}
return result

View File

@ -124,6 +124,19 @@ class RoomMemberStore(SQLBaseStore):
invites.event_id for invite in invites
]))
def get_leave_and_ban_events_for_user(self, user_id):
""" Get all the leave events for a user
Args:
user_id (str): The user ID.
Returns:
A deferred list of event objects.
"""
return self.get_rooms_for_user_where_membership_is(
user_id, (Membership.LEAVE, Membership.BAN)
).addCallback(lambda leaves: self._get_events([
leave.event_id for leave in leaves
]))
def get_rooms_for_user_where_membership_is(self, user_id, membership_list):
""" Get all the rooms for this user where the membership for this user
matches one in the membership list.

0
tests/events/__init__.py Normal file
View File

115
tests/events/test_utils.py Normal file
View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .. import unittest
from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
class PruneEventTestCase(unittest.TestCase):
""" Asserts that a new event constructed with `evdict` will look like
`matchdict` when it is redacted. """
def run_test(self, evdict, matchdict):
self.assertEquals(
prune_event(FrozenEvent(evdict)).get_dict(),
matchdict
)
def test_minimal(self):
self.run_test(
{'type': 'A'},
{
'type': 'A',
'content': {},
'signatures': {},
'unsigned': {},
}
)
def test_basic_keys(self):
self.run_test(
{
'type': 'A',
'room_id': '!1:domain',
'sender': '@2:domain',
'event_id': '$3:domain',
'origin': 'domain',
},
{
'type': 'A',
'room_id': '!1:domain',
'sender': '@2:domain',
'event_id': '$3:domain',
'origin': 'domain',
'content': {},
'signatures': {},
'unsigned': {},
}
)
def test_unsigned_age_ts(self):
self.run_test(
{
'type': 'B',
'unsigned': {'age_ts': 20},
},
{
'type': 'B',
'content': {},
'signatures': {},
'unsigned': {'age_ts': 20},
}
)
self.run_test(
{
'type': 'B',
'unsigned': {'other_key': 'here'},
},
{
'type': 'B',
'content': {},
'signatures': {},
'unsigned': {},
}
)
def test_content(self):
self.run_test(
{
'type': 'C',
'content': {'things': 'here'},
},
{
'type': 'C',
'content': {},
'signatures': {},
'unsigned': {},
}
)
self.run_test(
{
'type': 'm.room.create',
'content': {'creator': '@2:domain', 'other_field': 'here'},
},
{
'type': 'm.room.create',
'content': {'creator': '@2:domain'},
'signatures': {},
'unsigned': {},
}
)