2020-01-14 04:58:02 -07:00
|
|
|
# Copyright 2014-2016 OpenMarket Ltd
|
|
|
|
# Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
2021-01-11 09:09:22 -07:00
|
|
|
import heapq
|
2020-01-14 04:58:02 -07:00
|
|
|
from itertools import islice
|
2021-01-11 09:09:22 -07:00
|
|
|
from typing import (
|
2023-05-24 22:22:24 -06:00
|
|
|
Callable,
|
2021-04-22 09:43:50 -06:00
|
|
|
Collection,
|
2021-01-11 09:09:22 -07:00
|
|
|
Dict,
|
|
|
|
Generator,
|
|
|
|
Iterable,
|
|
|
|
Iterator,
|
2023-05-24 22:22:24 -06:00
|
|
|
List,
|
2021-01-11 09:09:22 -07:00
|
|
|
Mapping,
|
|
|
|
Set,
|
Encode JSON responses on a thread in C, mk2 (#10905)
Currently we use `JsonEncoder.iterencode` to write JSON responses, which ensures that we don't block the main reactor thread when encoding huge objects. The downside to this is that `iterencode` falls back to using a pure Python encoder that is *much* less efficient and can easily burn a lot of CPU for huge responses. To fix this, while still ensuring we don't block the reactor loop, we encode the JSON on a threadpool using the standard `JsonEncoder.encode` functions, which is backed by a C library.
Doing so, however, requires `respond_with_json` to have access to the reactor, which it previously didn't. There are two ways of doing this:
1. threading through the reactor object, which is a bit fiddly as e.g. `DirectServeJsonResource` doesn't currently take a reactor, but is exposed to modules and so is a PITA to change; or
2. expose the reactor in `SynapseRequest`, which requires updating a bunch of servlet types.
I went with the latter as that is just a mechanical change, and I think makes sense as a request already has a reactor associated with it (via its http channel).
2021-09-28 03:37:58 -06:00
|
|
|
Sized,
|
2021-01-11 09:09:22 -07:00
|
|
|
Tuple,
|
|
|
|
TypeVar,
|
|
|
|
)
|
|
|
|
|
Encode JSON responses on a thread in C, mk2 (#10905)
Currently we use `JsonEncoder.iterencode` to write JSON responses, which ensures that we don't block the main reactor thread when encoding huge objects. The downside to this is that `iterencode` falls back to using a pure Python encoder that is *much* less efficient and can easily burn a lot of CPU for huge responses. To fix this, while still ensuring we don't block the reactor loop, we encode the JSON on a threadpool using the standard `JsonEncoder.encode` functions, which is backed by a C library.
Doing so, however, requires `respond_with_json` to have access to the reactor, which it previously didn't. There are two ways of doing this:
1. threading through the reactor object, which is a bit fiddly as e.g. `DirectServeJsonResource` doesn't currently take a reactor, but is exposed to modules and so is a PITA to change; or
2. expose the reactor in `SynapseRequest`, which requires updating a bunch of servlet types.
I went with the latter as that is just a mechanical change, and I think makes sense as a request already has a reactor associated with it (via its http channel).
2021-09-28 03:37:58 -06:00
|
|
|
from typing_extensions import Protocol
|
|
|
|
|
2020-01-14 04:58:02 -07:00
|
|
|
# Generic item/node type used in the signatures of the helpers in this module.
T = TypeVar("T")
|
Encode JSON responses on a thread in C, mk2 (#10905)
Currently we use `JsonEncoder.iterencode` to write JSON responses, which ensures that we don't block the main reactor thread when encoding huge objects. The downside to this is that `iterencode` falls back to using a pure Python encoder that is *much* less efficient and can easily burn a lot of CPU for huge responses. To fix this, while still ensuring we don't block the reactor loop, we encode the JSON on a threadpool using the standard `JsonEncoder.encode` functions, which is backed by a C library.
Doing so, however, requires `respond_with_json` to have access to the reactor, which it previously didn't. There are two ways of doing this:
1. threading through the reactor object, which is a bit fiddly as e.g. `DirectServeJsonResource` doesn't currently take a reactor, but is exposed to modules and so is a PITA to change; or
2. expose the reactor in `SynapseRequest`, which requires updating a bunch of servlet types.
I went with the latter as that is just a mechanical change, and I think makes sense as a request already has a reactor associated with it (via its http channel).
2021-09-28 03:37:58 -06:00
|
|
|
# Type variable bounded by the `_SelfSlice` protocol, i.e. a sized container
# whose slices have the same type as the container itself.
S = TypeVar("S", bound="_SelfSlice")
|
|
|
|
|
|
|
|
|
|
|
|
class _SelfSlice(Sized, Protocol):
    """Structural type for sized containers whose slices keep their own type.

    `Sequence` is too loose for this purpose: slicing a `Sequence` is only
    promised to give back some `Sequence`, not necessarily the same type as
    the object that was sliced.
    """

    def __getitem__(self: S, i: slice) -> S: ...
|
2020-01-14 04:58:02 -07:00
|
|
|
|
|
|
|
|
2021-05-24 13:32:01 -06:00
|
|
|
def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """Group the items of an iterable into tuples of at most `size` items.

    Args:
        iterable: the source of the items; any iterable is accepted, not only
            iterators
        size: the largest number of items to put into one tuple

    Returns:
        an iterator over the tuples, in source order; only the final tuple may
        hold fewer than `size` items
    """
    # One shared iterator, so that each batch resumes where the previous one
    # stopped (this is also what lets us accept re-iterable inputs like lists).
    items = iter(iterable)

    def take_batch() -> Tuple[T, ...]:
        return tuple(islice(items, size))

    # Two-argument `iter` keeps calling `take_batch` until it returns the
    # sentinel: an empty tuple, which means the source has run dry.
    return iter(take_batch, ())
|
2020-01-16 15:26:34 -07:00
|
|
|
|
|
|
|
|
Encode JSON responses on a thread in C, mk2 (#10905)
Currently we use `JsonEncoder.iterencode` to write JSON responses, which ensures that we don't block the main reactor thread when encoding huge objects. The downside to this is that `iterencode` falls back to using a pure Python encoder that is *much* less efficient and can easily burn a lot of CPU for huge responses. To fix this, while still ensuring we don't block the reactor loop, we encode the JSON on a threadpool using the standard `JsonEncoder.encode` functions, which is backed by a C library.
Doing so, however, requires `respond_with_json` to have access to the reactor, which it previously didn't. There are two ways of doing this:
1. threading through the reactor object, which is a bit fiddly as e.g. `DirectServeJsonResource` doesn't currently take a reactor, but is exposed to modules and so is a PITA to change; or
2. expose the reactor in `SynapseRequest`, which requires updating a bunch of servlet types.
I went with the latter as that is just a mechanical change, and I think makes sense as a request already has a reactor associated with it (via its http channel).
2021-09-28 03:37:58 -06:00
|
|
|
def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
    """Cut a sequence into consecutive slices of at most `maxlen` items.

    Every slice holds `maxlen` items except possibly the last one, which holds
    whatever is left over.

    An empty input produces no slices at all.
    """
    # The offsets at which each slice begins.
    starts = range(0, len(iseq), maxlen)
    return (iseq[start : start + maxlen] for start in starts)
|
2021-01-11 09:09:22 -07:00
|
|
|
|
|
|
|
|
2023-05-24 22:22:24 -06:00
|
|
|
def partition(
    iterable: Iterable[T], predicate: Callable[[T], bool]
) -> Tuple[List[T], List[T]]:
    """
    Split the items of an iterable into two lists according to a predicate.

    Args:
        iterable: the items to split up
        predicate: called once with each item; the truthiness of its result
            picks the list that the item goes into

    Returns:
        A pair of lists: first the items the predicate accepted, then the items
        it rejected. Each list keeps its items in their original order.
    """
    accepted: List[T] = []
    rejected: List[T] = []
    for item in iterable:
        bucket = accepted if predicate(item) else rejected
        bucket.append(item)
    return accepted, rejected
|
|
|
|
|
|
|
|
|
2021-01-11 09:09:22 -07:00
|
|
|
def sorted_topologically(
    nodes: Iterable[T],
    graph: Mapping[T, Collection[T]],
) -> Generator[T, None, None]:
    """Yield the given nodes one at a time in topological order.

    `graph` maps a node to the nodes it points at, and a node is only yielded
    once everything it points at has been yielded. For example
    `sorted_topologically([1, 2], {1: [2]})` will yield `2, 1`.

    Only nodes listed in `nodes` are yielded: graph entries for other nodes,
    and edges that point at other nodes, do not hold anything back. Whenever
    several nodes are ready at once the smallest is yielded first, so the nodes
    must be orderable. Nodes that sit on a cycle, or that wait for one, never
    become ready and so are never yielded.
    """

    # This is Kahn's algorithm, with a heap to choose among the ready nodes.

    # For each wanted node: how many wanted nodes it is still waiting for.
    pending = dict.fromkeys(nodes, 0)
    # For each node: the wanted nodes that point at it.
    dependants: Dict[T, Set[T]] = {}

    for source, targets in graph.items():
        if source in pending:
            # De-duplicate, so that a repeated edge is only counted once.
            for target in set(targets):
                if target in pending:
                    pending[source] += 1

                dependants.setdefault(target, set()).add(source)
            dependants.setdefault(source, set())

    ready = [node for node, count in pending.items() if count == 0]
    heapq.heapify(ready)

    while ready:
        current = heapq.heappop(ready)
        yield current

        # `current` is done, so everything pointing at it waits for one less.
        for waiter in dependants.get(current, []):
            if waiter not in pending:
                continue
            pending[waiter] -= 1
            if pending[waiter] == 0:
                heapq.heappush(ready, waiter)
|
2023-11-16 07:25:35 -07:00
|
|
|
|
|
|
|
|
|
|
|
def sorted_topologically_batched(
    nodes: Iterable[T],
    graph: Mapping[T, Collection[T]],
) -> Generator[Collection[T], None, None]:
    r"""Walk the graph topologically, yielding the nodes in batches such that
    every node that a given node references is in an earlier batch.

    `graph` maps each node to the nodes it references. For example, given the
    following graph, in which each node references the nodes drawn above it:

            A
           / \
          B   C
           \ /
            D

    This function will yield the batches `[A]`, `[B, C]`, `[D]`. No particular
    order is promised for the nodes inside a batch.

    Only nodes listed in `nodes` are yielded, and references to nodes outside
    of `nodes` do not hold anything back. Nodes that sit on a cycle, or that
    wait for one, never become ready and so are never yielded.

    This function is useful for e.g. batch persisting events in an auth chain,
    where we can only persist an event if all its auth events have already been
    persisted.
    """

    # For each wanted node: how many wanted nodes it is still waiting for.
    pending = dict.fromkeys(nodes, 0)
    # For each node: the wanted nodes that reference it.
    dependants: Dict[T, Set[T]] = {}

    for source, targets in graph.items():
        if source in pending:
            # De-duplicate, so that a repeated reference is only counted once.
            for target in set(targets):
                if target in pending:
                    pending[source] += 1

                dependants.setdefault(target, set()).add(source)
            dependants.setdefault(source, set())

    batch = [node for node, count in pending.items() if count == 0]

    while batch:
        # Work out which nodes are unblocked once this whole batch is done.
        following = []
        for done in batch:
            for waiter in dependants.get(done, []):
                if waiter not in pending:
                    continue
                pending[waiter] -= 1
                if pending[waiter] == 0:
                    following.append(waiter)

        yield batch
        batch = following
|