Ignore key requests if the device inbox is already big

Author: Mathieu Velten, 2023-06-20 14:22:16 +02:00 (committed by Mathieu Velten)
parent 2301a09d7a
commit 5047c01d3f
3 changed files with 47 additions and 9 deletions
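
In outline: when a local user is sent an m.room_key_request to-device message, the handler now passes a size limit down to the storage layer, which counts how many messages each target device already has queued in device_inbox and skips devices that are over the limit. Below is a minimal standalone sketch of that gating logic, not the Synapse code itself: the table layout is simplified, and the helper names (current_inbox_sizes, should_deliver) are illustrative, while INBOX_SIZE_LIMIT_FOR_KEY_REQUEST and the per-device COUNT(*) query mirror the diff that follows.

```python
# Minimal standalone sketch of the gating added by this commit (illustrative
# only; assumes a simplified device_inbox(user_id, device_id, message) table).
# INBOX_SIZE_LIMIT_FOR_KEY_REQUEST and the counting query come from the diff
# below; current_inbox_sizes/should_deliver are hypothetical helpers.
import sqlite3
from typing import Dict, Optional

INBOX_SIZE_LIMIT_FOR_KEY_REQUEST = 100


def current_inbox_sizes(conn: sqlite3.Connection, user_id: str) -> Dict[str, int]:
    """Count how many to-device messages are already queued per device."""
    rows = conn.execute(
        "SELECT device_id, COUNT(*) FROM device_inbox"
        " WHERE user_id = ? GROUP BY device_id",
        (user_id,),
    )
    return {device_id: count for device_id, count in rows}


def should_deliver(sizes: Dict[str, int], device_id: str, limit: Optional[int]) -> bool:
    """With no limit (non key-request traffic), always deliver; otherwise skip
    the message if the device's inbox is already over the limit."""
    return limit is None or sizes.get(device_id, 0) <= limit


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE device_inbox (user_id TEXT, device_id TEXT, message TEXT)")
    conn.executemany(
        "INSERT INTO device_inbox VALUES (?, ?, ?)",
        [("@alice:example.org", "DEV1", "{}")] * 150,
    )
    sizes = current_inbox_sizes(conn, "@alice:example.org")
    # DEV1 already has 150 queued messages, over the limit of 100 -> skipped.
    print(should_deliver(sizes, "DEV1", INBOX_SIZE_LIMIT_FOR_KEY_REQUEST))  # False
```

As in the diff, an over-limit message is simply not added to the recipient's inbox; the call that queues messages still succeeds.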

changelog.d/15808.misc (new file)

@@ -0,0 +1 @@
+Ignore key request if the device inbox is already big.

synapse/handlers/devicemessage.py

@@ -39,6 +39,9 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
+INBOX_SIZE_LIMIT_FOR_KEY_REQUEST = 100
+
 
 class DeviceMessageHandler:
     def __init__(self, hs: "HomeServer"):
         """
@@ -166,7 +169,7 @@ class DeviceMessageHandler:
         found marks the remote cache for the user as stale.
         """
-        if message_type != "m.room_key_request":
+        if message_type != ToDeviceEventTypes.RoomKeyRequest:
             return
 
         # Get the sending device IDs
@@ -286,10 +289,16 @@ class DeviceMessageHandler:
                 "org.matrix.opentracing_context": json_encoder.encode(context),
             }
 
+        device_inbox_size_limit = None
+        if message_type == ToDeviceEventTypes.RoomKeyRequest and self.is_mine(
+            UserID.from_string(user_id)
+        ):
+            device_inbox_size_limit = INBOX_SIZE_LIMIT_FOR_KEY_REQUEST
+
         # Add messages to the database.
         # Retrieve the stream id of the last-processed to-device message.
         last_stream_id = await self.store.add_messages_to_device_inbox(
-            local_messages, remote_edu_contents
+            local_messages, remote_edu_contents, device_inbox_size_limit
         )
 
         # Notify listeners that there are new to-device messages to process,

synapse/storage/databases/main/deviceinbox.py

@@ -650,6 +650,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         self,
         local_messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
         remote_messages_by_destination: Dict[str, JsonDict],
+        size_limit: Optional[int] = None,
     ) -> int:
         """Used to send messages from this server.
@@ -666,11 +667,14 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         assert self._can_write_to_device
 
         def add_messages_txn(
-            txn: LoggingTransaction, now_ms: int, stream_id: int
+            txn: LoggingTransaction,
+            now_ms: int,
+            stream_id: int,
+            size_limit: Optional[int],
         ) -> None:
             # Add the local messages directly to the local inbox.
             self._add_messages_to_local_device_inbox_txn(
-                txn, stream_id, local_messages_by_user_then_device
+                txn, stream_id, local_messages_by_user_then_device, size_limit
             )
 
             # Add the remote messages to the federation outbox.
@@ -731,7 +735,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         async with self._device_inbox_id_gen.get_next() as stream_id:
             now_ms = self._clock.time_msec()
             await self.db_pool.runInteraction(
-                "add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
+                "add_messages_to_device_inbox",
+                add_messages_txn,
+                now_ms,
+                stream_id,
+                size_limit,
             )
 
             for user_id in local_messages_by_user_then_device.keys():
                 self._device_inbox_stream_cache.entity_has_changed(user_id, stream_id)
@@ -802,11 +810,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         txn: LoggingTransaction,
         stream_id: int,
         messages_by_user_then_device: Dict[str, Dict[str, JsonDict]],
+        size_limit: Optional[int] = None,
     ) -> None:
         assert self._can_write_to_device
 
         local_by_user_then_device = {}
         for user_id, messages_by_device in messages_by_user_then_device.items():
+            inboxes_size = {}
+            if size_limit:
+                sql = """
+                SELECT device_id, COUNT(*) FROM device_inbox
+                WHERE user_id = ?
+                GROUP BY device_id
+                """
+                txn.execute(sql, (user_id,))
+                for r in txn:
+                    inboxes_size[r[0]] = r[1]
+
             messages_json_for_user = {}
             devices = list(messages_by_device.keys())
             if len(devices) == 1 and devices[0] == "*":
@@ -822,6 +842,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 message_json = json_encoder.encode(messages_by_device["*"])
 
                 for device_id in devices:
-                    # Add the message for all devices for this user on this
-                    # server.
-                    messages_json_for_user[device_id] = message_json
+                    if (
+                        size_limit is None
+                        or inboxes_size.get(device_id, 0) <= size_limit
+                    ):
+                        # Add the message for all devices for this user on this
+                        # server.
+                        messages_json_for_user[device_id] = message_json
@@ -857,6 +881,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     )
 
                     message_json = json_encoder.encode(msg)
-                    messages_json_for_user[device_id] = message_json
+                    if (
+                        size_limit is None
+                        or inboxes_size.get(device_id, 0) <= size_limit
+                    ):
+                        messages_json_for_user[device_id] = message_json
 
             if messages_json_for_user: