Deduplicate new deviceinbox rows for replication
This commit is contained in:
parent
a76886726b
commit
9f26d3b75b
|
@ -325,23 +325,26 @@ class DeviceInboxStore(BackgroundUpdateStore):
|
||||||
# we return.
|
# we return.
|
||||||
upper_pos = min(current_pos, last_pos + limit)
|
upper_pos = min(current_pos, last_pos + limit)
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT stream_id, user_id"
|
"SELECT max(stream_id), user_id"
|
||||||
" FROM device_inbox"
|
" FROM device_inbox"
|
||||||
" WHERE ? < stream_id AND stream_id <= ?"
|
" WHERE ? < stream_id AND stream_id <= ?"
|
||||||
" ORDER BY stream_id ASC"
|
" GROUP BY user_id"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (last_pos, upper_pos))
|
txn.execute(sql, (last_pos, upper_pos))
|
||||||
rows = txn.fetchall()
|
rows = txn.fetchall()
|
||||||
|
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT stream_id, destination"
|
"SELECT max(stream_id), destination"
|
||||||
" FROM device_federation_outbox"
|
" FROM device_federation_outbox"
|
||||||
" WHERE ? < stream_id AND stream_id <= ?"
|
" WHERE ? < stream_id AND stream_id <= ?"
|
||||||
" ORDER BY stream_id ASC"
|
" GROUP BY destination"
|
||||||
)
|
)
|
||||||
txn.execute(sql, (last_pos, upper_pos))
|
txn.execute(sql, (last_pos, upper_pos))
|
||||||
rows.extend(txn)
|
rows.extend(txn)
|
||||||
|
|
||||||
|
# Order by ascending stream ordering
|
||||||
|
rows.sort()
|
||||||
|
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
|
|
Loading…
Reference in New Issue