Merge branch 'develop' of github.com:matrix-org/synapse into develop

This commit is contained in:
David Baker 2014-08-29 13:24:08 +01:00
commit 3e6a19cf09
11 changed files with 422 additions and 205 deletions

View File

@ -9,7 +9,9 @@ TODO(Introduction) : Matthew
Architecture Architecture
============ ============
- Sending a message from A to B
Clients transmit data to other clients through home servers (HSes). Clients do not communicate with each
other directly.
:: ::
@ -26,39 +28,42 @@ Architecture
| |<--------( HTTP )-----------| | | |<--------( HTTP )-----------| |
+------------------+ Federation +------------------+ +------------------+ Federation +------------------+
- Client is an end-user (web app, mobile app) which uses C-S APIs to talk to the home server. A "Client" is an end-user, typically a human using a web application or mobile app. Clients use the
A given client is typically responsible for a single user. "Client-to-Server" (C-S) API to communicate with their home server. A single Client is usually
- A single user is represented by a User ID, scoped to the home server which allocated the account. responsible for a single user account. A user account is represented by their "User ID". This ID is
User IDs MUST have @ prefix; looks like @foo:domain - domain indicates the user's home namespaced to the home server which allocated the account and looks like::
server.
- Home server provides C-S APIs and has the ability to federate with other HSes. @localpart:domain
Typically responsible for N clients.
- Federation's purpose is to share content between interested HSes; no SPOF. The ``localpart`` of a user ID may be a user name, or an opaque ID identifying this user.
- Events are actions within the system. Typically each action (e.g. sending a message)
correlates with exactly one event. Each event has a ``type`` string.
- ``type`` values SHOULD be namespaced according to standard Java package naming conventions, A "Home Server" is a server which provides C-S APIs and has the ability to federate with other HSes.
with a ``.`` delimiter e.g. ``com.example.myapp.event`` It is typically responsible for multiple clients. "Federation" is the term used to describe the
- Events are typically send in the context of a room. sharing of data between two or more home servers.
Data in Matrix is encapsulated in an "Event". An event is an action within the system. Typically each
action (e.g. sending a message) correlates with exactly one event. Each event has a ``type`` which is
used to differentiate different kinds of data. ``type`` values SHOULD be namespaced according to standard
Java package naming conventions, e.g. ``com.example.myapp.event``. Events are usually sent in the context
of a "Room".
Room structure Room structure
-------------- --------------
A room is a conceptual place where users can send and receive messages. Rooms A room is a conceptual place where users can send and receive events. Rooms
can be created, joined and left. Messages are sent to a room, and all can be created, joined and left. Events are sent to a room, and all
participants in that room will receive the message. Rooms are uniquely participants in that room will receive the event. Rooms are uniquely
identified via a room ID. There is exactly one room ID for each room. Each identified via a "Room ID", which look like::
room can also have an alias. Each room can have many aliases.
- Room IDs MUST have ! prefix; looks like !foo:domain - domain is simply for namespacing, !opaque_id:domain
the room does NOT reside on any one domain. NOT human readable.
- Room Aliases MUST have # prefix; looks like #foo:domain - domain indicates where this There is exactly one room ID for each room. Whilst the room ID does contain a
alias can be mapped to a room ID. Key point: human readable / friendly. domain, it is simply for namespacing room IDs. The room does NOT reside on the
domain specified. Room IDs are not meant to be human readable.
- Aliases can be queried on the domain they specify, which will return a room ID if a The following diagram shows an ``m.room.message`` event being sent in the room
mapping exists. These mappings can change. ``!qporfwt:matrix.org``::
::
{ @alice:matrix.org } { @bob:domain.com } { @alice:matrix.org } { @bob:domain.com }
| ^ | ^
@ -73,7 +78,7 @@ room can also have an alias. Each room can have many aliases.
| matrix.org |<-------Federation------->| domain.com | | matrix.org |<-------Federation------->| domain.com |
+------------------+ +------------------+ +------------------+ +------------------+
| ................................. | | ................................. |
|______| Shared State |_______| |______| Partially Shared State |_______|
| Room ID: !qporfwt:matrix.org | | Room ID: !qporfwt:matrix.org |
| Servers: matrix.org, domain.com | | Servers: matrix.org, domain.com |
| Members: | | Members: |
@ -81,10 +86,40 @@ room can also have an alias. Each room can have many aliases.
| - @bob:domain.com | | - @bob:domain.com |
|.................................| |.................................|
- Federation's goal is to maintain the shared state. Don't need FULL state in order Federation maintains shared state between multiple home servers, such that when an event is
to be a part of a room. sent to a room, the home server knows where to forward the event on to, and how to process
- Introduce the DAG. the event. Home servers do not need to have completely shared state in order to participate
- Events are wrapped in PDUs. in a room. State is scoped to a single room, and federation ensures that all home servers
have the information they need, even if that means the home server has to request more
information from another home server before processing the event.
Room Aliases
------------
Each room can also have multiple "Room Aliases", which looks like::
#room_alias:domain
A room alias "points" to a room ID. The room ID the alias is pointing to can be obtained
by visiting the domain specified. Room aliases are designed to be human readable strings
which can be used to publicise rooms. Note that the mapping from a room alias to a
room ID is not fixed, and may change over time to point to a different room ID. For this
reason, Clients SHOULD resolve the room alias to a room ID once and then use that ID on
subsequent requests.
::
GET
#matrix:domain.com !aaabaa:matrix.org
| ^
| |
_______V____________________|____
| domain.com |
| Mappings: |
| #matrix >> !aaabaa:matrix.org |
| #golf >> !wfeiofh:sport.com |
| #bike >> !4rguxf:matrix.org |
|________________________________|
Identity Identity

