Merge pull request #2234 from matrix-org/erikj/fix_push
Store ActionGenerator in HomeServer
This commit is contained in:
commit
99713dc7d3
|
@ -43,7 +43,6 @@ from synapse.events.utils import prune_event
|
||||||
|
|
||||||
from synapse.util.retryutils import NotRetryingDestination
|
from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
|
||||||
from synapse.push.action_generator import ActionGenerator
|
|
||||||
from synapse.util.distributor import user_joined_room
|
from synapse.util.distributor import user_joined_room
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
@ -75,6 +74,7 @@ class FederationHandler(BaseHandler):
|
||||||
self.state_handler = hs.get_state_handler()
|
self.state_handler = hs.get_state_handler()
|
||||||
self.server_name = hs.hostname
|
self.server_name = hs.hostname
|
||||||
self.keyring = hs.get_keyring()
|
self.keyring = hs.get_keyring()
|
||||||
|
self.action_generator = hs.get_action_generator()
|
||||||
|
|
||||||
self.replication_layer.set_handler(self)
|
self.replication_layer.set_handler(self)
|
||||||
|
|
||||||
|
@ -1389,8 +1389,7 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
if not event.internal_metadata.is_outlier():
|
if not event.internal_metadata.is_outlier():
|
||||||
action_generator = ActionGenerator(self.hs)
|
yield self.action_generator.handle_push_actions_for_event(
|
||||||
yield action_generator.handle_push_actions_for_event(
|
|
||||||
event, context
|
event, context
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,6 @@ from synapse.api.errors import AuthError, Codes, SynapseError
|
||||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||||
from synapse.events.utils import serialize_event
|
from synapse.events.utils import serialize_event
|
||||||
from synapse.events.validator import EventValidator
|
from synapse.events.validator import EventValidator
|
||||||
from synapse.push.action_generator import ActionGenerator
|
|
||||||
from synapse.types import (
|
from synapse.types import (
|
||||||
UserID, RoomAlias, RoomStreamToken,
|
UserID, RoomAlias, RoomStreamToken,
|
||||||
)
|
)
|
||||||
|
@ -54,7 +53,7 @@ class MessageHandler(BaseHandler):
|
||||||
# This is to stop us from diverging history *too* much.
|
# This is to stop us from diverging history *too* much.
|
||||||
self.limiter = Limiter(max_count=5)
|
self.limiter = Limiter(max_count=5)
|
||||||
|
|
||||||
self.action_generator = ActionGenerator(self.hs)
|
self.action_generator = hs.get_action_generator()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def purge_history(self, room_id, event_id):
|
def purge_history(self, room_id, event_id):
|
||||||
|
|
|
@ -24,7 +24,7 @@ import logging
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ActionGenerator:
|
class ActionGenerator(object):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
|
@ -24,6 +24,8 @@ from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.util.caches.descriptors import cached
|
from synapse.util.caches.descriptors import cached
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util.async import Linearizer
|
||||||
|
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -31,7 +33,7 @@ logger = logging.getLogger(__name__)
|
||||||
rules_by_room = {}
|
rules_by_room = {}
|
||||||
|
|
||||||
|
|
||||||
class BulkPushRuleEvaluator:
|
class BulkPushRuleEvaluator(object):
|
||||||
"""Calculates the outcome of push rules for an event for all users in the
|
"""Calculates the outcome of push rules for an event for all users in the
|
||||||
room at once.
|
room at once.
|
||||||
"""
|
"""
|
||||||
|
@ -204,12 +206,7 @@ class RulesForRoom(object):
|
||||||
# To get around this we pass a function that on invalidations looks ups
|
# To get around this we pass a function that on invalidations looks ups
|
||||||
# the RoomsForUser entry in the cache, rather than keeping a reference
|
# the RoomsForUser entry in the cache, rather than keeping a reference
|
||||||
# to self around in the callback.
|
# to self around in the callback.
|
||||||
def invalidate_all_cb():
|
self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
|
||||||
rules = rules_for_room_cache.get(room_id, update_metrics=False)
|
|
||||||
if rules:
|
|
||||||
rules.invalidate_all()
|
|
||||||
|
|
||||||
self.invalidate_all_cb = invalidate_all_cb
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_rules(self, context):
|
def get_rules(self, context):
|
||||||
|
@ -347,3 +344,15 @@ class RulesForRoom(object):
|
||||||
self.member_map.update(members)
|
self.member_map.update(members)
|
||||||
self.rules_by_user = rules_by_user
|
self.rules_by_user = rules_by_user
|
||||||
self.state_group = state_group
|
self.state_group = state_group
|
||||||
|
|
||||||
|
|
||||||
|
class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
|
||||||
|
# We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
|
||||||
|
# which namedtuple does for us (i.e. two _CacheContext are the same if
|
||||||
|
# their caches and keys match). This is important in particular to
|
||||||
|
# dedupe when we add callbacks to lru cache nodes, otherwise the number
|
||||||
|
# of callbacks would grow.
|
||||||
|
def __call__(self):
|
||||||
|
rules = self.cache.get(self.room_id, None, update_metrics=False)
|
||||||
|
if rules:
|
||||||
|
rules.invalidate_all()
|
||||||
|
|
|
@ -52,6 +52,7 @@ from synapse.handlers.read_marker import ReadMarkerHandler
|
||||||
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
|
from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
|
||||||
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
|
||||||
from synapse.notifier import Notifier
|
from synapse.notifier import Notifier
|
||||||
|
from synapse.push.action_generator import ActionGenerator
|
||||||
from synapse.push.pusherpool import PusherPool
|
from synapse.push.pusherpool import PusherPool
|
||||||
from synapse.rest.media.v1.media_repository import MediaRepository
|
from synapse.rest.media.v1.media_repository import MediaRepository
|
||||||
from synapse.state import StateHandler
|
from synapse.state import StateHandler
|
||||||
|
@ -135,6 +136,7 @@ class HomeServer(object):
|
||||||
'macaroon_generator',
|
'macaroon_generator',
|
||||||
'tcp_replication',
|
'tcp_replication',
|
||||||
'read_marker_handler',
|
'read_marker_handler',
|
||||||
|
'action_generator',
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, hostname, **kwargs):
|
def __init__(self, hostname, **kwargs):
|
||||||
|
@ -299,6 +301,9 @@ class HomeServer(object):
|
||||||
def build_tcp_replication(self):
|
def build_tcp_replication(self):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def build_action_generator(self):
|
||||||
|
return ActionGenerator(self)
|
||||||
|
|
||||||
def remove_pusher(self, app_id, push_key, user_id):
|
def remove_pusher(self, app_id, push_key, user_id):
|
||||||
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue