Move event_reports to `RoomWorkerStore` (#15165)
This commit is contained in:
parent
916b8061d2
commit
65f10afb64
|
@ -0,0 +1 @@
|
|||
Move `get_event_report` and `get_event_reports_paginate` from `RoomStore` to `RoomWorkerStore`.
|
|
@ -1417,6 +1417,183 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
|
|||
get_un_partial_stated_rooms_from_stream_txn,
|
||||
)
|
||||
|
||||
async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""Retrieve an event report
|
||||
|
||||
Args:
|
||||
report_id: ID of reported event in database
|
||||
Returns:
|
||||
JSON dict of information from an event report or None if the
|
||||
report does not exist.
|
||||
"""
|
||||
|
||||
def _get_event_report_txn(
|
||||
txn: LoggingTransaction, report_id: int
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
sql = """
|
||||
SELECT
|
||||
er.id,
|
||||
er.received_ts,
|
||||
er.room_id,
|
||||
er.event_id,
|
||||
er.user_id,
|
||||
er.content,
|
||||
events.sender,
|
||||
room_stats_state.canonical_alias,
|
||||
room_stats_state.name,
|
||||
event_json.json AS event_json
|
||||
FROM event_reports AS er
|
||||
LEFT JOIN events
|
||||
ON events.event_id = er.event_id
|
||||
JOIN event_json
|
||||
ON event_json.event_id = er.event_id
|
||||
JOIN room_stats_state
|
||||
ON room_stats_state.room_id = er.room_id
|
||||
WHERE er.id = ?
|
||||
"""
|
||||
|
||||
txn.execute(sql, [report_id])
|
||||
row = txn.fetchone()
|
||||
|
||||
if not row:
|
||||
return None
|
||||
|
||||
event_report = {
|
||||
"id": row[0],
|
||||
"received_ts": row[1],
|
||||
"room_id": row[2],
|
||||
"event_id": row[3],
|
||||
"user_id": row[4],
|
||||
"score": db_to_json(row[5]).get("score"),
|
||||
"reason": db_to_json(row[5]).get("reason"),
|
||||
"sender": row[6],
|
||||
"canonical_alias": row[7],
|
||||
"name": row[8],
|
||||
"event_json": db_to_json(row[9]),
|
||||
}
|
||||
|
||||
return event_report
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_event_report", _get_event_report_txn, report_id
|
||||
)
|
||||
|
||||
async def get_event_reports_paginate(
|
||||
self,
|
||||
start: int,
|
||||
limit: int,
|
||||
direction: Direction = Direction.BACKWARDS,
|
||||
user_id: Optional[str] = None,
|
||||
room_id: Optional[str] = None,
|
||||
) -> Tuple[List[Dict[str, Any]], int]:
|
||||
"""Retrieve a paginated list of event reports
|
||||
|
||||
Args:
|
||||
start: event offset to begin the query from
|
||||
limit: number of rows to retrieve
|
||||
direction: Whether to fetch the most recent first (backwards) or the
|
||||
oldest first (forwards)
|
||||
user_id: search for user_id. Ignored if user_id is None
|
||||
room_id: search for room_id. Ignored if room_id is None
|
||||
Returns:
|
||||
Tuple of:
|
||||
json list of event reports
|
||||
total number of event reports matching the filter criteria
|
||||
"""
|
||||
|
||||
def _get_event_reports_paginate_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[Dict[str, Any]], int]:
|
||||
filters = []
|
||||
args: List[object] = []
|
||||
|
||||
if user_id:
|
||||
filters.append("er.user_id LIKE ?")
|
||||
args.extend(["%" + user_id + "%"])
|
||||
if room_id:
|
||||
filters.append("er.room_id LIKE ?")
|
||||
args.extend(["%" + room_id + "%"])
|
||||
|
||||
if direction == Direction.BACKWARDS:
|
||||
order = "DESC"
|
||||
else:
|
||||
order = "ASC"
|
||||
|
||||
where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""
|
||||
|
||||
# We join on room_stats_state despite not using any columns from it
|
||||
# because the join can influence the number of rows returned;
|
||||
# e.g. a room that doesn't have state, maybe because it was deleted.
|
||||
# The query returning the total count should be consistent with
|
||||
# the query returning the results.
|
||||
sql = """
|
||||
SELECT COUNT(*) as total_event_reports
|
||||
FROM event_reports AS er
|
||||
JOIN room_stats_state ON room_stats_state.room_id = er.room_id
|
||||
{}
|
||||
""".format(
|
||||
where_clause
|
||||
)
|
||||
txn.execute(sql, args)
|
||||
count = cast(Tuple[int], txn.fetchone())[0]
|
||||
|
||||
sql = """
|
||||
SELECT
|
||||
er.id,
|
||||
er.received_ts,
|
||||
er.room_id,
|
||||
er.event_id,
|
||||
er.user_id,
|
||||
er.content,
|
||||
events.sender,
|
||||
room_stats_state.canonical_alias,
|
||||
room_stats_state.name
|
||||
FROM event_reports AS er
|
||||
LEFT JOIN events
|
||||
ON events.event_id = er.event_id
|
||||
JOIN room_stats_state
|
||||
ON room_stats_state.room_id = er.room_id
|
||||
{where_clause}
|
||||
ORDER BY er.received_ts {order}
|
||||
LIMIT ?
|
||||
OFFSET ?
|
||||
""".format(
|
||||
where_clause=where_clause,
|
||||
order=order,
|
||||
)
|
||||
|
||||
args += [limit, start]
|
||||
txn.execute(sql, args)
|
||||
|
||||
event_reports = []
|
||||
for row in txn:
|
||||
try:
|
||||
s = db_to_json(row[5]).get("score")
|
||||
r = db_to_json(row[5]).get("reason")
|
||||
except Exception:
|
||||
logger.error("Unable to parse json from event_reports: %s", row[0])
|
||||
continue
|
||||
event_reports.append(
|
||||
{
|
||||
"id": row[0],
|
||||
"received_ts": row[1],
|
||||
"room_id": row[2],
|
||||
"event_id": row[3],
|
||||
"user_id": row[4],
|
||||
"score": s,
|
||||
"reason": r,
|
||||
"sender": row[6],
|
||||
"canonical_alias": row[7],
|
||||
"name": row[8],
|
||||
}
|
||||
)
|
||||
|
||||
return event_reports, count
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_event_reports_paginate", _get_event_reports_paginate_txn
|
||||
)
|
||||
|
||||
async def delete_event_report(self, report_id: int) -> bool:
|
||||
"""Remove an event report from database.
|
||||
|
||||
|
@ -2189,183 +2366,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
|
|||
)
|
||||
return next_id
|
||||
|
||||
async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
|
||||
"""Retrieve an event report
|
||||
|
||||
Args:
|
||||
report_id: ID of reported event in database
|
||||
Returns:
|
||||
JSON dict of information from an event report or None if the
|
||||
report does not exist.
|
||||
"""
|
||||
|
||||
def _get_event_report_txn(
|
||||
txn: LoggingTransaction, report_id: int
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
sql = """
|
||||
SELECT
|
||||
er.id,
|
||||
er.received_ts,
|
||||
er.room_id,
|
||||
er.event_id,
|
||||
er.user_id,
|
||||
er.content,
|
||||
events.sender,
|
||||
room_stats_state.canonical_alias,
|
||||
room_stats_state.name,
|
||||
event_json.json AS event_json
|
||||
FROM event_reports AS er
|
||||
LEFT JOIN events
|
||||
ON events.event_id = er.event_id
|
||||
JOIN event_json
|
||||
ON event_json.event_id = er.event_id
|
||||
JOIN room_stats_state
|
||||
ON room_stats_state.room_id = er.room_id
|
||||
WHERE er.id = ?
|
||||
"""
|
||||
|
||||
txn.execute(sql, [report_id])
|
||||
row = txn.fetchone()
|
||||
|
||||
if not row:
|
||||
return None
|
||||
|
||||
event_report = {
|
||||
"id": row[0],
|
||||
"received_ts": row[1],
|
||||
"room_id": row[2],
|
||||
"event_id": row[3],
|
||||
"user_id": row[4],
|
||||
"score": db_to_json(row[5]).get("score"),
|
||||
"reason": db_to_json(row[5]).get("reason"),
|
||||
"sender": row[6],
|
||||
"canonical_alias": row[7],
|
||||
"name": row[8],
|
||||
"event_json": db_to_json(row[9]),
|
||||
}
|
||||
|
||||
return event_report
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_event_report", _get_event_report_txn, report_id
|
||||
)
|
||||
|
||||
async def get_event_reports_paginate(
|
||||
self,
|
||||
start: int,
|
||||
limit: int,
|
||||
direction: Direction = Direction.BACKWARDS,
|
||||
user_id: Optional[str] = None,
|
||||
room_id: Optional[str] = None,
|
||||
) -> Tuple[List[Dict[str, Any]], int]:
|
||||
"""Retrieve a paginated list of event reports
|
||||
|
||||
Args:
|
||||
start: event offset to begin the query from
|
||||
limit: number of rows to retrieve
|
||||
direction: Whether to fetch the most recent first (backwards) or the
|
||||
oldest first (forwards)
|
||||
user_id: search for user_id. Ignored if user_id is None
|
||||
room_id: search for room_id. Ignored if room_id is None
|
||||
Returns:
|
||||
Tuple of:
|
||||
json list of event reports
|
||||
total number of event reports matching the filter criteria
|
||||
"""
|
||||
|
||||
def _get_event_reports_paginate_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Tuple[List[Dict[str, Any]], int]:
|
||||
filters = []
|
||||
args: List[object] = []
|
||||
|
||||
if user_id:
|
||||
filters.append("er.user_id LIKE ?")
|
||||
args.extend(["%" + user_id + "%"])
|
||||
if room_id:
|
||||
filters.append("er.room_id LIKE ?")
|
||||
args.extend(["%" + room_id + "%"])
|
||||
|
||||
if direction == Direction.BACKWARDS:
|
||||
order = "DESC"
|
||||
else:
|
||||
order = "ASC"
|
||||
|
||||
where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""
|
||||
|
||||
# We join on room_stats_state despite not using any columns from it
|
||||
# because the join can influence the number of rows returned;
|
||||
# e.g. a room that doesn't have state, maybe because it was deleted.
|
||||
# The query returning the total count should be consistent with
|
||||
# the query returning the results.
|
||||
sql = """
|
||||
SELECT COUNT(*) as total_event_reports
|
||||
FROM event_reports AS er
|
||||
JOIN room_stats_state ON room_stats_state.room_id = er.room_id
|
||||
{}
|
||||
""".format(
|
||||
where_clause
|
||||
)
|
||||
txn.execute(sql, args)
|
||||
count = cast(Tuple[int], txn.fetchone())[0]
|
||||
|
||||
sql = """
|
||||
SELECT
|
||||
er.id,
|
||||
er.received_ts,
|
||||
er.room_id,
|
||||
er.event_id,
|
||||
er.user_id,
|
||||
er.content,
|
||||
events.sender,
|
||||
room_stats_state.canonical_alias,
|
||||
room_stats_state.name
|
||||
FROM event_reports AS er
|
||||
LEFT JOIN events
|
||||
ON events.event_id = er.event_id
|
||||
JOIN room_stats_state
|
||||
ON room_stats_state.room_id = er.room_id
|
||||
{where_clause}
|
||||
ORDER BY er.received_ts {order}
|
||||
LIMIT ?
|
||||
OFFSET ?
|
||||
""".format(
|
||||
where_clause=where_clause,
|
||||
order=order,
|
||||
)
|
||||
|
||||
args += [limit, start]
|
||||
txn.execute(sql, args)
|
||||
|
||||
event_reports = []
|
||||
for row in txn:
|
||||
try:
|
||||
s = db_to_json(row[5]).get("score")
|
||||
r = db_to_json(row[5]).get("reason")
|
||||
except Exception:
|
||||
logger.error("Unable to parse json from event_reports: %s", row[0])
|
||||
continue
|
||||
event_reports.append(
|
||||
{
|
||||
"id": row[0],
|
||||
"received_ts": row[1],
|
||||
"room_id": row[2],
|
||||
"event_id": row[3],
|
||||
"user_id": row[4],
|
||||
"score": s,
|
||||
"reason": r,
|
||||
"sender": row[6],
|
||||
"canonical_alias": row[7],
|
||||
"name": row[8],
|
||||
}
|
||||
)
|
||||
|
||||
return event_reports, count
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_event_reports_paginate", _get_event_reports_paginate_txn
|
||||
)
|
||||
|
||||
async def block_room(self, room_id: str, user_id: str) -> None:
|
||||
"""Marks the room as blocked.
|
||||
|
||||
|
|
Loading…
Reference in New Issue