View File

@ -543,6 +543,8 @@ class _TransactionQueue(object):
def eb(failure): def eb(failure):
if not deferred.called: if not deferred.called:
deferred.errback(failure) deferred.errback(failure)
else:
logger.exception("Failed to send edu", failure)
self._attempt_new_transaction(destination).addErrback(eb) self._attempt_new_transaction(destination).addErrback(eb)
return deferred return deferred

View File

@ -16,6 +16,7 @@
from twisted.internet import defer from twisted.internet import defer
from synapse.api.events import SynapseEvent from synapse.api.events import SynapseEvent
from synapse.util.logutils import log_function
from ._base import BaseHandler from ._base import BaseHandler
@ -44,6 +45,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def get_stream(self, auth_user_id, pagin_config, timeout=0): def get_stream(self, auth_user_id, pagin_config, timeout=0):
auth_user = self.hs.parse_userid(auth_user_id) auth_user = self.hs.parse_userid(auth_user_id)
@ -90,13 +92,15 @@ class EventStreamHandler(BaseHandler):
# 10 seconds of grace to allow the client to reconnect again # 10 seconds of grace to allow the client to reconnect again
# before we think they're gone # before we think they're gone
def _later(): def _later():
logger.debug("_later stopped_user_eventstream %s", auth_user)
self.distributor.fire( self.distributor.fire(
"stopped_user_eventstream", auth_user "stopped_user_eventstream", auth_user
) )
del self._stop_timer_per_user[auth_user] del self._stop_timer_per_user[auth_user]
logger.debug("Scheduling _later: for %s", auth_user)
self._stop_timer_per_user[auth_user] = ( self._stop_timer_per_user[auth_user] = (
self.clock.call_later(5, _later) self.clock.call_later(30, _later)
) )

View File

