Improvements to the Limiter
* give them names, to improve logging * use a deque rather than a list for efficiency
This commit is contained in:
parent
d7275eecf3
commit
8462c26485
|
@ -427,7 +427,7 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
# We arbitrarily limit concurrent event creation for a room to 5.
|
# We arbitrarily limit concurrent event creation for a room to 5.
|
||||||
# This is to stop us from diverging history *too* much.
|
# This is to stop us from diverging history *too* much.
|
||||||
self.limiter = Limiter(max_count=5)
|
self.limiter = Limiter(max_count=5, name="room_event_creation_limit")
|
||||||
|
|
||||||
self.action_generator = hs.get_action_generator()
|
self.action_generator = hs.get_action_generator()
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2016 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd.
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -12,7 +13,7 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import collections
|
||||||
import logging
|
import logging
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
@ -248,11 +249,16 @@ class Limiter(object):
|
||||||
# do some work.
|
# do some work.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, max_count, clock=None):
|
def __init__(self, max_count=1, name=None, clock=None):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
max_count(int): The maximum number of concurrent access
|
max_count(int): The maximum number of concurrent accesses
|
||||||
"""
|
"""
|
||||||
|
if name is None:
|
||||||
|
self.name = id(self)
|
||||||
|
else:
|
||||||
|
self.name = name
|
||||||
|
|
||||||
if not clock:
|
if not clock:
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
clock = Clock(reactor)
|
clock = Clock(reactor)
|
||||||
|
@ -260,14 +266,14 @@ class Limiter(object):
|
||||||
self.max_count = max_count
|
self.max_count = max_count
|
||||||
|
|
||||||
# key_to_defer is a map from the key to a 2 element list where
|
# key_to_defer is a map from the key to a 2 element list where
|
||||||
# the first element is the number of things executing
|
# the first element is the number of things executing, and
|
||||||
# the second element is a list of deferreds for the things blocked from
|
# the second element is a deque of deferreds for the things blocked from
|
||||||
# executing.
|
# executing.
|
||||||
self.key_to_defer = {}
|
self.key_to_defer = {}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def queue(self, key):
|
def queue(self, key):
|
||||||
entry = self.key_to_defer.setdefault(key, [0, []])
|
entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
|
||||||
|
|
||||||
# If the number of things executing is greater than the maximum
|
# If the number of things executing is greater than the maximum
|
||||||
# then add a deferred to the list of blocked items
|
# then add a deferred to the list of blocked items
|
||||||
|
@ -277,10 +283,10 @@ class Limiter(object):
|
||||||
new_defer = defer.Deferred()
|
new_defer = defer.Deferred()
|
||||||
entry[1].append(new_defer)
|
entry[1].append(new_defer)
|
||||||
|
|
||||||
logger.info("Waiting to acquire limiter lock for key %r", key)
|
logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key)
|
||||||
with PreserveLoggingContext():
|
yield make_deferred_yieldable(new_defer)
|
||||||
yield new_defer
|
|
||||||
logger.info("Acquired limiter lock for key %r", key)
|
logger.info("Acquired limiter lock %r for key %r", self.name, key)
|
||||||
entry[0] += 1
|
entry[0] += 1
|
||||||
|
|
||||||
# if the code holding the lock completes synchronously, then it
|
# if the code holding the lock completes synchronously, then it
|
||||||
|
@ -296,7 +302,7 @@ class Limiter(object):
|
||||||
yield self._clock.sleep(0)
|
yield self._clock.sleep(0)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.info("Acquired uncontended limiter lock for key %r", key)
|
logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key)
|
||||||
entry[0] += 1
|
entry[0] += 1
|
||||||
|
|
||||||
@contextmanager
|
@contextmanager
|
||||||
|
@ -304,15 +310,16 @@ class Limiter(object):
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
finally:
|
finally:
|
||||||
logger.info("Releasing limiter lock for key %r", key)
|
logger.info("Releasing limiter lock %r for key %r", self.name, key)
|
||||||
|
|
||||||
# We've finished executing so check if there are any things
|
# We've finished executing so check if there are any things
|
||||||
# blocked waiting to execute and start one of them
|
# blocked waiting to execute and start one of them
|
||||||
entry[0] -= 1
|
entry[0] -= 1
|
||||||
|
|
||||||
if entry[1]:
|
if entry[1]:
|
||||||
next_def = entry[1].pop(0)
|
next_def = entry[1].popleft()
|
||||||
|
|
||||||
|
# we need to run the next thing in the sentinel context.
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
next_def.callback(None)
|
next_def.callback(None)
|
||||||
elif entry[0] == 0:
|
elif entry[0] == 0:
|
||||||
|
|
Loading…
Reference in New Issue