Merge pull request #113 from matrix-org/store_rearrangement

Store rearrangement
This commit is contained in:
Erik Johnston 2015-03-20 16:23:01 +00:00
commit 4848fdbf59
20 changed files with 678 additions and 622 deletions

View File

@ -13,14 +13,9 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from .appservice import ApplicationServiceStore from .appservice import ApplicationServiceStore
from .directory import DirectoryStore from .directory import DirectoryStore
from .feedback import FeedbackStore from .events import EventsStore
from .presence import PresenceStore from .presence import PresenceStore
from .profile import ProfileStore from .profile import ProfileStore
from .registration import RegistrationStore from .registration import RegistrationStore
@ -39,11 +34,6 @@ from .state import StateStore
from .signatures import SignatureStore from .signatures import SignatureStore
from .filtering import FilteringStore from .filtering import FilteringStore
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
from synapse.crypto.event_signing import compute_event_reference_hash
import fnmatch import fnmatch
import imp import imp
@ -62,15 +52,8 @@ SCHEMA_VERSION = 14
dir_path = os.path.abspath(os.path.dirname(__file__)) dir_path = os.path.abspath(os.path.dirname(__file__))
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
something went wrong.
"""
pass
class DataStore(RoomMemberStore, RoomStore, class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore, RegistrationStore, StreamStore, ProfileStore,
PresenceStore, TransactionStore, PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore, DirectoryStore, KeyStore, StateStore, SignatureStore,
ApplicationServiceStore, ApplicationServiceStore,
@ -79,7 +62,8 @@ class DataStore(RoomMemberStore, RoomStore,
RejectionsStore, RejectionsStore,
FilteringStore, FilteringStore,
PusherStore, PusherStore,
PushRuleStore PushRuleStore,
EventsStore,
): ):
def __init__(self, hs): def __init__(self, hs):
@ -89,423 +73,6 @@ class DataStore(RoomMemberStore, RoomStore,
self.min_token_deferred = self._get_min_token() self.min_token_deferred = self._get_min_token()
self.min_token = None self.min_token = None
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False,
is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
yield self.min_token_deferred
self.min_token -= 1
stream_ordering = self.min_token
try:
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
current_state=current_state,
)
self.get_room_events_max_id.invalidate()
except _RollbackButIsFineException:
pass
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
allow_none=False):
"""Get an event from the database by event_id.
Args:
event_id (str): The event_id of the event to fetch
check_redacted (bool): If True, check if event has been redacted
and redact it.
get_prev_content (bool): If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected (bool): If True return rejected events.
allow_none (bool): If True, return None if no event found, if
False throw an exception.
Returns:
Deferred : A FrozenEvent.
"""
event = yield self.runInteraction(
"get_event", self._get_event_txn,
event_id,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
if not event and not allow_none:
raise RuntimeError("Could not find event %s" % (event_id,))
defer.returnValue(event)
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
# Remove the any existing cache entries for the event_id
self._get_event_cache.pop(event.event_id)
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
(event.room_id,)
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
if event.is_state() and is_new_state:
if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for prev_state_id, _ in event.prev_state:
self._simple_delete_txn(
txn,
table="state_forward_extremities",
keyvalues={
"event_id": prev_state_id,
}
)
outlier = event.internal_metadata.is_outlier()
if not outlier:
self._store_state_groups_txn(txn, event, context)
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
self._handle_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
)
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
keyvalues={"event_id": event.event_id},
retcol="event_id",
allow_none=True,
)
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
# If we have already persisted this event, we don't need to do any
# more processing.
# The processing above must be done on every call to persist event,
# since they might not have happened on previous calls. For example,
# if we are persisting an event that we had persisted as an outlier,
# but is no longer one.
if have_persisted:
if not outlier:
sql = (
"UPDATE event_json SET internal_metadata = ?"
" WHERE event_id = ?"
)
txn.execute(
sql,
(metadata_json.decode("UTF-8"), event.event_id,)
)
sql = (
"UPDATE events SET outlier = 0"
" WHERE event_id = ?"
)
txn.execute(
sql,
(event.event_id,)
)
return
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
self._store_feedback_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
event_dict = {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
self._simple_insert_txn(
txn,
table="event_json",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": metadata_json.decode("UTF-8"),
"json": encode_canonical_json(event_dict).decode("UTF-8"),
},
or_replace=True,
)
content = encode_canonical_json(
event.content
).decode("UTF-8")
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
"content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
}
if stream_ordering is not None:
vals["stream_ordering"] = stream_ordering
unrec = {
k: v
for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
"signatures",
"hashes",
"prev_events",
]
}
vals["unrecognized_keys"] = encode_canonical_json(
unrec
).decode("UTF-8")
try:
self._simple_insert_txn(
txn,
"events",
vals,
or_replace=(not outlier),
or_ignore=bool(outlier),
)
except:
logger.warn(
"Failed to persist, probably duplicate: %s",
event.event_id,
exc_info=True,
)
raise _RollbackButIsFineException("_persist_event")
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
}
# TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
self._simple_insert_txn(
txn,
"state_events",
vals,
or_replace=True,
)
if is_new_state and not context.rejected:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for e_id, h in event.prev_state:
self._simple_insert_txn(
txn,
table="event_edges",
values={
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
"is_state": 1,
},
or_ignore=True,
)
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
txn, event.event_id, hash_alg, hash_bytes,
)
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_prev_event_hash_txn(
txn, event.event_id, prev_event_id, alg, hash_bytes
)
for auth_id, _ in event.auth_events:
self._simple_insert_txn(
txn,
table="event_auth",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
},
or_ignore=True,
)
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
txn, event.event_id, ref_alg, ref_hash_bytes
)
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
self._get_event_cache.pop(event.redacts)
txn.execute(
"INSERT OR IGNORE INTO redactions "
"(event_id, redacts) VALUES (?,?)",
(event.event_id, event.redacts)
)
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
del_sql = (
"SELECT event_id FROM redactions WHERE redacts = e.event_id "
"LIMIT 1"
)
sql = (
"SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
) % {
"redacted": del_sql,
}
if event_type and state_key is not None:
sql += " AND s.type = ? AND s.state_key = ? "
args = (room_id, event_type, state_key)
elif event_type:
sql += " AND s.type = ?"
args = (room_id, event_type)
else:
args = (room_id, )
results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results)
defer.returnValue(events)
@defer.inlineCallbacks
def get_room_name_and_aliases(self, room_id):
del_sql = (
"SELECT event_id FROM redactions WHERE redacts = e.event_id "
"LIMIT 1"
)
sql = (
"SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
) % {
"redacted": del_sql,
}
sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
sql += " OR s.type = 'm.room.aliases')"
args = (room_id,)
results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results)
name = None
aliases = []
for e in events:
if e.type == 'm.room.name':
if 'name' in e.content:
name = e.content['name']
elif e.type == 'm.room.aliases':
if 'aliases' in e.content:
aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases))
@defer.inlineCallbacks
def _get_min_token(self):
row = yield self._execute(
"_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
)
self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
self.min_token = min(self.min_token, -1)
logger.debug("min_token is: %s", self.min_token)
defer.returnValue(self.min_token)
def insert_client_ip(self, user, access_token, device_id, ip, user_agent): def insert_client_ip(self, user, access_token, device_id, ip, user_agent):
return self._simple_insert( return self._simple_insert(
"user_ips", "user_ips",
@ -516,7 +83,8 @@ class DataStore(RoomMemberStore, RoomStore,
"ip": ip, "ip": ip,
"user_agent": user_agent, "user_agent": user_agent,
"last_seen": int(self._clock.time_msec()), "last_seen": int(self._clock.time_msec()),
} },
desc="insert_client_ip",
) )
def get_user_ip_and_agents(self, user): def get_user_ip_and_agents(self, user):
@ -526,38 +94,7 @@ class DataStore(RoomMemberStore, RoomStore,
retcols=[ retcols=[
"device_id", "access_token", "ip", "user_agent", "last_seen" "device_id", "access_token", "ip", "user_agent", "last_seen"
], ],
) desc="get_user_ip_and_agents",
def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Returns:
dict: Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps to
None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
"WHERE e.event_id = ?"
)
res = {}
for event_id in event_ids:
txn.execute(sql, (event_id,))
row = txn.fetchone()
if row:
_, rejected = row
res[event_id] = rejected
return res
return self.runInteraction(
"have_events", f,
) )

View File

@ -335,7 +335,8 @@ class SQLBaseStore(object):
# "Simple" SQL API methods that operate on a single table with no JOINs, # "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns. # no complex WHERE clauses, just a dict of values for columns.
def _simple_insert(self, table, values, or_replace=False, or_ignore=False): def _simple_insert(self, table, values, or_replace=False, or_ignore=False,
desc="_simple_insert"):
"""Executes an INSERT query on the named table. """Executes an INSERT query on the named table.
Args: Args:
@ -344,7 +345,7 @@ class SQLBaseStore(object):
or_replace : bool; if True performs an INSERT OR REPLACE or_replace : bool; if True performs an INSERT OR REPLACE
""" """
return self.runInteraction( return self.runInteraction(
"_simple_insert", desc,
self._simple_insert_txn, table, values, or_replace=or_replace, self._simple_insert_txn, table, values, or_replace=or_replace,
or_ignore=or_ignore, or_ignore=or_ignore,
) )
@ -368,7 +369,7 @@ class SQLBaseStore(object):
txn.execute(sql, values.values()) txn.execute(sql, values.values())
return txn.lastrowid return txn.lastrowid
def _simple_upsert(self, table, keyvalues, values): def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"):
""" """
Args: Args:
table (str): The table to upsert into table (str): The table to upsert into
@ -377,7 +378,7 @@ class SQLBaseStore(object):
Returns: A deferred Returns: A deferred
""" """
return self.runInteraction( return self.runInteraction(
"_simple_upsert", desc,
self._simple_upsert_txn, table, keyvalues, values self._simple_upsert_txn, table, keyvalues, values
) )
@ -413,7 +414,7 @@ class SQLBaseStore(object):
txn.execute(sql, allvalues.values()) txn.execute(sql, allvalues.values())
def _simple_select_one(self, table, keyvalues, retcols, def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False): allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to """Executes a SELECT query on the named table, which is expected to
return a single row, returning a single column from it. return a single row, returning a single column from it.
@ -425,12 +426,15 @@ class SQLBaseStore(object):
allow_none : If true, return None instead of failing if the SELECT allow_none : If true, return None instead of failing if the SELECT
statement returns no rows statement returns no rows
""" """
return self._simple_selectupdate_one( return self.runInteraction(
table, keyvalues, retcols=retcols, allow_none=allow_none desc,
self._simple_select_one_txn,
table, keyvalues, retcols, allow_none,
) )
def _simple_select_one_onecol(self, table, keyvalues, retcol, def _simple_select_one_onecol(self, table, keyvalues, retcol,
allow_none=False): allow_none=False,
desc="_simple_select_one_onecol"):
"""Executes a SELECT query on the named table, which is expected to """Executes a SELECT query on the named table, which is expected to
return a single row, returning a single column from it." return a single row, returning a single column from it."
@ -440,7 +444,7 @@ class SQLBaseStore(object):
retcol : string giving the name of the column to return retcol : string giving the name of the column to return
""" """
return self.runInteraction( return self.runInteraction(
"_simple_select_one_onecol", desc,
self._simple_select_one_onecol_txn, self._simple_select_one_onecol_txn,
table, keyvalues, retcol, allow_none=allow_none, table, keyvalues, retcol, allow_none=allow_none,
) )
@ -476,7 +480,8 @@ class SQLBaseStore(object):
return [r[0] for r in txn.fetchall()] return [r[0] for r in txn.fetchall()]
def _simple_select_onecol(self, table, keyvalues, retcol): def _simple_select_onecol(self, table, keyvalues, retcol,
desc="_simple_select_onecol"):
"""Executes a SELECT query on the named table, which returns a list """Executes a SELECT query on the named table, which returns a list
comprising of the values of the named column from the selected rows. comprising of the values of the named column from the selected rows.
@ -489,12 +494,13 @@ class SQLBaseStore(object):
Deferred: Results in a list Deferred: Results in a list
""" """
return self.runInteraction( return self.runInteraction(
"_simple_select_onecol", desc,
self._simple_select_onecol_txn, self._simple_select_onecol_txn,
table, keyvalues, retcol table, keyvalues, retcol
) )
def _simple_select_list(self, table, keyvalues, retcols): def _simple_select_list(self, table, keyvalues, retcols,
desc="_simple_select_list"):
"""Executes a SELECT query on the named table, which may return zero or """Executes a SELECT query on the named table, which may return zero or
more rows, returning the result as a list of dicts. more rows, returning the result as a list of dicts.
@ -505,7 +511,7 @@ class SQLBaseStore(object):
retcols : list of strings giving the names of the columns to return retcols : list of strings giving the names of the columns to return
""" """
return self.runInteraction( return self.runInteraction(
"_simple_select_list", desc,
self._simple_select_list_txn, self._simple_select_list_txn,
table, keyvalues, retcols table, keyvalues, retcols
) )
@ -537,7 +543,7 @@ class SQLBaseStore(object):
return self.cursor_to_dict(txn) return self.cursor_to_dict(txn)
def _simple_update_one(self, table, keyvalues, updatevalues, def _simple_update_one(self, table, keyvalues, updatevalues,
retcols=None): desc="_simple_update_one"):
"""Executes an UPDATE query on the named table, setting new values for """Executes an UPDATE query on the named table, setting new values for
columns in a row matching the key values. columns in a row matching the key values.
@ -555,56 +561,76 @@ class SQLBaseStore(object):
get-and-set. This can be used to implement compare-and-set by putting get-and-set. This can be used to implement compare-and-set by putting
the update column in the 'keyvalues' dict as well. the update column in the 'keyvalues' dict as well.
""" """
return self._simple_selectupdate_one(table, keyvalues, updatevalues, return self.runInteraction(
retcols=retcols) desc,
self._simple_update_one_txn,
table, keyvalues, updatevalues,
)
def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues):
update_sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in updatevalues),
" AND ".join("%s = ?" % (k,) for k in keyvalues)
)
txn.execute(
update_sql,
updatevalues.values() + keyvalues.values()
)
if txn.rowcount == 0:
raise StoreError(404, "No row found")
if txn.rowcount > 1:
raise StoreError(500, "More than one row matched")
def _simple_select_one_txn(self, txn, table, keyvalues, retcols,
allow_none=False):
select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k) for k in keyvalues)
)
txn.execute(select_sql, keyvalues.values())
row = txn.fetchone()
if not row:
if allow_none:
return None
raise StoreError(404, "No row found")
if txn.rowcount > 1:
raise StoreError(500, "More than one row matched")
return dict(zip(retcols, row))
def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None,
retcols=None, allow_none=False): retcols=None, allow_none=False,
desc="_simple_selectupdate_one"):
""" Combined SELECT then UPDATE.""" """ Combined SELECT then UPDATE."""
if retcols:
select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
", ".join(retcols),
table,
" AND ".join("%s = ?" % (k) for k in keyvalues)
)
if updatevalues:
update_sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in updatevalues),
" AND ".join("%s = ?" % (k,) for k in keyvalues)
)
def func(txn): def func(txn):
ret = None ret = None
if retcols: if retcols:
txn.execute(select_sql, keyvalues.values()) ret = self._simple_select_one_txn(
txn,
row = txn.fetchone() table=table,
if not row: keyvalues=keyvalues,
if allow_none: retcols=retcols,
return None allow_none=allow_none,
raise StoreError(404, "No row found")
if txn.rowcount > 1:
raise StoreError(500, "More than one row matched")
ret = dict(zip(retcols, row))
if updatevalues:
txn.execute(
update_sql,
updatevalues.values() + keyvalues.values()
) )
if txn.rowcount == 0: if updatevalues:
raise StoreError(404, "No row found") self._simple_update_one_txn(
if txn.rowcount > 1: txn,
raise StoreError(500, "More than one row matched") table=table,
keyvalues=keyvalues,
updatevalues=updatevalues,
)
return ret return ret
return self.runInteraction("_simple_selectupdate_one", func) return self.runInteraction(desc, func)
def _simple_delete_one(self, table, keyvalues): def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"):
"""Executes a DELETE query on the named table, expecting to delete a """Executes a DELETE query on the named table, expecting to delete a
single row. single row.
@ -623,9 +649,9 @@ class SQLBaseStore(object):
raise StoreError(404, "No row found") raise StoreError(404, "No row found")
if txn.rowcount > 1: if txn.rowcount > 1:
raise StoreError(500, "more than one row matched") raise StoreError(500, "more than one row matched")
return self.runInteraction("_simple_delete_one", func) return self.runInteraction(desc, func)
def _simple_delete(self, table, keyvalues): def _simple_delete(self, table, keyvalues, desc="_simple_delete"):
"""Executes a DELETE query on the named table. """Executes a DELETE query on the named table.
Args: Args:
@ -633,7 +659,7 @@ class SQLBaseStore(object):
keyvalues : dict of column names and values to select the row with keyvalues : dict of column names and values to select the row with
""" """
return self.runInteraction("_simple_delete", self._simple_delete_txn) return self.runInteraction(desc, self._simple_delete_txn)
def _simple_delete_txn(self, txn, table, keyvalues): def _simple_delete_txn(self, txn, table, keyvalues):
sql = "DELETE FROM %s WHERE %s" % ( sql = "DELETE FROM %s WHERE %s" % (
@ -803,6 +829,13 @@ class SQLBaseStore(object):
return result[0] if result else None return result[0] if result else None
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying
something went wrong.
"""
pass
class Table(object): class Table(object):
""" A base class used to store information about a particular table. """ A base class used to store information about a particular table.
""" """

View File

@ -48,6 +48,7 @@ class DirectoryStore(SQLBaseStore):
{"room_alias": room_alias.to_string()}, {"room_alias": room_alias.to_string()},
"room_id", "room_id",
allow_none=True, allow_none=True,
desc="get_association_from_room_alias",
) )
if not room_id: if not room_id:
@ -58,6 +59,7 @@ class DirectoryStore(SQLBaseStore):
"room_alias_servers", "room_alias_servers",
{"room_alias": room_alias.to_string()}, {"room_alias": room_alias.to_string()},
"server", "server",
desc="get_association_from_room_alias",
) )
if not servers: if not servers:
@ -87,6 +89,7 @@ class DirectoryStore(SQLBaseStore):
"room_alias": room_alias.to_string(), "room_alias": room_alias.to_string(),
"room_id": room_id, "room_id": room_id,
}, },
desc="create_room_alias_association",
) )
except sqlite3.IntegrityError: except sqlite3.IntegrityError:
raise SynapseError( raise SynapseError(
@ -100,7 +103,8 @@ class DirectoryStore(SQLBaseStore):
{ {
"room_alias": room_alias.to_string(), "room_alias": room_alias.to_string(),
"server": server, "server": server,
} },
desc="create_room_alias_association",
) )
def delete_room_alias(self, room_alias): def delete_room_alias(self, room_alias):
@ -139,4 +143,5 @@ class DirectoryStore(SQLBaseStore):
"room_aliases", "room_aliases",
{"room_id": room_id}, {"room_id": room_id},
"room_alias", "room_alias",
desc="get_aliases_for_room",
) )

395
synapse/storage/events.py Normal file
View File

@ -0,0 +1,395 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from _base import SQLBaseStore, _RollbackButIsFineException
from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
from synapse.crypto.event_signing import compute_event_reference_hash
from syutil.base64util import decode_base64
from syutil.jsonutil import encode_canonical_json
import logging
logger = logging.getLogger(__name__)
class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
def persist_event(self, event, context, backfilled=False,
is_new_state=True, current_state=None):
stream_ordering = None
if backfilled:
if not self.min_token_deferred.called:
yield self.min_token_deferred
self.min_token -= 1
stream_ordering = self.min_token
try:
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
backfilled=backfilled,
stream_ordering=stream_ordering,
is_new_state=is_new_state,
current_state=current_state,
)
self.get_room_events_max_id.invalidate()
except _RollbackButIsFineException:
pass
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
get_prev_content=False, allow_rejected=False,
allow_none=False):
"""Get an event from the database by event_id.
Args:
event_id (str): The event_id of the event to fetch
check_redacted (bool): If True, check if event has been redacted
and redact it.
get_prev_content (bool): If True and event is a state event,
include the previous states content in the unsigned field.
allow_rejected (bool): If True return rejected events.
allow_none (bool): If True, return None if no event found, if
False throw an exception.
Returns:
Deferred : A FrozenEvent.
"""
event = yield self.runInteraction(
"get_event", self._get_event_txn,
event_id,
check_redacted=check_redacted,
get_prev_content=get_prev_content,
allow_rejected=allow_rejected,
)
if not event and not allow_none:
raise RuntimeError("Could not find event %s" % (event_id,))
defer.returnValue(event)
@log_function
def _persist_event_txn(self, txn, event, context, backfilled,
stream_ordering=None, is_new_state=True,
current_state=None):
# Remove the any existing cache entries for the event_id
self._get_event_cache.pop(event.event_id)
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
txn.execute(
"DELETE FROM current_state_events WHERE room_id = ?",
(event.room_id,)
)
for s in current_state:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": s.event_id,
"room_id": s.room_id,
"type": s.type,
"state_key": s.state_key,
},
or_replace=True,
)
if event.is_state() and is_new_state:
if not backfilled and not context.rejected:
self._simple_insert_txn(
txn,
table="state_forward_extremities",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
or_replace=True,
)
for prev_state_id, _ in event.prev_state:
self._simple_delete_txn(
txn,
table="state_forward_extremities",
keyvalues={
"event_id": prev_state_id,
}
)
outlier = event.internal_metadata.is_outlier()
if not outlier:
self._store_state_groups_txn(txn, event, context)
self._update_min_depth_for_room_txn(
txn,
event.room_id,
event.depth
)
self._handle_prev_events(
txn,
outlier=outlier,
event_id=event.event_id,
prev_events=event.prev_events,
room_id=event.room_id,
)
have_persisted = self._simple_select_one_onecol_txn(
txn,
table="event_json",
keyvalues={"event_id": event.event_id},
retcol="event_id",
allow_none=True,
)
metadata_json = encode_canonical_json(
event.internal_metadata.get_dict()
)
# If we have already persisted this event, we don't need to do any
# more processing.
# The processing above must be done on every call to persist event,
# since they might not have happened on previous calls. For example,
# if we are persisting an event that we had persisted as an outlier,
# but is no longer one.
if have_persisted:
if not outlier:
sql = (
"UPDATE event_json SET internal_metadata = ?"
" WHERE event_id = ?"
)
txn.execute(
sql,
(metadata_json.decode("UTF-8"), event.event_id,)
)
sql = (
"UPDATE events SET outlier = 0"
" WHERE event_id = ?"
)
txn.execute(
sql,
(event.event_id,)
)
return
if event.type == EventTypes.Member:
self._store_room_member_txn(txn, event)
elif event.type == EventTypes.Feedback:
self._store_feedback_txn(txn, event)
elif event.type == EventTypes.Name:
self._store_room_name_txn(txn, event)
elif event.type == EventTypes.Topic:
self._store_room_topic_txn(txn, event)
elif event.type == EventTypes.Redaction:
self._store_redaction(txn, event)
event_dict = {
k: v
for k, v in event.get_dict().items()
if k not in [
"redacted",
"redacted_because",
]
}
self._simple_insert_txn(
txn,
table="event_json",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"internal_metadata": metadata_json.decode("UTF-8"),
"json": encode_canonical_json(event_dict).decode("UTF-8"),
},
or_replace=True,
)
content = encode_canonical_json(
event.content
).decode("UTF-8")
vals = {
"topological_ordering": event.depth,
"event_id": event.event_id,
"type": event.type,
"room_id": event.room_id,
"content": content,
"processed": True,
"outlier": outlier,
"depth": event.depth,
}
if stream_ordering is not None:
vals["stream_ordering"] = stream_ordering
unrec = {
k: v
for k, v in event.get_dict().items()
if k not in vals.keys() and k not in [
"redacted",
"redacted_because",
"signatures",
"hashes",
"prev_events",
]
}
vals["unrecognized_keys"] = encode_canonical_json(
unrec
).decode("UTF-8")
try:
self._simple_insert_txn(
txn,
"events",
vals,
or_replace=(not outlier),
or_ignore=bool(outlier),
)
except:
logger.warn(
"Failed to persist, probably duplicate: %s",
event.event_id,
exc_info=True,
)
raise _RollbackButIsFineException("_persist_event")
if context.rejected:
self._store_rejections_txn(txn, event.event_id, context.rejected)
if event.is_state():
vals = {
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
}
# TODO: How does this work with backfilling?
if hasattr(event, "replaces_state"):
vals["prev_state"] = event.replaces_state
self._simple_insert_txn(
txn,
"state_events",
vals,
)
if is_new_state and not context.rejected:
self._simple_insert_txn(
txn,
"current_state_events",
{
"event_id": event.event_id,
"room_id": event.room_id,
"type": event.type,
"state_key": event.state_key,
},
)
for e_id, h in event.prev_state:
self._simple_insert_txn(
txn,
table="event_edges",
values={
"event_id": event.event_id,
"prev_event_id": e_id,
"room_id": event.room_id,
"is_state": 1,
},
)
for hash_alg, hash_base64 in event.hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_event_content_hash_txn(
txn, event.event_id, hash_alg, hash_bytes,
)
for prev_event_id, prev_hashes in event.prev_events:
for alg, hash_base64 in prev_hashes.items():
hash_bytes = decode_base64(hash_base64)
self._store_prev_event_hash_txn(
txn, event.event_id, prev_event_id, alg, hash_bytes
)
for auth_id, _ in event.auth_events:
self._simple_insert_txn(
txn,
table="event_auth",
values={
"event_id": event.event_id,
"room_id": event.room_id,
"auth_id": auth_id,
},
)
(ref_alg, ref_hash_bytes) = compute_event_reference_hash(event)
self._store_event_reference_hash_txn(
txn, event.event_id, ref_alg, ref_hash_bytes
)
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
self._get_event_cache.pop(event.redacts)
txn.execute(
"INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
(event.event_id, event.redacts)
)
def have_events(self, event_ids):
"""Given a list of event ids, check if we have already processed them.
Returns:
dict: Has an entry for each event id we already have seen. Maps to
the rejected reason string if we rejected the event, else maps to
None.
"""
if not event_ids:
return defer.succeed({})
def f(txn):
sql = (
"SELECT e.event_id, reason FROM events as e "
"LEFT JOIN rejections as r ON e.event_id = r.event_id "
"WHERE e.event_id = ?"
)
res = {}
for event_id in event_ids:
txn.execute(sql, (event_id,))
row = txn.fetchone()
if row:
_, rejected = row
res[event_id] = rejected
return res
return self.runInteraction(
"have_events", f,
)

View File

@ -1,47 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from ._base import SQLBaseStore
class FeedbackStore(SQLBaseStore):
def _store_feedback_txn(self, txn, event):
self._simple_insert_txn(txn, "feedback", {
"event_id": event.event_id,
"feedback_type": event.content["type"],
"room_id": event.room_id,
"target_event_id": event.content["target_event_id"],
"sender": event.user_id,
})
@defer.inlineCallbacks
def get_feedback_for_event(self, event_id):
sql = (
"SELECT events.* FROM events INNER JOIN feedback "
"ON events.event_id = feedback.event_id "
"WHERE feedback.target_event_id = ? "
)
rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id)
defer.returnValue(
[
(yield self._parse_events(r))
for r in rows
]
)

View File

@ -31,6 +31,7 @@ class FilteringStore(SQLBaseStore):
}, },
retcol="filter_json", retcol="filter_json",
allow_none=False, allow_none=False,
desc="get_user_filter",
) )
defer.returnValue(json.loads(def_json)) defer.returnValue(json.loads(def_json))

View File

@ -32,6 +32,7 @@ class MediaRepositoryStore(SQLBaseStore):
{"media_id": media_id}, {"media_id": media_id},
("media_type", "media_length", "upload_name", "created_ts"), ("media_type", "media_length", "upload_name", "created_ts"),
allow_none=True, allow_none=True,
desc="get_local_media",
) )
def store_local_media(self, media_id, media_type, time_now_ms, upload_name, def store_local_media(self, media_id, media_type, time_now_ms, upload_name,
@ -45,7 +46,8 @@ class MediaRepositoryStore(SQLBaseStore):
"upload_name": upload_name, "upload_name": upload_name,
"media_length": media_length, "media_length": media_length,
"user_id": user_id.to_string(), "user_id": user_id.to_string(),
} },
desc="store_local_media",
) )
def get_local_media_thumbnails(self, media_id): def get_local_media_thumbnails(self, media_id):
@ -55,7 +57,8 @@ class MediaRepositoryStore(SQLBaseStore):
( (
"thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_width", "thumbnail_height", "thumbnail_method",
"thumbnail_type", "thumbnail_length", "thumbnail_type", "thumbnail_length",
) ),
desc="get_local_media_thumbnails",
) )
def store_local_thumbnail(self, media_id, thumbnail_width, def store_local_thumbnail(self, media_id, thumbnail_width,
@ -70,7 +73,8 @@ class MediaRepositoryStore(SQLBaseStore):
"thumbnail_method": thumbnail_method, "thumbnail_method": thumbnail_method,
"thumbnail_type": thumbnail_type, "thumbnail_type": thumbnail_type,
"thumbnail_length": thumbnail_length, "thumbnail_length": thumbnail_length,
} },
desc="store_local_thumbnail",
) )
def get_cached_remote_media(self, origin, media_id): def get_cached_remote_media(self, origin, media_id):
@ -82,6 +86,7 @@ class MediaRepositoryStore(SQLBaseStore):
"filesystem_id", "filesystem_id",
), ),
allow_none=True, allow_none=True,
desc="get_cached_remote_media",
) )
def store_cached_remote_media(self, origin, media_id, media_type, def store_cached_remote_media(self, origin, media_id, media_type,
@ -97,7 +102,8 @@ class MediaRepositoryStore(SQLBaseStore):
"created_ts": time_now_ms, "created_ts": time_now_ms,
"upload_name": upload_name, "upload_name": upload_name,
"filesystem_id": filesystem_id, "filesystem_id": filesystem_id,
} },
desc="store_cached_remote_media",
) )
def get_remote_media_thumbnails(self, origin, media_id): def get_remote_media_thumbnails(self, origin, media_id):
@ -107,7 +113,8 @@ class MediaRepositoryStore(SQLBaseStore):
( (
"thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_width", "thumbnail_height", "thumbnail_method",
"thumbnail_type", "thumbnail_length", "filesystem_id", "thumbnail_type", "thumbnail_length", "filesystem_id",
) ),
desc="get_remote_media_thumbnails",
) )
def store_remote_media_thumbnail(self, origin, media_id, filesystem_id, def store_remote_media_thumbnail(self, origin, media_id, filesystem_id,
@ -125,5 +132,6 @@ class MediaRepositoryStore(SQLBaseStore):
"thumbnail_type": thumbnail_type, "thumbnail_type": thumbnail_type,
"thumbnail_length": thumbnail_length, "thumbnail_length": thumbnail_length,
"filesystem_id": filesystem_id, "filesystem_id": filesystem_id,
} },
desc="store_remote_media_thumbnail",
) )

View File

@ -21,6 +21,7 @@ class PresenceStore(SQLBaseStore):
return self._simple_insert( return self._simple_insert(
table="presence", table="presence",
values={"user_id": user_localpart}, values={"user_id": user_localpart},
desc="create_presence",
) )
def has_presence_state(self, user_localpart): def has_presence_state(self, user_localpart):
@ -29,6 +30,7 @@ class PresenceStore(SQLBaseStore):
keyvalues={"user_id": user_localpart}, keyvalues={"user_id": user_localpart},
retcols=["user_id"], retcols=["user_id"],
allow_none=True, allow_none=True,
desc="has_presence_state",
) )
def get_presence_state(self, user_localpart): def get_presence_state(self, user_localpart):
@ -36,6 +38,7 @@ class PresenceStore(SQLBaseStore):
table="presence", table="presence",
keyvalues={"user_id": user_localpart}, keyvalues={"user_id": user_localpart},
retcols=["state", "status_msg", "mtime"], retcols=["state", "status_msg", "mtime"],
desc="get_presence_state",
) )
def set_presence_state(self, user_localpart, new_state): def set_presence_state(self, user_localpart, new_state):
@ -45,7 +48,7 @@ class PresenceStore(SQLBaseStore):
updatevalues={"state": new_state["state"], updatevalues={"state": new_state["state"],
"status_msg": new_state["status_msg"], "status_msg": new_state["status_msg"],
"mtime": self._clock.time_msec()}, "mtime": self._clock.time_msec()},
retcols=["state"], desc="set_presence_state",
) )
def allow_presence_visible(self, observed_localpart, observer_userid): def allow_presence_visible(self, observed_localpart, observer_userid):
@ -53,6 +56,7 @@ class PresenceStore(SQLBaseStore):
table="presence_allow_inbound", table="presence_allow_inbound",
values={"observed_user_id": observed_localpart, values={"observed_user_id": observed_localpart,
"observer_user_id": observer_userid}, "observer_user_id": observer_userid},
desc="allow_presence_visible",
) )
def disallow_presence_visible(self, observed_localpart, observer_userid): def disallow_presence_visible(self, observed_localpart, observer_userid):
@ -60,6 +64,7 @@ class PresenceStore(SQLBaseStore):
table="presence_allow_inbound", table="presence_allow_inbound",
keyvalues={"observed_user_id": observed_localpart, keyvalues={"observed_user_id": observed_localpart,
"observer_user_id": observer_userid}, "observer_user_id": observer_userid},
desc="disallow_presence_visible",
) )
def is_presence_visible(self, observed_localpart, observer_userid): def is_presence_visible(self, observed_localpart, observer_userid):
@ -69,6 +74,7 @@ class PresenceStore(SQLBaseStore):
"observer_user_id": observer_userid}, "observer_user_id": observer_userid},
retcols=["observed_user_id"], retcols=["observed_user_id"],
allow_none=True, allow_none=True,
desc="is_presence_visible",
) )
def add_presence_list_pending(self, observer_localpart, observed_userid): def add_presence_list_pending(self, observer_localpart, observed_userid):
@ -77,6 +83,7 @@ class PresenceStore(SQLBaseStore):
values={"user_id": observer_localpart, values={"user_id": observer_localpart,
"observed_user_id": observed_userid, "observed_user_id": observed_userid,
"accepted": False}, "accepted": False},
desc="add_presence_list_pending",
) )
def set_presence_list_accepted(self, observer_localpart, observed_userid): def set_presence_list_accepted(self, observer_localpart, observed_userid):
@ -85,6 +92,7 @@ class PresenceStore(SQLBaseStore):
keyvalues={"user_id": observer_localpart, keyvalues={"user_id": observer_localpart,
"observed_user_id": observed_userid}, "observed_user_id": observed_userid},
updatevalues={"accepted": True}, updatevalues={"accepted": True},
desc="set_presence_list_accepted",
) )
def get_presence_list(self, observer_localpart, accepted=None): def get_presence_list(self, observer_localpart, accepted=None):
@ -96,6 +104,7 @@ class PresenceStore(SQLBaseStore):
table="presence_list", table="presence_list",
keyvalues=keyvalues, keyvalues=keyvalues,
retcols=["observed_user_id", "accepted"], retcols=["observed_user_id", "accepted"],
desc="get_presence_list",
) )
def del_presence_list(self, observer_localpart, observed_userid): def del_presence_list(self, observer_localpart, observed_userid):
@ -103,4 +112,5 @@ class PresenceStore(SQLBaseStore):
table="presence_list", table="presence_list",
keyvalues={"user_id": observer_localpart, keyvalues={"user_id": observer_localpart,
"observed_user_id": observed_userid}, "observed_user_id": observed_userid},
desc="del_presence_list",
) )

View File

@ -21,6 +21,7 @@ class ProfileStore(SQLBaseStore):
return self._simple_insert( return self._simple_insert(
table="profiles", table="profiles",
values={"user_id": user_localpart}, values={"user_id": user_localpart},
desc="create_profile",
) )
def get_profile_displayname(self, user_localpart): def get_profile_displayname(self, user_localpart):
@ -28,6 +29,7 @@ class ProfileStore(SQLBaseStore):
table="profiles", table="profiles",
keyvalues={"user_id": user_localpart}, keyvalues={"user_id": user_localpart},
retcol="displayname", retcol="displayname",
desc="get_profile_displayname",
) )
def set_profile_displayname(self, user_localpart, new_displayname): def set_profile_displayname(self, user_localpart, new_displayname):
@ -35,6 +37,7 @@ class ProfileStore(SQLBaseStore):
table="profiles", table="profiles",
keyvalues={"user_id": user_localpart}, keyvalues={"user_id": user_localpart},
updatevalues={"displayname": new_displayname}, updatevalues={"displayname": new_displayname},
desc="set_profile_displayname",
) )
def get_profile_avatar_url(self, user_localpart): def get_profile_avatar_url(self, user_localpart):
@ -42,6 +45,7 @@ class ProfileStore(SQLBaseStore):
table="profiles", table="profiles",
keyvalues={"user_id": user_localpart}, keyvalues={"user_id": user_localpart},
retcol="avatar_url", retcol="avatar_url",
desc="get_profile_avatar_url",
) )
def set_profile_avatar_url(self, user_localpart, new_avatar_url): def set_profile_avatar_url(self, user_localpart, new_avatar_url):
@ -49,4 +53,5 @@ class ProfileStore(SQLBaseStore):
table="profiles", table="profiles",
keyvalues={"user_id": user_localpart}, keyvalues={"user_id": user_localpart},
updatevalues={"avatar_url": new_avatar_url}, updatevalues={"avatar_url": new_avatar_url},
desc="set_profile_avatar_url",
) )

View File

@ -50,7 +50,8 @@ class PushRuleStore(SQLBaseStore):
results = yield self._simple_select_list( results = yield self._simple_select_list(
PushRuleEnableTable.table_name, PushRuleEnableTable.table_name,
{'user_name': user_name}, {'user_name': user_name},
PushRuleEnableTable.fields PushRuleEnableTable.fields,
desc="get_push_rules_enabled_for_user",
) )
defer.returnValue( defer.returnValue(
{r['rule_id']: False if r['enabled'] == 0 else True for r in results} {r['rule_id']: False if r['enabled'] == 0 else True for r in results}
@ -201,7 +202,8 @@ class PushRuleStore(SQLBaseStore):
""" """
yield self._simple_delete_one( yield self._simple_delete_one(
PushRuleTable.table_name, PushRuleTable.table_name,
{'user_name': user_name, 'rule_id': rule_id} {'user_name': user_name, 'rule_id': rule_id},
desc="delete_push_rule",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -209,7 +211,8 @@ class PushRuleStore(SQLBaseStore):
yield self._simple_upsert( yield self._simple_upsert(
PushRuleEnableTable.table_name, PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id}, {'user_name': user_name, 'rule_id': rule_id},
{'enabled': enabled} {'enabled': enabled},
desc="set_push_rule_enabled",
) )

View File

@ -114,7 +114,9 @@ class PusherStore(SQLBaseStore):
ts=pushkey_ts, ts=pushkey_ts,
lang=lang, lang=lang,
data=data data=data
)) ),
desc="add_pusher",
)
except Exception as e: except Exception as e:
logger.error("create_pusher with failed: %s", e) logger.error("create_pusher with failed: %s", e)
raise StoreError(500, "Problem creating pusher.") raise StoreError(500, "Problem creating pusher.")
@ -123,7 +125,8 @@ class PusherStore(SQLBaseStore):
def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
yield self._simple_delete_one( yield self._simple_delete_one(
PushersTable.table_name, PushersTable.table_name,
dict(app_id=app_id, pushkey=pushkey) {"app_id": app_id, "pushkey": pushkey},
desc="delete_pusher_by_app_id_pushkey",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -131,7 +134,8 @@ class PusherStore(SQLBaseStore):
yield self._simple_update_one( yield self._simple_update_one(
PushersTable.table_name, PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey}, {'app_id': app_id, 'pushkey': pushkey},
{'last_token': last_token} {'last_token': last_token},
desc="update_pusher_last_token",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -140,7 +144,8 @@ class PusherStore(SQLBaseStore):
yield self._simple_update_one( yield self._simple_update_one(
PushersTable.table_name, PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey}, {'app_id': app_id, 'pushkey': pushkey},
{'last_token': last_token, 'last_success': last_success} {'last_token': last_token, 'last_success': last_success},
desc="update_pusher_last_token_and_success",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -148,7 +153,8 @@ class PusherStore(SQLBaseStore):
yield self._simple_update_one( yield self._simple_update_one(
PushersTable.table_name, PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey}, {'app_id': app_id, 'pushkey': pushkey},
{'failing_since': failing_since} {'failing_since': failing_since},
desc="update_pusher_failing_since",
) )

View File

@ -39,7 +39,10 @@ class RegistrationStore(SQLBaseStore):
Raises: Raises:
StoreError if there was a problem adding this. StoreError if there was a problem adding this.
""" """
row = yield self._simple_select_one("users", {"name": user_id}, ["id"]) row = yield self._simple_select_one(
"users", {"name": user_id}, ["id"],
desc="add_access_token_to_user",
)
if not row: if not row:
raise StoreError(400, "Bad user ID supplied.") raise StoreError(400, "Bad user ID supplied.")
row_id = row["id"] row_id = row["id"]
@ -48,7 +51,8 @@ class RegistrationStore(SQLBaseStore):
{ {
"user_id": row_id, "user_id": row_id,
"token": token "token": token
} },
desc="add_access_token_to_user",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -120,6 +124,7 @@ class RegistrationStore(SQLBaseStore):
keyvalues={"name": user.to_string()}, keyvalues={"name": user.to_string()},
retcol="admin", retcol="admin",
allow_none=True, allow_none=True,
desc="is_server_admin",
) )
defer.returnValue(res if res else False) defer.returnValue(res if res else False)

View File

@ -29,7 +29,7 @@ class RejectionsStore(SQLBaseStore):
"event_id": event_id, "event_id": event_id,
"reason": reason, "reason": reason,
"last_check": self._clock.time_msec(), "last_check": self._clock.time_msec(),
} },
) )
def get_rejection_reason(self, event_id): def get_rejection_reason(self, event_id):
@ -40,4 +40,5 @@ class RejectionsStore(SQLBaseStore):
"event_id": event_id, "event_id": event_id,
}, },
allow_none=True, allow_none=True,
desc="get_rejection_reason",
) )

View File

@ -15,11 +15,9 @@
from twisted.internet import defer from twisted.internet import defer
from sqlite3 import IntegrityError
from synapse.api.errors import StoreError from synapse.api.errors import StoreError
from ._base import SQLBaseStore, Table from ._base import SQLBaseStore
import collections import collections
import logging import logging
@ -27,8 +25,9 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
OpsLevel = collections.namedtuple("OpsLevel", ( OpsLevel = collections.namedtuple(
"ban_level", "kick_level", "redact_level") "OpsLevel",
("ban_level", "kick_level", "redact_level",)
) )
@ -47,13 +46,15 @@ class RoomStore(SQLBaseStore):
StoreError if the room could not be stored. StoreError if the room could not be stored.
""" """
try: try:
yield self._simple_insert(RoomsTable.table_name, dict( yield self._simple_insert(
room_id=room_id, RoomsTable.table_name,
creator=room_creator_user_id, {
is_public=is_public "room_id": room_id,
)) "creator": room_creator_user_id,
except IntegrityError: "is_public": is_public,
raise StoreError(409, "Room ID in use.") },
desc="store_room",
)
except Exception as e: except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e) logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.") raise StoreError(500, "Problem creating room.")
@ -66,9 +67,11 @@ class RoomStore(SQLBaseStore):
Returns: Returns:
A namedtuple containing the room information, or an empty list. A namedtuple containing the room information, or an empty list.
""" """
query = RoomsTable.select_statement("room_id=?") return self._simple_select_one(
return self._execute( table=RoomsTable.table_name,
"get_room", RoomsTable.decode_single_result, query, room_id, keyvalues={"room_id": room_id},
retcols=RoomsTable.fields,
desc="get_room",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@ -143,7 +146,7 @@ class RoomStore(SQLBaseStore):
"event_id": event.event_id, "event_id": event.event_id,
"room_id": event.room_id, "room_id": event.room_id,
"topic": event.content["topic"], "topic": event.content["topic"],
} },
) )
def _store_room_name_txn(self, txn, event): def _store_room_name_txn(self, txn, event):
@ -158,8 +161,45 @@ class RoomStore(SQLBaseStore):
} }
) )
@defer.inlineCallbacks
def get_room_name_and_aliases(self, room_id):
del_sql = (
"SELECT event_id FROM redactions WHERE redacts = e.event_id "
"LIMIT 1"
)
class RoomsTable(Table): sql = (
"SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
) % {
"redacted": del_sql,
}
sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')"
sql += " OR s.type = 'm.room.aliases')"
args = (room_id,)
results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results)
name = None
aliases = []
for e in events:
if e.type == 'm.room.name':
if 'name' in e.content:
name = e.content['name']
elif e.type == 'm.room.aliases':
if 'aliases' in e.content:
aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases))
class RoomsTable(object):
table_name = "rooms" table_name = "rooms"
fields = [ fields = [
@ -167,5 +207,3 @@ class RoomsTable(Table):
"is_public", "is_public",
"creator" "creator"
] ]
EntryType = collections.namedtuple("RoomEntry", fields)

View File

@ -212,7 +212,8 @@ class RoomMemberStore(SQLBaseStore):
return self._simple_select_onecol( return self._simple_select_onecol(
"room_hosts", "room_hosts",
{"room_id": room_id}, {"room_id": room_id},
"host" "host",
desc="get_joined_hosts_for_room",
) )
def _get_members_by_dict(self, where_dict): def _get_members_by_dict(self, where_dict):

View File

@ -15,6 +15,8 @@
from ._base import SQLBaseStore from ._base import SQLBaseStore
from twisted.internet import defer
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -122,3 +124,33 @@ class StateStore(SQLBaseStore):
}, },
or_replace=True, or_replace=True,
) )
@defer.inlineCallbacks
def get_current_state(self, room_id, event_type=None, state_key=""):
del_sql = (
"SELECT event_id FROM redactions WHERE redacts = e.event_id "
"LIMIT 1"
)
sql = (
"SELECT e.*, (%(redacted)s) AS redacted FROM events as e "
"INNER JOIN current_state_events as c ON e.event_id = c.event_id "
"INNER JOIN state_events as s ON e.event_id = s.event_id "
"WHERE c.room_id = ? "
) % {
"redacted": del_sql,
}
if event_type and state_key is not None:
sql += " AND s.type = ? AND s.state_key = ? "
args = (room_id, event_type, state_key)
elif event_type:
sql += " AND s.type = ?"
args = (room_id, event_type)
else:
args = (room_id, )
results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results)
defer.returnValue(events)

View File

@ -420,6 +420,25 @@ class StreamStore(SQLBaseStore):
self._get_room_events_max_id_txn self._get_room_events_max_id_txn
) )
@defer.inlineCallbacks
def _get_min_token(self):
row = yield self._execute(
"_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
)
self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
self.min_token = min(self.min_token, -1)
logger.debug("min_token is: %s", self.min_token)
defer.returnValue(self.min_token)
def get_next_stream_id(self):
with self._next_stream_id_lock:
i = self._next_stream_id
self._next_stream_id += 1
return i
def _get_room_events_max_id_txn(self, txn): def _get_room_events_max_id_txn(self, txn):
txn.execute( txn.execute(
"SELECT MAX(stream_ordering) as m FROM events" "SELECT MAX(stream_ordering) as m FROM events"

View File

@ -46,15 +46,19 @@ class TransactionStore(SQLBaseStore):
) )
def _get_received_txn_response(self, txn, transaction_id, origin): def _get_received_txn_response(self, txn, transaction_id, origin):
where_clause = "transaction_id = ? AND origin = ?" result = self._simple_select_one_txn(
query = ReceivedTransactionsTable.select_statement(where_clause) txn,
table=ReceivedTransactionsTable.table_name,
keyvalues={
"transaction_id": transaction_id,
"origin": origin,
},
retcols=ReceivedTransactionsTable.fields,
allow_none=True,
)
txn.execute(query, (transaction_id, origin)) if result and result.response_code:
return result["response_code"], result["response_json"]
results = ReceivedTransactionsTable.decode_results(txn.fetchall())
if results and results[0].response_code:
return (results[0].response_code, results[0].response_json)
else: else:
return None return None

View File

@ -180,7 +180,7 @@ class SQLBaseStoreTestCase(unittest.TestCase):
self.mock_txn.rowcount = 1 self.mock_txn.rowcount = 1
self.mock_txn.fetchone.return_value = ("Old Value",) self.mock_txn.fetchone.return_value = ("Old Value",)
ret = yield self.datastore._simple_update_one( ret = yield self.datastore._simple_selectupdate_one(
table="tablename", table="tablename",
keyvalues={"keycol": "TheKey"}, keyvalues={"keycol": "TheKey"},
updatevalues={"columname": "New Value"}, updatevalues={"columname": "New Value"},

View File

@ -44,7 +44,7 @@ class RoomStoreTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_get_room(self): def test_get_room(self):
self.assertObjectHasAttributes( self.assertDictContainsSubset(
{"room_id": self.room.to_string(), {"room_id": self.room.to_string(),
"creator": self.u_creator.to_string(), "creator": self.u_creator.to_string(),
"is_public": True}, "is_public": True},