@ -18,6 +18,8 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function
from ._base import BaseHandler from ._base import BaseHandler
import logging import logging
@ -141,6 +143,10 @@ class PresenceHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user): def is_presence_visible(self, observer_user, observed_user):
defer.returnValue(True)
# return
# FIXME (erikj): This code path absolutely kills the database.
assert(observed_user.is_mine) assert(observed_user.is_mine)
if observer_user == observed_user: if observer_user == observed_user:
@ -184,7 +190,12 @@ class PresenceHandler(BaseHandler):
defer.returnValue(state) defer.returnValue(state)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def set_state(self, target_user, auth_user, state): def set_state(self, target_user, auth_user, state):
# return
# TODO (erikj): Turn this back on. Why did we end up sending EDUs
# everywhere?
if not target_user.is_mine: if not target_user.is_mine:
raise SynapseError(400, "User is not hosted on this Home Server") raise SynapseError(400, "User is not hosted on this Home Server")
@ -237,33 +248,42 @@ class PresenceHandler(BaseHandler):
self.push_presence(user, statuscache=statuscache) self.push_presence(user, statuscache=statuscache)
@log_function
def started_user_eventstream(self, user): def started_user_eventstream(self, user):
# TODO(paul): Use "last online" state # TODO(paul): Use "last online" state
self.set_state(user, user, {"state": PresenceState.ONLINE}) self.set_state(user, user, {"state": PresenceState.ONLINE})
@log_function
def stopped_user_eventstream(self, user): def stopped_user_eventstream(self, user):
# TODO(paul): Save current state as "last online" state # TODO(paul): Save current state as "last online" state
self.set_state(user, user, {"state": PresenceState.OFFLINE}) self.set_state(user, user, {"state": PresenceState.OFFLINE})
@defer.inlineCallbacks @defer.inlineCallbacks
def user_joined_room(self, user, room_id): def user_joined_room(self, user, room_id):
localusers = set()
remotedomains = set()
rm_handler = self.homeserver.get_handlers().room_member_handler
yield rm_handler.fetch_room_distributions_into(room_id,
localusers=localusers, remotedomains=remotedomains,
ignore_user=user)
if user.is_mine: if user.is_mine:
yield self._send_presence_to_distribution(srcuser=user, self.push_update_to_local_and_remote(
localusers=localusers, remotedomains=remotedomains, observed_user=user,
room_ids=[room_id],
statuscache=self._get_or_offline_usercache(user), statuscache=self._get_or_offline_usercache(user),
) )
for srcuser in localusers: else:
yield self._send_presence(srcuser=srcuser, destuser=user, self.push_update_to_clients(
statuscache=self._get_or_offline_usercache(srcuser), observed_user=user,
room_ids=[room_id],
statuscache=self._get_or_offline_usercache(user),
)
# We also want to tell them about current presence of people.
rm_handler = self.homeserver.get_handlers().room_member_handler
curr_users = yield rm_handler.get_room_members(room_id)
for local_user in [c for c in curr_users if c.is_mine]:
self.push_update_to_local_and_remote(
observed_user=local_user,
users_to_push=[user],
statuscache=self._get_or_offline_usercache(local_user),
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -374,11 +394,13 @@ class PresenceHandler(BaseHandler):
defer.returnValue(presence) defer.returnValue(presence)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def start_polling_presence(self, user, target_user=None, state=None): def start_polling_presence(self, user, target_user=None, state=None):
logger.debug("Start polling for presence from %s", user) logger.debug("Start polling for presence from %s", user)
if target_user: if target_user:
target_users = set([target_user]) target_users = set([target_user])
room_ids = []
else: else:
presence = yield self.store.get_presence_list( presence = yield self.store.get_presence_list(
user.localpart, accepted=True user.localpart, accepted=True
@ -392,23 +414,37 @@ class PresenceHandler(BaseHandler):
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_rooms_for_user(user)
for room_id in room_ids:
for member in (yield rm_handler.get_room_members(room_id)):
target_users.add(member)
if state is None: if state is None:
state = yield self.store.get_presence_state(user.localpart) state = yield self.store.get_presence_state(user.localpart)
else:
# statuscache = self._get_or_make_usercache(user)
# self._user_cachemap_latest_serial += 1
# statuscache.update(state, self._user_cachemap_latest_serial)
pass
localusers, remoteusers = partitionbool( yield self.push_update_to_local_and_remote(
target_users, observed_user=user,
lambda u: u.is_mine users_to_push=target_users,
room_ids=room_ids,
statuscache=self._get_or_make_usercache(user),
) )
for target_user in localusers: for target_user in target_users:
if target_user.is_mine:
self._start_polling_local(user, target_user) self._start_polling_local(user, target_user)
# We want to tell the person that just came online
# presence state of people they are interested in?
self.push_update_to_clients(
observed_user=target_user,
users_to_push=[user],
statuscache=self._get_or_offline_usercache(target_user),
)
deferreds = [] deferreds = []
remoteusers_by_domain = partition(remoteusers, lambda u: u.domain) remote_users = [u for u in target_users if not u.is_mine]
remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
# Only poll for people in our get_presence_list
for domain in remoteusers_by_domain: for domain in remoteusers_by_domain:
remoteusers = remoteusers_by_domain[domain] remoteusers = remoteusers_by_domain[domain]
@ -430,12 +466,6 @@ class PresenceHandler(BaseHandler):
self._local_pushmap[target_localpart].add(user) self._local_pushmap[target_localpart].add(user)
self.push_update_to_clients(
observer_user=user,
observed_user=target_user,
statuscache=self._get_or_offline_usercache(target_user),
)
def _start_polling_remote(self, user, domain, remoteusers): def _start_polling_remote(self, user, domain, remoteusers):
to_poll = set() to_poll = set()
@ -455,6 +485,7 @@ class PresenceHandler(BaseHandler):
content={"poll": [u.to_string() for u in to_poll]} content={"poll": [u.to_string() for u in to_poll]}
) )
@log_function
def stop_polling_presence(self, user, target_user=None): def stop_polling_presence(self, user, target_user=None):
logger.debug("Stop polling for presence from %s", user) logger.debug("Stop polling for presence from %s", user)
@ -494,6 +525,7 @@ class PresenceHandler(BaseHandler):
if not self._local_pushmap[localpart]: if not self._local_pushmap[localpart]:
del self._local_pushmap[localpart] del self._local_pushmap[localpart]
@log_function
def _stop_polling_remote(self, user, domain, remoteusers): def _stop_polling_remote(self, user, domain, remoteusers):
to_unpoll = set() to_unpoll = set()
@ -514,6 +546,7 @@ class PresenceHandler(BaseHandler):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def push_presence(self, user, statuscache): def push_presence(self, user, statuscache):
assert(user.is_mine) assert(user.is_mine)
@ -529,53 +562,17 @@ class PresenceHandler(BaseHandler):
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_rooms_for_user(user)
for room_id in room_ids: if not localusers and not room_ids:
yield rm_handler.fetch_room_distributions_into(
room_id, localusers=localusers, remotedomains=remotedomains,
ignore_user=user,
)
if not localusers and not remotedomains:
defer.returnValue(None) defer.returnValue(None)
yield self._send_presence_to_distribution(user, yield self.push_update_to_local_and_remote(
localusers=localusers, remotedomains=remotedomains, observed_user=user,
statuscache=statuscache users_to_push=localusers,
) remote_domains=remotedomains,
room_ids=room_ids,
def _send_presence(self, srcuser, destuser, statuscache):
if destuser.is_mine:
self.push_update_to_clients(
observer_user=destuser,
observed_user=srcuser,
statuscache=statuscache)
return defer.succeed(None)
else:
return self._push_presence_remote(srcuser, destuser.domain,
state=statuscache.get_state()
)
@defer.inlineCallbacks
def _send_presence_to_distribution(self, srcuser, localusers=set(),
remotedomains=set(), statuscache=None):
for u in localusers:
logger.debug(" | push to local user %s", u)
self.push_update_to_clients(
observer_user=u,
observed_user=srcuser,
statuscache=statuscache, statuscache=statuscache,
) )
deferreds = []
for domain in remotedomains:
logger.debug(" | push to remote domain %s", domain)
deferreds.append(self._push_presence_remote(srcuser, domain,
state=statuscache.get_state())
)
yield defer.DeferredList(deferreds)
@defer.inlineCallbacks @defer.inlineCallbacks
def _push_presence_remote(self, user, destination, state=None): def _push_presence_remote(self, user, destination, state=None):
if state is None: if state is None:
@ -591,12 +588,17 @@ class PresenceHandler(BaseHandler):
self.clock.time_msec() - state.pop("mtime") self.clock.time_msec() - state.pop("mtime")
) )
user_state = {
"user_id": user.to_string(),
}
user_state.update(**state)
yield self.federation.send_edu( yield self.federation.send_edu(
destination=destination, destination=destination,
edu_type="m.presence", edu_type="m.presence",
content={ content={
"push": [ "push": [
dict(user_id=user.to_string(), **state), user_state,
], ],
} }
) )
@ -615,12 +617,7 @@ class PresenceHandler(BaseHandler):
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_rooms_for_user(user)
for room_id in room_ids: if not observers and not room_ids:
yield rm_handler.fetch_room_distributions_into(
room_id, localusers=observers, ignore_user=user
)
if not observers:
break break
state = dict(push) state = dict(push)
@ -636,10 +633,10 @@ class PresenceHandler(BaseHandler):
self._user_cachemap_latest_serial += 1 self._user_cachemap_latest_serial += 1
statuscache.update(state, serial=self._user_cachemap_latest_serial) statuscache.update(state, serial=self._user_cachemap_latest_serial)
for observer_user in observers:
self.push_update_to_clients( self.push_update_to_clients(
observer_user=observer_user,
observed_user=user, observed_user=user,
users_to_push=observers,
room_ids=room_ids,
statuscache=statuscache, statuscache=statuscache,
) )
@ -675,12 +672,53 @@ class PresenceHandler(BaseHandler):
yield defer.DeferredList(deferreds) yield defer.DeferredList(deferreds)
def push_update_to_clients(self, observer_user, observed_user, @defer.inlineCallbacks
statuscache): def push_update_to_local_and_remote(self, observed_user,
statuscache.make_event(user=observed_user, clock=self.clock) users_to_push=[], room_ids=[],
remote_domains=[],
statuscache=None):
localusers, remoteusers = partitionbool(
users_to_push,
lambda u: u.is_mine
)
localusers = set(localusers)
self.push_update_to_clients(
observed_user=observed_user,
users_to_push=localusers,
room_ids=room_ids,
statuscache=statuscache,
)
remote_domains = set(remote_domains)
remote_domains |= set([r.domain for r in remoteusers])
for room_id in room_ids:
remote_domains.update(
(yield self.store.get_joined_hosts_for_room(room_id))
)
remote_domains.discard(self.hs.hostname)
deferreds = []
for domain in remote_domains:
logger.debug(" | push to remote domain %s", domain)
deferreds.append(
self._push_presence_remote(
observed_user, domain, state=statuscache.get_state()
)
)
yield defer.DeferredList(deferreds)
defer.returnValue((localusers, remote_domains))
def push_update_to_clients(self, observed_user, users_to_push=[],
room_ids=[], statuscache=None):
self.notifier.on_new_user_event( self.notifier.on_new_user_event(
[observer_user], users_to_push,
room_ids,
) )

