Add a .runInteraction() method on SQLBaseStore itself to wrap the .db_pool
This commit is contained in:
parent
249e8f2277
commit
e53d77b501
|
@ -94,7 +94,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
stream_ordering = self.min_token
|
stream_ordering = self.min_token
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield self._db_pool.runInteraction(
|
yield self.runInteraction(
|
||||||
self._persist_pdu_event_txn,
|
self._persist_pdu_event_txn,
|
||||||
pdu=pdu,
|
pdu=pdu,
|
||||||
event=event,
|
event=event,
|
||||||
|
@ -297,7 +297,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
prev_state_pdu=prev_state_pdu,
|
prev_state_pdu=prev_state_pdu,
|
||||||
)
|
)
|
||||||
|
|
||||||
return self._db_pool.runInteraction(_snapshot)
|
return self.runInteraction(_snapshot)
|
||||||
|
|
||||||
|
|
||||||
class Snapshot(object):
|
class Snapshot(object):
|
||||||
|
|
|
@ -34,6 +34,10 @@ class SQLBaseStore(object):
|
||||||
self.event_factory = hs.get_event_factory()
|
self.event_factory = hs.get_event_factory()
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
|
|
||||||
|
def runInteraction(self, txn, *args, **kwargs):
|
||||||
|
"""Wraps the .runInteraction() method on the underlying db_pool."""
|
||||||
|
return self._db_pool.runInteraction(txn, *args, **kwargs)
|
||||||
|
|
||||||
def cursor_to_dict(self, cursor):
|
def cursor_to_dict(self, cursor):
|
||||||
"""Converts a SQL cursor into an list of dicts.
|
"""Converts a SQL cursor into an list of dicts.
|
||||||
|
|
||||||
|
@ -71,7 +75,7 @@ class SQLBaseStore(object):
|
||||||
else:
|
else:
|
||||||
return cursor.fetchall()
|
return cursor.fetchall()
|
||||||
|
|
||||||
return self._db_pool.runInteraction(interaction)
|
return self.runInteraction(interaction)
|
||||||
|
|
||||||
def _execute_and_decode(self, query, *args):
|
def _execute_and_decode(self, query, *args):
|
||||||
return self._execute(self.cursor_to_dict, query, *args)
|
return self._execute(self.cursor_to_dict, query, *args)
|
||||||
|
@ -87,7 +91,7 @@ class SQLBaseStore(object):
|
||||||
values : dict of new column names and values for them
|
values : dict of new column names and values for them
|
||||||
or_replace : bool; if True performs an INSERT OR REPLACE
|
or_replace : bool; if True performs an INSERT OR REPLACE
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._simple_insert_txn, table, values, or_replace=or_replace
|
self._simple_insert_txn, table, values, or_replace=or_replace
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -164,7 +168,7 @@ class SQLBaseStore(object):
|
||||||
txn.execute(sql, keyvalues.values())
|
txn.execute(sql, keyvalues.values())
|
||||||
return txn.fetchall()
|
return txn.fetchall()
|
||||||
|
|
||||||
res = yield self._db_pool.runInteraction(func)
|
res = yield self.runInteraction(func)
|
||||||
|
|
||||||
defer.returnValue([r[0] for r in res])
|
defer.returnValue([r[0] for r in res])
|
||||||
|
|
||||||
|
@ -187,7 +191,7 @@ class SQLBaseStore(object):
|
||||||
txn.execute(sql, keyvalues.values())
|
txn.execute(sql, keyvalues.values())
|
||||||
return self.cursor_to_dict(txn)
|
return self.cursor_to_dict(txn)
|
||||||
|
|
||||||
return self._db_pool.runInteraction(func)
|
return self.runInteraction(func)
|
||||||
|
|
||||||
def _simple_update_one(self, table, keyvalues, updatevalues,
|
def _simple_update_one(self, table, keyvalues, updatevalues,
|
||||||
retcols=None):
|
retcols=None):
|
||||||
|
@ -255,7 +259,7 @@ class SQLBaseStore(object):
|
||||||
raise StoreError(500, "More than one row matched")
|
raise StoreError(500, "More than one row matched")
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
return self._db_pool.runInteraction(func)
|
return self.runInteraction(func)
|
||||||
|
|
||||||
def _simple_delete_one(self, table, keyvalues):
|
def _simple_delete_one(self, table, keyvalues):
|
||||||
"""Executes a DELETE query on the named table, expecting to delete a
|
"""Executes a DELETE query on the named table, expecting to delete a
|
||||||
|
@ -276,7 +280,7 @@ class SQLBaseStore(object):
|
||||||
raise StoreError(404, "No row found")
|
raise StoreError(404, "No row found")
|
||||||
if txn.rowcount > 1:
|
if txn.rowcount > 1:
|
||||||
raise StoreError(500, "more than one row matched")
|
raise StoreError(500, "more than one row matched")
|
||||||
return self._db_pool.runInteraction(func)
|
return self.runInteraction(func)
|
||||||
|
|
||||||
def _simple_max_id(self, table):
|
def _simple_max_id(self, table):
|
||||||
"""Executes a SELECT query on the named table, expecting to return the
|
"""Executes a SELECT query on the named table, expecting to return the
|
||||||
|
@ -294,7 +298,7 @@ class SQLBaseStore(object):
|
||||||
return 0
|
return 0
|
||||||
return max_id
|
return max_id
|
||||||
|
|
||||||
return self._db_pool.runInteraction(func)
|
return self.runInteraction(func)
|
||||||
|
|
||||||
def _parse_event_from_row(self, row_dict):
|
def _parse_event_from_row(self, row_dict):
|
||||||
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
|
d = copy.deepcopy({k: v for k, v in row_dict.items() if v})
|
||||||
|
@ -313,7 +317,7 @@ class SQLBaseStore(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
def _parse_events(self, rows):
|
def _parse_events(self, rows):
|
||||||
return self._db_pool.runInteraction(self._parse_events_txn, rows)
|
return self.runInteraction(self._parse_events_txn, rows)
|
||||||
|
|
||||||
def _parse_events_txn(self, txn, rows):
|
def _parse_events_txn(self, txn, rows):
|
||||||
events = [self._parse_event_from_row(r) for r in rows]
|
events = [self._parse_event_from_row(r) for r in rows]
|
||||||
|
|
|
@ -42,7 +42,7 @@ class PduStore(SQLBaseStore):
|
||||||
PduTuple: If the pdu does not exist in the database, returns None
|
PduTuple: If the pdu does not exist in the database, returns None
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_pdu_tuple, pdu_id, origin
|
self._get_pdu_tuple, pdu_id, origin
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ class PduStore(SQLBaseStore):
|
||||||
list: A list of PduTuples
|
list: A list of PduTuples
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_current_state_for_context,
|
self._get_current_state_for_context,
|
||||||
context
|
context
|
||||||
)
|
)
|
||||||
|
@ -142,7 +142,7 @@ class PduStore(SQLBaseStore):
|
||||||
pdu_origin (str)
|
pdu_origin (str)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._mark_as_processed, pdu_id, pdu_origin
|
self._mark_as_processed, pdu_id, pdu_origin
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -151,7 +151,7 @@ class PduStore(SQLBaseStore):
|
||||||
|
|
||||||
def get_all_pdus_from_context(self, context):
|
def get_all_pdus_from_context(self, context):
|
||||||
"""Get a list of all PDUs for a given context."""
|
"""Get a list of all PDUs for a given context."""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_all_pdus_from_context, context,
|
self._get_all_pdus_from_context, context,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -178,7 +178,7 @@ class PduStore(SQLBaseStore):
|
||||||
Return:
|
Return:
|
||||||
list: A list of PduTuples
|
list: A list of PduTuples
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_backfill, context, pdu_list, limit
|
self._get_backfill, context, pdu_list, limit
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -239,7 +239,7 @@ class PduStore(SQLBaseStore):
|
||||||
txn
|
txn
|
||||||
context (str)
|
context (str)
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_min_depth_for_context, context
|
self._get_min_depth_for_context, context
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -345,7 +345,7 @@ class PduStore(SQLBaseStore):
|
||||||
bool
|
bool
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._is_pdu_new,
|
self._is_pdu_new,
|
||||||
pdu_id=pdu_id,
|
pdu_id=pdu_id,
|
||||||
origin=origin,
|
origin=origin,
|
||||||
|
@ -498,7 +498,7 @@ class StatePduStore(SQLBaseStore):
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_unresolved_state_tree(self, new_state_pdu):
|
def get_unresolved_state_tree(self, new_state_pdu):
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_unresolved_state_tree, new_state_pdu
|
self._get_unresolved_state_tree, new_state_pdu
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -537,7 +537,7 @@ class StatePduStore(SQLBaseStore):
|
||||||
|
|
||||||
def update_current_state(self, pdu_id, origin, context, pdu_type,
|
def update_current_state(self, pdu_id, origin, context, pdu_type,
|
||||||
state_key):
|
state_key):
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._update_current_state,
|
self._update_current_state,
|
||||||
pdu_id, origin, context, pdu_type, state_key
|
pdu_id, origin, context, pdu_type, state_key
|
||||||
)
|
)
|
||||||
|
@ -576,7 +576,7 @@ class StatePduStore(SQLBaseStore):
|
||||||
PduEntry
|
PduEntry
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_current_state_pdu, context, pdu_type, state_key
|
self._get_current_state_pdu, context, pdu_type, state_key
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -638,7 +638,7 @@ class StatePduStore(SQLBaseStore):
|
||||||
PduIdTuple: A pdu that we are missing, or None if we have all the
|
PduIdTuple: A pdu that we are missing, or None if we have all the
|
||||||
pdus required to do the conflict resolution.
|
pdus required to do the conflict resolution.
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_next_missing_pdu, new_pdu
|
self._get_next_missing_pdu, new_pdu
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -682,7 +682,7 @@ class StatePduStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
bool: True if the new_pdu clobbered the current state, False if not
|
bool: True if the new_pdu clobbered the current state, False if not
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._handle_new_state, new_pdu
|
self._handle_new_state, new_pdu
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -62,7 +62,7 @@ class RegistrationStore(SQLBaseStore):
|
||||||
Raises:
|
Raises:
|
||||||
StoreError if the user_id could not be registered.
|
StoreError if the user_id could not be registered.
|
||||||
"""
|
"""
|
||||||
yield self._db_pool.runInteraction(self._register, user_id, token,
|
yield self.runInteraction(self._register, user_id, token,
|
||||||
password_hash)
|
password_hash)
|
||||||
|
|
||||||
def _register(self, txn, user_id, token, password_hash):
|
def _register(self, txn, user_id, token, password_hash):
|
||||||
|
@ -99,7 +99,7 @@ class RegistrationStore(SQLBaseStore):
|
||||||
Raises:
|
Raises:
|
||||||
StoreError if no user was found.
|
StoreError if no user was found.
|
||||||
"""
|
"""
|
||||||
user_id = yield self._db_pool.runInteraction(self._query_for_auth,
|
user_id = yield self.runInteraction(self._query_for_auth,
|
||||||
token)
|
token)
|
||||||
defer.returnValue(user_id)
|
defer.returnValue(user_id)
|
||||||
|
|
||||||
|
|
|
@ -149,7 +149,7 @@ class RoomStore(SQLBaseStore):
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
def get_power_level(self, room_id, user_id):
|
def get_power_level(self, room_id, user_id):
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_power_level,
|
self._get_power_level,
|
||||||
room_id, user_id,
|
room_id, user_id,
|
||||||
)
|
)
|
||||||
|
@ -182,7 +182,7 @@ class RoomStore(SQLBaseStore):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_ops_levels(self, room_id):
|
def get_ops_levels(self, room_id):
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_ops_levels,
|
self._get_ops_levels,
|
||||||
room_id,
|
room_id,
|
||||||
)
|
)
|
||||||
|
|
|
@ -71,6 +71,11 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
|
|
||||||
txn.execute(sql, (room_id, domain))
|
txn.execute(sql, (room_id, domain))
|
||||||
|
|
||||||
|
def store_room_member(self, user_id, room_id, event_id, membership):
|
||||||
|
return self.runInteraction(self._store_room_member_txn,
|
||||||
|
user_id, user_id, room_id, event_id, membership
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_member(self, user_id, room_id):
|
def get_room_member(self, user_id, room_id):
|
||||||
"""Retrieve the current state of a room member.
|
"""Retrieve the current state of a room member.
|
||||||
|
|
|
@ -286,7 +286,7 @@ class StreamStore(SQLBaseStore):
|
||||||
defer.returnValue(ret)
|
defer.returnValue(ret)
|
||||||
|
|
||||||
def get_room_events_max_id(self):
|
def get_room_events_max_id(self):
|
||||||
return self._db_pool.runInteraction(self._get_room_events_max_id_txn)
|
return self.runInteraction(self._get_room_events_max_id_txn)
|
||||||
|
|
||||||
def _get_room_events_max_id_txn(self, txn):
|
def _get_room_events_max_id_txn(self, txn):
|
||||||
txn.execute(
|
txn.execute(
|
||||||
|
|
|
@ -41,7 +41,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
this transaction or a 2-tuple of (int, dict)
|
this transaction or a 2-tuple of (int, dict)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_received_txn_response, transaction_id, origin
|
self._get_received_txn_response, transaction_id, origin
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -72,7 +72,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
response_json (str)
|
response_json (str)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._set_received_txn_response,
|
self._set_received_txn_response,
|
||||||
transaction_id, origin, code, response_dict
|
transaction_id, origin, code, response_dict
|
||||||
)
|
)
|
||||||
|
@ -104,7 +104,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
list: A list of previous transaction ids.
|
list: A list of previous transaction ids.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._prep_send_transaction,
|
self._prep_send_transaction,
|
||||||
transaction_id, destination, ts, pdu_list
|
transaction_id, destination, ts, pdu_list
|
||||||
)
|
)
|
||||||
|
@ -159,7 +159,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
code (int)
|
code (int)
|
||||||
response_json (str)
|
response_json (str)
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._delivered_txn,
|
self._delivered_txn,
|
||||||
transaction_id, destination, code, response_dict
|
transaction_id, destination, code, response_dict
|
||||||
)
|
)
|
||||||
|
@ -184,7 +184,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
Returns:
|
Returns:
|
||||||
list: A list of `ReceivedTransactionsTable.EntryType`
|
list: A list of `ReceivedTransactionsTable.EntryType`
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_transactions_after, transaction_id, destination
|
self._get_transactions_after, transaction_id, destination
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -214,7 +214,7 @@ class TransactionStore(SQLBaseStore):
|
||||||
Returns
|
Returns
|
||||||
list: A list of PduTuple
|
list: A list of PduTuple
|
||||||
"""
|
"""
|
||||||
return self._db_pool.runInteraction(
|
return self.runInteraction(
|
||||||
self._get_pdus_after_transaction,
|
self._get_pdus_after_transaction,
|
||||||
transaction_id, destination
|
transaction_id, destination
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue