diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8d33def6c6..8f812f0fd7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -329,13 +329,14 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine - self._stream_id_gen = StreamIdGenerator() + self._stream_id_gen = StreamIdGenerator("events", "stream_ordering") self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) self._state_groups_id_gen = IdGenerator("state_groups", "id", self) self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) self._pushers_id_gen = IdGenerator("pushers", "id", self) self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + self._receipts_id_gen = StreamIdGenerator("receipts_linearized", "stream_id") def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py new file mode 100644 index 0000000000..0168e74a0d --- /dev/null +++ b/synapse/storage/receipts.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore, cached + +from twisted.internet import defer + + +class ReceiptStore(SQLBaseStore): + + @cached + @defer.inlineCallbacks + def get_linearized_receipts_for_room(self, room_id): + rows = yield self._simple_select_list( + table="receipts_linearized", + keyvalues={"room_id": room_id}, + retcols=["receipt_type", "user_id", "event_id"], + desc="get_linearized_receipts_for_room", + ) + + result = {} + for row in rows: + result.setdefault( + row["event_id"], {} + ).setdefault( + row["receipt_type"], [] + ).append(row["user_id"]) + + defer.returnValue(result) + + @cached + @defer.inlineCallbacks + def get_graph_receipts_for_room(self, room_id): + rows = yield self._simple_select_list( + table="receipts_graph", + keyvalues={"room_id": room_id}, + retcols=["receipt_type", "user_id", "event_id"], + desc="get_linearized_receipts_for_room", + ) + + result = {} + for row in rows: + result.setdefault( + row["user_id"], {} + ).setdefault( + row["receipt_type"], [] + ).append(row["event_id"]) + + defer.returnValue(result) + + def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, + user_id, event_id, stream_id): + self._simple_delete_txn( + txn, + table="receipts_linearized", + keyvalues={ + "stream_id": stream_id, + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + ) + + self._simple_insert_txn( + txn, + table="receipts_linearized", + values={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_id": event_id, + } + ) + + @defer.inlineCallbacks + def insert_receipt(self, room_id, receipt_type, user_id, event_ids): + if not event_ids: + return + + if len(event_ids) == 1: + linearized_event_id = event_ids[0] + else: + # we need to points in graph -> linearized form. + def graph_to_linear(txn): + query = ( + "SELECT event_id WHERE room_id = ? AND stream_ordering IN (" + " SELECT max(stream_ordering) WHERE event_id IN (%s)" + ")" + ) % (",".join(["?"] * len(event_ids))) + + txn.execute(query, [room_id] + event_ids) + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + # TODO: ARGH?! + return None + + linearized_event_id = yield self.runInteraction( + graph_to_linear, desc="insert_receipt_conv" + ) + + stream_id_manager = yield self._stream_id_gen.get_next(self) + with stream_id_manager() as stream_id: + yield self.runInteraction( + self.insert_linearized_receipt_txn, + room_id, receipt_type, user_id, linearized_event_id, + stream_id=stream_id, + desc="insert_linearized_receipt" + ) + + yield self.insert_graph_receipt( + room_id, receipt_type, user_id, event_ids + ) + + max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_id, max_persisted_id)) + + def insert_graph_receipt(self, room_id, receipt_type, + user_id, event_ids): + return self.runInteraction( + self.insert_graph_receipt_txn, + room_id, receipt_type, user_id, event_ids, + desc="insert_graph_receipt" + ) + + def insert_graph_receipt_txn(self, txn, room_id, receipt_type, + user_id, event_ids): + self._simple_delete_txn( + txn, + table="receipts_graph", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + ) + self._simple_insert_many_txn( + txn, + table="receipts_graph", + values=[ + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_id": event_id, + } + for event_id in event_ids + ], + ) diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql index da9e18e903..ccd64ec7f4 100644 --- a/synapse/storage/schema/delta/21/receipts.sql +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -1,16 +1,18 @@ -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + CREATE TABLE IF NOT EXISTS receipts_graph( room_id TEXT NOT NULL, @@ -24,12 +26,13 @@ CREATE INDEX receipts_graph_room_tuple ON receipts_graph( ); CREATE TABLE IF NOT EXISTS receipts_linearized ( + stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, event_id TEXT NOT NULL ); -CREATE INDEX receipts_graph_room_tuple ON receipts_graph( +CREATE INDEX receipts_linearized_room_tuple ON receipts_graph( room_id, receipt_type, user_id ); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 89d1643f10..b39006315d 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -72,7 +72,10 @@ class StreamIdGenerator(object): with stream_id_gen.get_next_txn(txn) as stream_id: # ... persist event ... """ - def __init__(self): + def __init__(self, table, column): + self.table = table + self.column = column + self._lock = threading.Lock() self._current_max = None @@ -126,7 +129,7 @@ class StreamIdGenerator(object): def _get_or_compute_current_max(self, txn): with self._lock: - txn.execute("SELECT MAX(stream_ordering) FROM events") + txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table)) rows = txn.fetchall() val, = rows[0]