View File

@ -119,6 +119,7 @@ class Notifier(object):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function
def on_new_user_event(self, users=[], rooms=[]): def on_new_user_event(self, users=[], rooms=[]):
""" Used to inform listeners that something has happend """ Used to inform listeners that something has happend
presence/user event wise. presence/user event wise.

View File

@ -81,4 +81,4 @@ class PaginationConfig(object):
return ( return (
"<PaginationConfig from_tok=%s, to_tok=%s, " "<PaginationConfig from_tok=%s, to_tok=%s, "
"direction=%s, limit=%s>" "direction=%s, limit=%s>"
) % (self.from_tok, self.to_tok, self.direction, self.limit) ) % (self.from_token, self.to_token, self.direction, self.limit)

View File

@ -18,6 +18,8 @@ from inspect import getcallargs
from functools import wraps from functools import wraps
import logging import logging
import inspect
import traceback
def log_function(f): def log_function(f):
@ -65,4 +67,55 @@ def log_function(f):
return f(*args, **kwargs) return f(*args, **kwargs)
wrapped.__name__ = func_name
return wrapped
def trace_function(f):
func_name = f.__name__
linenum = f.func_code.co_firstlineno
pathname = f.func_code.co_filename
def wrapped(*args, **kwargs):
name = f.__module__
logger = logging.getLogger(name)
level = logging.DEBUG
s = inspect.currentframe().f_back
to_print = [
"\t%s:%s %s. Args: args=%s, kwargs=%s" % (
pathname, linenum, func_name, args, kwargs
)
]
while s:
if True or s.f_globals["__name__"].startswith("synapse"):
filename, lineno, function, _, _ = inspect.getframeinfo(s)
args_string = inspect.formatargvalues(*inspect.getargvalues(s))
to_print.append(
"\t%s:%d %s. Args: %s" % (
filename, lineno, function, args_string
)
)
s = s.f_back
msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print)
record = logging.LogRecord(
name=name,
level=level,
pathname=pathname,
lineno=lineno,
msg=msg,
args=None,
exc_info=None
)
logger.handle(record)
return f(*args, **kwargs)
wrapped.__name__ = func_name
return wrapped return wrapped

