Combine the CurrentStateDeltaStream into the EventStream
This commit is contained in:
parent
1f6d6f918a
commit
4b91c313a9
|
@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|
|||
from synapse.replication.slave.storage.events import SlavedEventStore
|
||||
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
|
||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||
from synapse.replication.tcp.streams.events import (
|
||||
EventsStream,
|
||||
EventsStreamCurrentStateRow,
|
||||
)
|
||||
from synapse.rest.client.v2_alpha import user_directory
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.engines import create_engine
|
||||
|
@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
|
|||
prefilled_cache=curr_state_delta_prefill,
|
||||
)
|
||||
|
||||
self._current_state_delta_pos = events_max
|
||||
|
||||
def stream_positions(self):
|
||||
result = super(UserDirectorySlaveStore, self).stream_positions()
|
||||
result["current_state_deltas"] = self._current_state_delta_pos
|
||||
return result
|
||||
|
||||
def process_replication_rows(self, stream_name, token, rows):
|
||||
if stream_name == "current_state_deltas":
|
||||
self._current_state_delta_pos = token
|
||||
if stream_name == EventsStream.NAME:
|
||||
self._stream_id_gen.advance(token)
|
||||
for row in rows:
|
||||
if row.type != EventsStreamCurrentStateRow.TypeId:
|
||||
continue
|
||||
self._curr_state_delta_stream_cache.entity_has_changed(
|
||||
row.room_id, token
|
||||
row.data.room_id, token
|
||||
)
|
||||
return super(UserDirectorySlaveStore, self).process_replication_rows(
|
||||
stream_name, token, rows
|
||||
|
@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
|
|||
yield super(UserDirectoryReplicationHandler, self).on_rdata(
|
||||
stream_name, token, rows
|
||||
)
|
||||
if stream_name == "current_state_deltas":
|
||||
if stream_name == EventsStream.NAME:
|
||||
run_in_background(self._notify_directory)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -44,7 +44,6 @@ STREAMS_MAP = {
|
|||
federation.FederationStream,
|
||||
_base.TagAccountDataStream,
|
||||
_base.AccountDataStream,
|
||||
_base.CurrentStateDeltaStream,
|
||||
_base.GroupServerStream,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", (
|
|||
"data_type", # str
|
||||
"data", # dict
|
||||
))
|
||||
CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
|
||||
"room_id", # str
|
||||
"type", # str
|
||||
"state_key", # str
|
||||
"event_id", # str, optional
|
||||
))
|
||||
GroupsStreamRow = namedtuple("GroupsStreamRow", (
|
||||
"group_id", # str
|
||||
"user_id", # str
|
||||
|
@ -428,21 +422,6 @@ class AccountDataStream(Stream):
|
|||
defer.returnValue(results)
|
||||
|
||||
|
||||
class CurrentStateDeltaStream(Stream):
|
||||
"""Current state for a room was changed
|
||||
"""
|
||||
NAME = "current_state_deltas"
|
||||
ROW_TYPE = CurrentStateDeltaStreamRow
|
||||
|
||||
def __init__(self, hs):
|
||||
store = hs.get_datastore()
|
||||
|
||||
self.current_token = store.get_max_current_state_delta_stream_id
|
||||
self.update_function = store.get_all_updated_current_state_deltas
|
||||
|
||||
super(CurrentStateDeltaStream, self).__init__(hs)
|
||||
|
||||
|
||||
class GroupServerStream(Stream):
|
||||
NAME = "groups"
|
||||
ROW_TYPE = GroupsStreamRow
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import heapq
|
||||
|
||||
import attr
|
||||
|
||||
|
@ -26,6 +27,7 @@ from ._base import Stream
|
|||
This stream contains rows of various types. Each row therefore contains a 'type'
|
||||
identifier before the real data. For example::
|
||||
|
||||
RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]]
|
||||
RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]]
|
||||
|
||||
An "ev" row is sent for each new event. The fields in the data part are:
|
||||
|
@ -36,6 +38,14 @@ An "ev" row is sent for each new event. The fields in the data part are:
|
|||
* The state key of the event, for state events
|
||||
* The event id of an event which is redacted by this event.
|
||||
|
||||
A "state" row is sent whenever the "current state" in a room changes. The fields in the
|
||||
data part are:
|
||||
|
||||
* The room id for the state change
|
||||
* The event type of the state which has changed
|
||||
* The state_key of the state which has changed
|
||||
* The event id of the new state
|
||||
|
||||
"""
|
||||
|
||||
|
||||
|
@ -77,10 +87,21 @@ class EventsStreamEventRow(BaseEventsStreamRow):
|
|||
redacts = attr.ib() # str, optional
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
class EventsStreamCurrentStateRow(BaseEventsStreamRow):
|
||||
TypeId = "state"
|
||||
|
||||
room_id = attr.ib() # str
|
||||
type = attr.ib() # str
|
||||
state_key = attr.ib() # str
|
||||
event_id = attr.ib() # str, optional
|
||||
|
||||
|
||||
TypeToRow = {
|
||||
Row.TypeId: Row
|
||||
for Row in (
|
||||
EventsStreamEventRow,
|
||||
EventsStreamCurrentStateRow,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -105,7 +126,18 @@ class EventsStream(Stream):
|
|||
(row[0], EventsStreamEventRow.TypeId, row[1:])
|
||||
for row in event_rows
|
||||
)
|
||||
defer.returnValue(event_updates)
|
||||
|
||||
state_rows = yield self._store.get_all_updated_current_state_deltas(
|
||||
from_token, current_token, limit
|
||||
)
|
||||
state_updates = (
|
||||
(row[0], EventsStreamCurrentStateRow.TypeId, row[1:])
|
||||
for row in state_rows
|
||||
)
|
||||
|
||||
all_updates = heapq.merge(event_updates, state_updates)
|
||||
|
||||
defer.returnValue(all_updates)
|
||||
|
||||
@classmethod
|
||||
def parse_row(cls, row):
|
||||
|
|
|
@ -2287,9 +2287,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
|
|||
|
||||
defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"])))
|
||||
|
||||
def get_max_current_state_delta_stream_id(self):
|
||||
return self._stream_id_gen.get_current_token()
|
||||
|
||||
def get_all_updated_current_state_deltas(self, from_token, to_token, limit):
|
||||
def get_all_updated_current_state_deltas_txn(txn):
|
||||
sql = """
|
||||
|
|
Loading…
Reference in New Issue