Fix edge case where a `Linearizer` could get stuck (#12358)
Just after a task acquires a contended `Linearizer` lock, it sleeps. If the task is cancelled during this sleep, we need to release the lock. Signed-off-by: Sean Quah <seanq@element.io>
This commit is contained in:
parent
31c1209c50
commit
79e7c2c426
|
@ -0,0 +1 @@
|
||||||
|
Fix a long-standing bug where `Linearizer`s could get stuck if a cancellation were to happen at the wrong time.
|
|
@ -453,7 +453,11 @@ class Linearizer:
|
||||||
#
|
#
|
||||||
# This needs to happen while we hold the lock. We could put it on the
|
# This needs to happen while we hold the lock. We could put it on the
|
||||||
# exit path, but that would slow down the uncontended case.
|
# exit path, but that would slow down the uncontended case.
|
||||||
await self._clock.sleep(0)
|
try:
|
||||||
|
await self._clock.sleep(0)
|
||||||
|
except CancelledError:
|
||||||
|
self._release_lock(key, entry)
|
||||||
|
raise
|
||||||
|
|
||||||
return entry
|
return entry
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,9 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from typing import Callable, Hashable, Tuple
|
from typing import Hashable, Tuple
|
||||||
|
|
||||||
|
from typing_extensions import Protocol
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from twisted.internet.base import ReactorBase
|
from twisted.internet.base import ReactorBase
|
||||||
|
@ -25,10 +27,15 @@ from synapse.util.async_helpers import Linearizer
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class UnblockFunction(Protocol):
|
||||||
|
def __call__(self, pump_reactor: bool = True) -> None:
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
class LinearizerTestCase(unittest.TestCase):
|
class LinearizerTestCase(unittest.TestCase):
|
||||||
def _start_task(
|
def _start_task(
|
||||||
self, linearizer: Linearizer, key: Hashable
|
self, linearizer: Linearizer, key: Hashable
|
||||||
) -> Tuple["Deferred[None]", "Deferred[None]", Callable[[], None]]:
|
) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
|
||||||
"""Starts a task which acquires the linearizer lock, blocks, then completes.
|
"""Starts a task which acquires the linearizer lock, blocks, then completes.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -52,11 +59,12 @@ class LinearizerTestCase(unittest.TestCase):
|
||||||
|
|
||||||
d = defer.ensureDeferred(task())
|
d = defer.ensureDeferred(task())
|
||||||
|
|
||||||
def unblock() -> None:
|
def unblock(pump_reactor: bool = True) -> None:
|
||||||
unblock_d.callback(None)
|
unblock_d.callback(None)
|
||||||
# The next task, if it exists, will acquire the lock and require a kick of
|
# The next task, if it exists, will acquire the lock and require a kick of
|
||||||
# the reactor to advance.
|
# the reactor to advance.
|
||||||
self._pump()
|
if pump_reactor:
|
||||||
|
self._pump()
|
||||||
|
|
||||||
return d, acquired_d, unblock
|
return d, acquired_d, unblock
|
||||||
|
|
||||||
|
@ -212,3 +220,38 @@ class LinearizerTestCase(unittest.TestCase):
|
||||||
)
|
)
|
||||||
unblock3()
|
unblock3()
|
||||||
self.successResultOf(d3)
|
self.successResultOf(d3)
|
||||||
|
|
||||||
|
def test_cancellation_during_sleep(self) -> None:
|
||||||
|
"""Tests cancellation during the sleep just after waiting for a `Linearizer`."""
|
||||||
|
linearizer = Linearizer()
|
||||||
|
|
||||||
|
key = object()
|
||||||
|
|
||||||
|
d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
|
||||||
|
self.assertTrue(acquired_d1.called)
|
||||||
|
|
||||||
|
# Create a second task, waiting for the first task.
|
||||||
|
d2, acquired_d2, _ = self._start_task(linearizer, key)
|
||||||
|
self.assertFalse(acquired_d2.called)
|
||||||
|
|
||||||
|
# Create a third task, waiting for the second task.
|
||||||
|
d3, acquired_d3, unblock3 = self._start_task(linearizer, key)
|
||||||
|
self.assertFalse(acquired_d3.called)
|
||||||
|
|
||||||
|
# Once the first task completes, cancel the waiting second task while it is
|
||||||
|
# sleeping just after acquiring the lock.
|
||||||
|
unblock1(pump_reactor=False)
|
||||||
|
self.successResultOf(d1)
|
||||||
|
d2.cancel()
|
||||||
|
self._pump()
|
||||||
|
|
||||||
|
self.assertTrue(d2.called)
|
||||||
|
self.failureResultOf(d2, CancelledError)
|
||||||
|
|
||||||
|
# The third task should continue running.
|
||||||
|
self.assertTrue(
|
||||||
|
acquired_d3.called,
|
||||||
|
"Third task did not get the lock after the second task was cancelled",
|
||||||
|
)
|
||||||
|
unblock3()
|
||||||
|
self.successResultOf(d3)
|
||||||
|
|
Loading…
Reference in New Issue