View File

@ -193,6 +193,8 @@ class PresenceStateTestCase(unittest.TestCase):
SynapseError SynapseError
) )
test_get_disallowed_state.skip = "Presence permissions are disabled"
@defer.inlineCallbacks @defer.inlineCallbacks
def test_set_my_state(self): def test_set_my_state(self):
mocked_set = self.datastore.set_presence_state mocked_set = self.datastore.set_presence_state
@ -497,6 +499,7 @@ class PresencePushTestCase(unittest.TestCase):
db_pool=None, db_pool=None,
datastore=Mock(spec=[ datastore=Mock(spec=[
"set_presence_state", "set_presence_state",
"get_joined_hosts_for_room",
# Bits that Federation needs # Bits that Federation needs
"prep_send_transaction", "prep_send_transaction",
@ -511,8 +514,12 @@ class PresencePushTestCase(unittest.TestCase):
) )
hs.handlers = JustPresenceHandlers(hs) hs.handlers = JustPresenceHandlers(hs)
def update(*args,**kwargs):
# print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
return defer.succeed(None)
self.mock_update_client = Mock() self.mock_update_client = Mock()
self.mock_update_client.return_value = defer.succeed(None) self.mock_update_client.side_effect = update
self.datastore = hs.get_datastore() self.datastore = hs.get_datastore()
@ -546,6 +553,14 @@ class PresencePushTestCase(unittest.TestCase):
return defer.succeed([]) return defer.succeed([])
self.room_member_handler.get_room_members = get_room_members self.room_member_handler.get_room_members = get_room_members
def get_room_hosts(room_id):
if room_id == "a-room":
hosts = set([u.domain for u in self.room_members])
return defer.succeed(hosts)
else:
return defer.succeed([])
self.datastore.get_joined_hosts_for_room = get_room_hosts
@defer.inlineCallbacks @defer.inlineCallbacks
def fetch_room_distributions_into(room_id, localusers=None, def fetch_room_distributions_into(room_id, localusers=None,
remotedomains=None, ignore_user=None): remotedomains=None, ignore_user=None):
@ -611,18 +626,10 @@ class PresencePushTestCase(unittest.TestCase):
{"state": ONLINE}) {"state": ONLINE})
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
room_ids=["a-room"],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
call(observer_user=self.u_banana,
observed_user=self.u_apple,
statuscache=ANY),
call(observer_user=self.u_clementine,
observed_user=self.u_apple,
statuscache=ANY),
call(observer_user=self.u_elderberry,
observed_user=self.u_apple,
statuscache=ANY),
], any_order=True) ], any_order=True)
self.mock_update_client.reset_mock() self.mock_update_client.reset_mock()
@ -651,7 +658,8 @@ class PresencePushTestCase(unittest.TestCase):
], presence) ], presence)
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_banana, call(users_to_push=set([self.u_banana]),
room_ids=[],
observed_user=self.u_banana, observed_user=self.u_banana,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
]) # and no others... ]) # and no others...
@ -659,21 +667,21 @@ class PresencePushTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_push_remote(self): def test_push_remote(self):
put_json = self.mock_http_client.put_json put_json = self.mock_http_client.put_json
put_json.expect_call_and_return( # put_json.expect_call_and_return(
call("remote", # call("remote",
path=ANY, # Can't guarantee which txn ID will be which # path=ANY, # Can't guarantee which txn ID will be which
data=_expect_edu("remote", "m.presence", # data=_expect_edu("remote", "m.presence",
content={ # content={
"push": [ # "push": [
{"user_id": "@apple:test", # {"user_id": "@apple:test",
"state": "online", # "state": "online",
"mtime_age": 0}, # "mtime_age": 0},
], # ],
} # }
) # )
), # ),
defer.succeed((200, "OK")) # defer.succeed((200, "OK"))
) # )
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("farm", call("farm",
path=ANY, # Can't guarantee which txn ID will be which path=ANY, # Can't guarantee which txn ID will be which
@ -681,7 +689,7 @@ class PresencePushTestCase(unittest.TestCase):
content={ content={
"push": [ "push": [
{"user_id": "@apple:test", {"user_id": "@apple:test",
"state": "online", "state": u"online",
"mtime_age": 0}, "mtime_age": 0},
], ],
} }
@ -730,10 +738,8 @@ class PresencePushTestCase(unittest.TestCase):
) )
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple]),
observed_user=self.u_potato, room_ids=["a-room"],
statuscache=ANY),
call(observer_user=self.u_banana,
observed_user=self.u_potato, observed_user=self.u_potato,
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -753,19 +759,17 @@ class PresencePushTestCase(unittest.TestCase):
) )
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
# Apple and Elderberry see each other call(room_ids=["a-room"],
call(observer_user=self.u_apple,
observed_user=self.u_elderberry, observed_user=self.u_elderberry,
users_to_push=set(),
statuscache=ANY), statuscache=ANY),
call(observer_user=self.u_elderberry, call(users_to_push=set([self.u_elderberry]),
observed_user=self.u_apple, observed_user=self.u_apple,
room_ids=[],
statuscache=ANY), statuscache=ANY),
# Banana and Elderberry see each other call(users_to_push=set([self.u_elderberry]),
call(observer_user=self.u_banana,
observed_user=self.u_elderberry,
statuscache=ANY),
call(observer_user=self.u_elderberry,
observed_user=self.u_banana, observed_user=self.u_banana,
room_ids=[],
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -887,7 +891,12 @@ class PresencePollingTestCase(unittest.TestCase):
self.datastore.get_received_txn_response = get_received_txn_response self.datastore.get_received_txn_response = get_received_txn_response
self.mock_update_client = Mock() self.mock_update_client = Mock()
self.mock_update_client.return_value = defer.succeed(None)
def update(*args,**kwargs):
# print "mock_update_client: Args=%s, kwargs=%s" %(args, kwargs,)
return defer.succeed(None)
self.mock_update_client.side_effect = update
self.handler = hs.get_handlers().presence_handler self.handler = hs.get_handlers().presence_handler
self.handler.push_update_to_clients = self.mock_update_client self.handler.push_update_to_clients = self.mock_update_client
@ -951,10 +960,10 @@ class PresencePollingTestCase(unittest.TestCase):
# apple should see both banana and clementine currently offline # apple should see both banana and clementine currently offline
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=[self.u_apple],
observed_user=self.u_banana, observed_user=self.u_banana,
statuscache=ANY), statuscache=ANY),
call(observer_user=self.u_apple, call(users_to_push=[self.u_apple],
observed_user=self.u_clementine, observed_user=self.u_clementine,
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -974,10 +983,11 @@ class PresencePollingTestCase(unittest.TestCase):
# apple and banana should now both see each other online # apple and banana should now both see each other online
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple]),
observed_user=self.u_banana, observed_user=self.u_banana,
room_ids=[],
statuscache=ANY), statuscache=ANY),
call(observer_user=self.u_banana, call(users_to_push=[self.u_banana],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -994,8 +1004,9 @@ class PresencePollingTestCase(unittest.TestCase):
# banana should now be told apple is offline # banana should now be told apple is offline
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_banana, call(users_to_push=set([self.u_banana, self.u_apple]),
observed_user=self.u_apple, observed_user=self.u_apple,
room_ids=[],
statuscache=ANY), statuscache=ANY),
], any_order=True) ], any_order=True)
@ -1008,7 +1019,7 @@ class PresencePollingTestCase(unittest.TestCase):
put_json = self.mock_http_client.put_json put_json = self.mock_http_client.put_json
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("remote", call("remote",
path="/matrix/federation/v1/send/1000000/", path=ANY,
data=_expect_edu("remote", "m.presence", data=_expect_edu("remote", "m.presence",
content={ content={
"poll": [ "@potato:remote" ], "poll": [ "@potato:remote" ],
@ -1018,6 +1029,18 @@ class PresencePollingTestCase(unittest.TestCase):
defer.succeed((200, "OK")) defer.succeed((200, "OK"))
) )
put_json.expect_call_and_return(
call("remote",
path=ANY,
data=_expect_edu("remote", "m.presence",
content={
"push": [ {"user_id": "@clementine:test" }],
},
),
),
defer.succeed((200, "OK"))
)
# clementine goes online # clementine goes online
yield self.handler.set_state( yield self.handler.set_state(
target_user=self.u_clementine, auth_user=self.u_clementine, target_user=self.u_clementine, auth_user=self.u_clementine,
@ -1032,15 +1055,28 @@ class PresencePollingTestCase(unittest.TestCase):
self.assertTrue(self.u_clementine in self.assertTrue(self.u_clementine in
self.handler._remote_recvmap[self.u_potato]) self.handler._remote_recvmap[self.u_potato])
put_json.expect_call_and_return(
call("remote",
path=ANY,
data=_expect_edu("remote", "m.presence",
content={
"push": [ {"user_id": "@fig:test" }],
},
),
),
defer.succeed((200, "OK"))
)
# fig goes online; shouldn't send a second poll # fig goes online; shouldn't send a second poll
yield self.handler.set_state( yield self.handler.set_state(
target_user=self.u_fig, auth_user=self.u_fig, target_user=self.u_fig, auth_user=self.u_fig,
state={"state": ONLINE} state={"state": ONLINE}
) )
reactor.iterate(delay=0) # reactor.iterate(delay=0)
put_json.assert_had_no_calls() yield put_json.await_calls()
# fig goes offline # fig goes offline
yield self.handler.set_state( yield self.handler.set_state(
@ -1054,7 +1090,7 @@ class PresencePollingTestCase(unittest.TestCase):
put_json.expect_call_and_return( put_json.expect_call_and_return(
call("remote", call("remote",
path="/matrix/federation/v1/send/1000001/", path=ANY,
data=_expect_edu("remote", "m.presence", data=_expect_edu("remote", "m.presence",
content={ content={
"unpoll": [ "@potato:remote" ], "unpoll": [ "@potato:remote" ],
@ -1069,7 +1105,7 @@ class PresencePollingTestCase(unittest.TestCase):
target_user=self.u_clementine, auth_user=self.u_clementine, target_user=self.u_clementine, auth_user=self.u_clementine,
state={"state": OFFLINE}) state={"state": OFFLINE})
put_json.await_calls() yield put_json.await_calls()
self.assertFalse(self.u_potato in self.handler._remote_recvmap, self.assertFalse(self.u_potato in self.handler._remote_recvmap,
msg="expected potato not to be in _remote_recvmap" msg="expected potato not to be in _remote_recvmap"

View File

@ -81,7 +81,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.replication = hs.get_replication_layer() self.replication = hs.get_replication_layer()
self.replication.send_edu = Mock() self.replication.send_edu = Mock()
self.replication.send_edu.return_value = defer.succeed((200, "OK"))
def send_edu(*args, **kwargs):
# print "send_edu: %s, %s" % (args, kwargs)
return defer.succeed((200, "OK"))
self.replication.send_edu.side_effect = send_edu
def get_profile_displayname(user_localpart): def get_profile_displayname(user_localpart):
return defer.succeed("Frank") return defer.succeed("Frank")
@ -95,11 +99,12 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
return defer.succeed("http://foo") return defer.succeed("http://foo")
self.datastore.get_profile_avatar_url = get_profile_avatar_url self.datastore.get_profile_avatar_url = get_profile_avatar_url
def get_presence_list(user_localpart, accepted=None): self.presence_list = [
return defer.succeed([
{"observed_user_id": "@banana:test"}, {"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"}, {"observed_user_id": "@clementine:test"},
]) ]
def get_presence_list(user_localpart, accepted=None):
return defer.succeed(self.presence_list)
self.datastore.get_presence_list = get_presence_list self.datastore.get_presence_list = get_presence_list
def do_users_share_a_room(userlist): def do_users_share_a_room(userlist):
@ -109,7 +114,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.handlers = hs.get_handlers() self.handlers = hs.get_handlers()
self.mock_update_client = Mock() self.mock_update_client = Mock()
self.mock_update_client.return_value = defer.succeed(None) def update(*args, **kwargs):
# print "mock_update_client: %s, %s" %(args, kwargs)
return defer.succeed(None)
self.mock_update_client.side_effect = update
self.handlers.presence_handler.push_update_to_clients = ( self.handlers.presence_handler.push_update_to_clients = (
self.mock_update_client) self.mock_update_client)
@ -130,6 +138,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_set_my_state(self): def test_set_my_state(self):
self.presence_list = [
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
]
mocked_set = self.datastore.set_presence_state mocked_set = self.datastore.set_presence_state
mocked_set.return_value = defer.succeed({"state": OFFLINE}) mocked_set.return_value = defer.succeed({"state": OFFLINE})
@ -142,6 +155,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_push_local(self): def test_push_local(self):
self.presence_list = [
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
]
self.datastore.set_presence_state.return_value = defer.succeed( self.datastore.set_presence_state.return_value = defer.succeed(
{"state": ONLINE}) {"state": ONLINE})
@ -173,12 +191,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
presence) presence)
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
room_ids=[],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
call(observer_user=self.u_banana,
observed_user=self.u_apple,
statuscache=ANY),
], any_order=True) ], any_order=True)
statuscache = self.mock_update_client.call_args[1]["statuscache"] statuscache = self.mock_update_client.call_args[1]["statuscache"]
@ -198,12 +214,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.u_apple, "I am an Apple") self.u_apple, "I am an Apple")
self.mock_update_client.assert_has_calls([ self.mock_update_client.assert_has_calls([
call(observer_user=self.u_apple, call(users_to_push=set([self.u_apple, self.u_banana, self.u_clementine]),
room_ids=[],
observed_user=self.u_apple, observed_user=self.u_apple,
statuscache=ANY), # self-reflection statuscache=ANY), # self-reflection
call(observer_user=self.u_banana,
observed_user=self.u_apple,
statuscache=ANY),
], any_order=True) ], any_order=True)
statuscache = self.mock_update_client.call_args[1]["statuscache"] statuscache = self.mock_update_client.call_args[1]["statuscache"]
@ -217,6 +231,10 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_push_remote(self): def test_push_remote(self):
self.presence_list = [
{"observed_user_id": "@potato:remote"},
]
self.datastore.set_presence_state.return_value = defer.succeed( self.datastore.set_presence_state.return_value = defer.succeed(
{"state": ONLINE}) {"state": ONLINE})
@ -247,6 +265,11 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_recv_remote(self): def test_recv_remote(self):
self.presence_list = [
{"observed_user_id": "@banana:test"},
{"observed_user_id": "@clementine:test"},
]
# TODO(paul): Gut-wrenching # TODO(paul): Gut-wrenching
potato_set = self.handlers.presence_handler._remote_recvmap.setdefault( potato_set = self.handlers.presence_handler._remote_recvmap.setdefault(
self.u_potato, set()) self.u_potato, set())
@ -264,7 +287,8 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
) )
self.mock_update_client.assert_called_with( self.mock_update_client.assert_called_with(
observer_user=self.u_apple, users_to_push=set([self.u_apple]),
room_ids=[],
observed_user=self.u_potato, observed_user=self.u_potato,
statuscache=ANY) statuscache=ANY)

View File

@ -21,7 +21,7 @@ from synapse.api.events.room import (
RoomMemberEvent, MessageEvent RoomMemberEvent, MessageEvent
) )
from twisted.internet import defer from twisted.internet import defer, reactor
from collections import namedtuple from collections import namedtuple
from mock import patch, Mock from mock import patch, Mock
@ -263,18 +263,43 @@ class DeferredMockCallable(object):
d.callback(None) d.callback(None)
return result return result
raise AssertionError("Was not expecting call(%s)" % failure = AssertionError("Was not expecting call(%s)" %
_format_call(args, kwargs) _format_call(args, kwargs)
) )
for _, _, d in self.expectations:
try:
d.errback(failure)
except:
pass
raise failure
def expect_call_and_return(self, call, result): def expect_call_and_return(self, call, result):
self.expectations.append((call, result, defer.Deferred())) self.expectations.append((call, result, defer.Deferred()))
@defer.inlineCallbacks @defer.inlineCallbacks
def await_calls(self): def await_calls(self, timeout=1000):
while self.expectations: deferred = defer.DeferredList(
(_, _, d) = self.expectations.pop(0) [d for _, _, d in self.expectations],
yield d fireOnOneErrback=True
)
timer = reactor.callLater(
timeout/1000,
deferred.errback,
AssertionError(
"%d pending calls left: %s"% (
len([e for e in self.expectations if not e[2].called]),
[e for e in self.expectations if not e[2].called]
)
)
)
yield deferred
timer.cancel()
self.calls = [] self.calls = []
def assert_had_no_calls(self): def assert_had_no_calls(self):

View File

@ -15,8 +15,8 @@ limitations under the License.
*/ */
angular.module('RoomController', ['ngSanitize', 'mFileInput']) angular.module('RoomController', ['ngSanitize', 'mFileInput'])
.controller('RoomController', ['$scope', '$timeout', '$routeParams', '$location', '$rootScope', 'matrixService', 'eventHandlerService', 'mFileUpload', 'matrixPhoneService', 'MatrixCall', .controller('RoomController', ['$scope', '$timeout', '$routeParams', '$location', '$rootScope', 'matrixService', 'eventHandlerService', 'mFileUpload', 'mPresence', 'matrixPhoneService', 'MatrixCall',
function($scope, $timeout, $routeParams, $location, $rootScope, matrixService, eventHandlerService, mFileUpload, matrixPhoneService, MatrixCall) { function($scope, $timeout, $routeParams, $location, $rootScope, matrixService, eventHandlerService, mFileUpload, mPresence, matrixPhoneService, MatrixCall) {
'use strict'; 'use strict';
var MESSAGES_PER_PAGINATION = 30; var MESSAGES_PER_PAGINATION = 30;
var THUMBNAIL_SIZE = 320; var THUMBNAIL_SIZE = 320;
@ -57,15 +57,14 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
scrollToBottom(); scrollToBottom();
if (window.Notification) { if (window.Notification) {
// FIXME: we should also notify based on a timer or other heuristics // Show notification when the user is idle
// rather than the window being minimised if (matrixService.presence.offline === mPresence.getState()) {
if (document.hidden) {
var notification = new window.Notification( var notification = new window.Notification(
($scope.members[event.user_id].displayname || event.user_id) + ($scope.members[event.user_id].displayname || event.user_id) +
" (" + ($scope.room_alias || $scope.room_id) + ")", // FIXME: don't leak room_ids here " (" + ($scope.room_alias || $scope.room_id) + ")", // FIXME: don't leak room_ids here
{ {
"body": event.content.body, "body": event.content.body,
"icon": $scope.members[event.user_id].avatar_url, "icon": $scope.members[event.user_id].avatar_url
}); });
$timeout(function() { $timeout(function() {
notification.close(); notification.close();
@ -230,7 +229,7 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
var member = $scope.members[target_user_id]; var member = $scope.members[target_user_id];
member.content.membership = chunk.content.membership; member.content.membership = chunk.content.membership;
} }
} };
var updatePresence = function(chunk) { var updatePresence = function(chunk) {
if (!(chunk.content.user_id in $scope.members)) { if (!(chunk.content.user_id in $scope.members)) {
@ -257,10 +256,10 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
if ("avatar_url" in chunk.content) { if ("avatar_url" in chunk.content) {
member.avatar_url = chunk.content.avatar_url; member.avatar_url = chunk.content.avatar_url;
} }
} };
$scope.send = function() { $scope.send = function() {
if ($scope.textInput == "") { if ($scope.textInput === "") {
return; return;
} }
@ -269,7 +268,7 @@ angular.module('RoomController', ['ngSanitize', 'mFileInput'])
// Send the text message // Send the text message
var promise; var promise;
// FIXME: handle other commands too // FIXME: handle other commands too
if ($scope.textInput.indexOf("/me") == 0) { if ($scope.textInput.indexOf("/me") === 0) {
promise = matrixService.sendEmoteMessage($scope.room_id, $scope.textInput.substr(4)); promise = matrixService.sendEmoteMessage($scope.room_id, $scope.textInput.substr(4));
} }
else { else {