diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 77b02c8a28..83b1d2eeba 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -37,6 +37,10 @@ class DeviceStore(SQLBaseStore): max_entries=10000, ) + self._clock.looping_call( + self._prune_old_outbound_device_pokes, 60 * 60 * 1000 + ) + self.register_background_index_update( "device_lists_stream_idx", index_name="device_lists_stream_user_id", @@ -641,3 +645,44 @@ class DeviceStore(SQLBaseStore): def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() + + def _prune_old_outbound_device_pokes(self): + """Delete old entries out of the device_lists_outbound_pokes to ensure + that we don't fill up due to dead servers. We keep one entry per + (destination, user_id) tuple to ensure that the prev_ids remain correct + if the server does come back. + """ + yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 + + def _prune_txn(txn): + select_sql = """ + SELECT destination, user_id, max(stream_id) as stream_id + FROM device_lists_outbound_pokes + GROUP BY destination, user_id + HAVING min(ts) < ? AND count(*) > 1 + """ + + txn.execute(select_sql, (yesterday,)) + rows = txn.fetchall() + + if not rows: + return + + delete_sql = """ + DELETE FROM device_lists_outbound_pokes + WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? + """ + + txn.executemany( + delete_sql, + ( + (yesterday, row[0], row[1], row[2]) + for row in rows + ) + ) + + logger.info("Pruned %d device list outbound pokes", txn.rowcount) + + return self.runInteraction( + "_prune_old_outbound_device_pokes", _prune_txn + )