Merge pull request #1095 from matrix-org/erikj/batch_edus

Clobber EDUs in send queue
This commit is contained in:
Erik Johnston 2016-09-12 08:04:15 +01:00 committed by GitHub
commit 555460ae1b
5 changed files with 60 additions and 20 deletions

View File

@ -122,8 +122,12 @@ class FederationClient(FederationBase):
pdu.event_id pdu.event_id
) )
def send_presence(self, destination, states):
if destination != self.server_name:
self._transaction_queue.enqueue_presence(destination, states)
@log_function @log_function
def send_edu(self, destination, edu_type, content): def send_edu(self, destination, edu_type, content, key=None):
edu = Edu( edu = Edu(
origin=self.server_name, origin=self.server_name,
destination=destination, destination=destination,
@ -134,7 +138,7 @@ class FederationClient(FederationBase):
sent_edus_counter.inc() sent_edus_counter.inc()
# TODO, add errback, etc. # TODO, add errback, etc.
self._transaction_queue.enqueue_edu(edu) self._transaction_queue.enqueue_edu(edu, key=key)
return defer.succeed(None) return defer.succeed(None)
@log_function @log_function

View File

@ -26,6 +26,7 @@ from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination, get_retry_limiter, NotRetryingDestination,
) )
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state
import synapse.metrics import synapse.metrics
import logging import logging
@ -69,13 +70,21 @@ class TransactionQueue(object):
# destination -> list of tuple(edu, deferred) # destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {} self.pending_edus_by_dest = edus = {}
# Presence needs to be separate as we send single aggragate EDUs
self.pending_presence_by_dest = presence = {}
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback( metrics.register_callback(
"pending_pdus", "pending_pdus",
lambda: sum(map(len, pdus.values())), lambda: sum(map(len, pdus.values())),
) )
metrics.register_callback( metrics.register_callback(
"pending_edus", "pending_edus",
lambda: sum(map(len, edus.values())), lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
+ sum(map(len, edus_keyed.values()))
),
) )
# destination -> list of tuple(failure, deferred) # destination -> list of tuple(failure, deferred)
@ -130,12 +139,26 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination self._attempt_new_transaction, destination
) )
def enqueue_edu(self, edu): def enqueue_presence(self, destination, states):
self.pending_presence_by_dest.setdefault(destination, {}).update({
state.user_id: state for state in states
})
preserve_context_over_fn(
self._attempt_new_transaction, destination
)
def enqueue_edu(self, edu, key=None):
destination = edu.destination destination = edu.destination
if not self.can_send_to(destination): if not self.can_send_to(destination):
return return
if key:
self.pending_edus_keyed_by_dest.setdefault(
destination, {}
)[(edu.edu_type, key)] = edu
else:
self.pending_edus_by_dest.setdefault(destination, []).append(edu) self.pending_edus_by_dest.setdefault(destination, []).append(edu)
preserve_context_over_fn( preserve_context_over_fn(
@ -190,8 +213,13 @@ class TransactionQueue(object):
while True: while True:
pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, [])
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)
limiter = yield get_retry_limiter( limiter = yield get_retry_limiter(
destination, destination,
self.clock, self.clock,
@ -203,6 +231,22 @@ class TransactionQueue(object):
) )
pending_edus.extend(device_message_edus) pending_edus.extend(device_message_edus)
if pending_presence:
pending_edus.append(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
if pending_pdus: if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",

View File

@ -625,18 +625,8 @@ class PresenceHandler(object):
Args: Args:
hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
""" """
now = self.clock.time_msec()
for host, states in hosts_to_states.items(): for host, states in hosts_to_states.items():
self.federation.send_edu( self.federation.send_presence(host, states)
destination=host,
edu_type="m.presence",
content={
"push": [
_format_user_presence_state(state, now)
for state in states
]
}
)
@defer.inlineCallbacks @defer.inlineCallbacks
def incoming_presence(self, origin, content): def incoming_presence(self, origin, content):
@ -723,13 +713,13 @@ class PresenceHandler(object):
defer.returnValue([ defer.returnValue([
{ {
"type": "m.presence", "type": "m.presence",
"content": _format_user_presence_state(state, now), "content": format_user_presence_state(state, now),
} }
for state in updates for state in updates
]) ])
else: else:
defer.returnValue([ defer.returnValue([
_format_user_presence_state(state, now) for state in updates format_user_presence_state(state, now) for state in updates
]) ])
@defer.inlineCallbacks @defer.inlineCallbacks
@ -988,7 +978,7 @@ def should_notify(old_state, new_state):
return False return False
def _format_user_presence_state(state, now): def format_user_presence_state(state, now):
"""Convert UserPresenceState to a format that can be sent down to clients """Convert UserPresenceState to a format that can be sent down to clients
and to other servers. and to other servers.
""" """
@ -1101,7 +1091,7 @@ class PresenceEventSource(object):
defer.returnValue(([ defer.returnValue(([
{ {
"type": "m.presence", "type": "m.presence",
"content": _format_user_presence_state(s, now), "content": format_user_presence_state(s, now),
} }
for s in updates.values() for s in updates.values()
if include_offline or s.state != PresenceState.OFFLINE if include_offline or s.state != PresenceState.OFFLINE

View File

@ -156,6 +156,7 @@ class ReceiptsHandler(BaseHandler):
} }
}, },
}, },
key=(room_id, receipt_type, user_id),
) )
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -187,6 +187,7 @@ class TypingHandler(object):
"user_id": user_id, "user_id": user_id,
"typing": typing, "typing": typing,
}, },
key=(room_id, user_id),
)) ))
yield preserve_context_over_deferred( yield preserve_context_over_deferred(