Shuffle operations so that locking upsert happens last in the txn. This ensures the lock is held for the least amount of time possible.
This commit is contained in:
parent
e4c4664d73
commit
2732be83d9
|
@ -466,6 +466,9 @@ class SQLBaseStore(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
|
def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}):
|
||||||
|
# We need to lock the table :(
|
||||||
|
self.database_engine.lock_table(txn, table)
|
||||||
|
|
||||||
# Try to update
|
# Try to update
|
||||||
sql = "UPDATE %s SET %s WHERE %s" % (
|
sql = "UPDATE %s SET %s WHERE %s" % (
|
||||||
table,
|
table,
|
||||||
|
|
|
@ -42,3 +42,6 @@ class PostgresEngine(object):
|
||||||
|
|
||||||
def is_connection_closed(self, conn):
|
def is_connection_closed(self, conn):
|
||||||
return bool(conn)
|
return bool(conn)
|
||||||
|
|
||||||
|
def lock_table(self, txn, table):
|
||||||
|
txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
|
||||||
|
|
|
@ -38,3 +38,6 @@ class Sqlite3Engine(object):
|
||||||
|
|
||||||
def is_connection_closed(self, conn):
|
def is_connection_closed(self, conn):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def lock_table(self, txn, table):
|
||||||
|
return
|
||||||
|
|
|
@ -283,50 +283,6 @@ class EventsStore(SQLBaseStore):
|
||||||
if context.rejected:
|
if context.rejected:
|
||||||
self._store_rejections_txn(txn, event.event_id, context.rejected)
|
self._store_rejections_txn(txn, event.event_id, context.rejected)
|
||||||
|
|
||||||
if event.is_state():
|
|
||||||
vals = {
|
|
||||||
"event_id": event.event_id,
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"type": event.type,
|
|
||||||
"state_key": event.state_key,
|
|
||||||
}
|
|
||||||
|
|
||||||
# TODO: How does this work with backfilling?
|
|
||||||
if hasattr(event, "replaces_state"):
|
|
||||||
vals["prev_state"] = event.replaces_state
|
|
||||||
|
|
||||||
self._simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
"state_events",
|
|
||||||
vals,
|
|
||||||
)
|
|
||||||
|
|
||||||
if is_new_state and not context.rejected:
|
|
||||||
self._simple_upsert_txn(
|
|
||||||
txn,
|
|
||||||
"current_state_events",
|
|
||||||
keyvalues={
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"type": event.type,
|
|
||||||
"state_key": event.state_key,
|
|
||||||
},
|
|
||||||
values={
|
|
||||||
"event_id": event.event_id,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
for e_id, h in event.prev_state:
|
|
||||||
self._simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="event_edges",
|
|
||||||
values={
|
|
||||||
"event_id": event.event_id,
|
|
||||||
"prev_event_id": e_id,
|
|
||||||
"room_id": event.room_id,
|
|
||||||
"is_state": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
for hash_alg, hash_base64 in event.hashes.items():
|
for hash_alg, hash_base64 in event.hashes.items():
|
||||||
hash_bytes = decode_base64(hash_base64)
|
hash_bytes = decode_base64(hash_base64)
|
||||||
self._store_event_content_hash_txn(
|
self._store_event_content_hash_txn(
|
||||||
|
@ -356,6 +312,50 @@ class EventsStore(SQLBaseStore):
|
||||||
txn, event.event_id, ref_alg, ref_hash_bytes
|
txn, event.event_id, ref_alg, ref_hash_bytes
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if event.is_state():
|
||||||
|
vals = {
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"type": event.type,
|
||||||
|
"state_key": event.state_key,
|
||||||
|
}
|
||||||
|
|
||||||
|
# TODO: How does this work with backfilling?
|
||||||
|
if hasattr(event, "replaces_state"):
|
||||||
|
vals["prev_state"] = event.replaces_state
|
||||||
|
|
||||||
|
self._simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
"state_events",
|
||||||
|
vals,
|
||||||
|
)
|
||||||
|
|
||||||
|
for e_id, h in event.prev_state:
|
||||||
|
self._simple_insert_txn(
|
||||||
|
txn,
|
||||||
|
table="event_edges",
|
||||||
|
values={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
"prev_event_id": e_id,
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"is_state": True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
if is_new_state and not context.rejected:
|
||||||
|
self._simple_upsert_txn(
|
||||||
|
txn,
|
||||||
|
"current_state_events",
|
||||||
|
keyvalues={
|
||||||
|
"room_id": event.room_id,
|
||||||
|
"type": event.type,
|
||||||
|
"state_key": event.state_key,
|
||||||
|
},
|
||||||
|
values={
|
||||||
|
"event_id": event.event_id,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
def _store_redaction(self, txn, event):
|
def _store_redaction(self, txn, event):
|
||||||
# invalidate the cache for the redacted event
|
# invalidate the cache for the redacted event
|
||||||
self._invalidate_get_event_cache(event.redacts)
|
self._invalidate_get_event_cache(event.redacts)
|
||||||
|
|
|
@ -76,25 +76,16 @@ class TransactionStore(SQLBaseStore):
|
||||||
response_json (str)
|
response_json (str)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.runInteraction(
|
return self._simple_insert(
|
||||||
"set_received_txn_response",
|
|
||||||
self._set_received_txn_response,
|
|
||||||
transaction_id, origin, code, response_dict
|
|
||||||
)
|
|
||||||
|
|
||||||
def _set_received_txn_response(self, txn, transaction_id, origin, code,
|
|
||||||
response_json):
|
|
||||||
self._simple_upsert_txn(
|
|
||||||
txn,
|
|
||||||
table=ReceivedTransactionsTable.table_name,
|
table=ReceivedTransactionsTable.table_name,
|
||||||
keyvalues={
|
values={
|
||||||
"transaction_id": transaction_id,
|
"transaction_id": transaction_id,
|
||||||
"origin": origin,
|
"origin": origin,
|
||||||
},
|
|
||||||
values={
|
|
||||||
"response_code": code,
|
"response_code": code,
|
||||||
"response_json": response_json,
|
"response_json": response_dict,
|
||||||
}
|
},
|
||||||
|
or_ignore=True,
|
||||||
|
desc="set_received_txn_response",
|
||||||
)
|
)
|
||||||
|
|
||||||
def prep_send_transaction(self, transaction_id, destination,
|
def prep_send_transaction(self, transaction_id, destination,
|
||||||
|
|
Loading…
Reference in New Issue