Add experimental support for sharding event persister. (#8170)
This is *not* ready for production yet. Caveats: 1. We should write some tests... 2. The stream token that we use for events can get stalled at the minimum position of all writers. This means that new events may not be processed and e.g. sent down sync streams if a writer isn't writing or is slow.
This commit is contained in:
parent
b257c788c0
commit
82c1ee1c22
|
@ -0,0 +1 @@
|
||||||
|
Add experimental support for sharding event persister.
|
|
@ -832,11 +832,26 @@ class ShardedWorkerHandlingConfig:
|
||||||
def should_handle(self, instance_name: str, key: str) -> bool:
|
def should_handle(self, instance_name: str, key: str) -> bool:
|
||||||
"""Whether this instance is responsible for handling the given key.
|
"""Whether this instance is responsible for handling the given key.
|
||||||
"""
|
"""
|
||||||
|
# If multiple instances are not defined we always return true
|
||||||
# If multiple instances are not defined we always return true.
|
|
||||||
if not self.instances or len(self.instances) == 1:
|
if not self.instances or len(self.instances) == 1:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
return self.get_instance(key) == instance_name
|
||||||
|
|
||||||
|
def get_instance(self, key: str) -> str:
|
||||||
|
"""Get the instance responsible for handling the given key.
|
||||||
|
|
||||||
|
Note: For things like federation sending the config for which instance
|
||||||
|
is sending is known only to the sender instance if there is only one.
|
||||||
|
Therefore `should_handle` should be used where possible.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if not self.instances:
|
||||||
|
return "master"
|
||||||
|
|
||||||
|
if len(self.instances) == 1:
|
||||||
|
return self.instances[0]
|
||||||
|
|
||||||
# We shard by taking the hash, modulo it by the number of instances and
|
# We shard by taking the hash, modulo it by the number of instances and
|
||||||
# then checking whether this instance matches the instance at that
|
# then checking whether this instance matches the instance at that
|
||||||
# index.
|
# index.
|
||||||
|
@ -846,7 +861,7 @@ class ShardedWorkerHandlingConfig:
|
||||||
dest_hash = sha256(key.encode("utf8")).digest()
|
dest_hash = sha256(key.encode("utf8")).digest()
|
||||||
dest_int = int.from_bytes(dest_hash, byteorder="little")
|
dest_int = int.from_bytes(dest_hash, byteorder="little")
|
||||||
remainder = dest_int % (len(self.instances))
|
remainder = dest_int % (len(self.instances))
|
||||||
return self.instances[remainder] == instance_name
|
return self.instances[remainder]
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
|
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
|
||||||
|
|
|
@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
|
||||||
instances: List[str]
|
instances: List[str]
|
||||||
def __init__(self, instances: List[str]) -> None: ...
|
def __init__(self, instances: List[str]) -> None: ...
|
||||||
def should_handle(self, instance_name: str, key: str) -> bool: ...
|
def should_handle(self, instance_name: str, key: str) -> bool: ...
|
||||||
|
def get_instance(self, key: str) -> str: ...
|
||||||
|
|
|
@ -13,12 +13,24 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from typing import List, Union
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
|
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
|
||||||
from .server import ListenerConfig, parse_listener_def
|
from .server import ListenerConfig, parse_listener_def
|
||||||
|
|
||||||
|
|
||||||
|
def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
|
||||||
|
"""Helper for allowing parsing a string or list of strings to a config
|
||||||
|
option expecting a list of strings.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if isinstance(obj, str):
|
||||||
|
return [obj]
|
||||||
|
return obj
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class InstanceLocationConfig:
|
class InstanceLocationConfig:
|
||||||
"""The host and port to talk to an instance via HTTP replication.
|
"""The host and port to talk to an instance via HTTP replication.
|
||||||
|
@ -33,11 +45,13 @@ class WriterLocations:
|
||||||
"""Specifies the instances that write various streams.
|
"""Specifies the instances that write various streams.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
events: The instance that writes to the event and backfill streams.
|
events: The instances that write to the event and backfill streams.
|
||||||
events: The instance that writes to the typing stream.
|
typing: The instance that writes to the typing stream.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
events = attr.ib(default="master", type=str)
|
events = attr.ib(
|
||||||
|
default=["master"], type=List[str], converter=_instance_to_list_converter
|
||||||
|
)
|
||||||
typing = attr.ib(default="master", type=str)
|
typing = attr.ib(default="master", type=str)
|
||||||
|
|
||||||
|
|
||||||
|
@ -105,15 +119,18 @@ class WorkerConfig(Config):
|
||||||
writers = config.get("stream_writers") or {}
|
writers = config.get("stream_writers") or {}
|
||||||
self.writers = WriterLocations(**writers)
|
self.writers = WriterLocations(**writers)
|
||||||
|
|
||||||
# Check that the configured writer for events and typing also appears in
|
# Check that the configured writers for events and typing also appears in
|
||||||
# `instance_map`.
|
# `instance_map`.
|
||||||
for stream in ("events", "typing"):
|
for stream in ("events", "typing"):
|
||||||
instance = getattr(self.writers, stream)
|
instances = _instance_to_list_converter(getattr(self.writers, stream))
|
||||||
if instance != "master" and instance not in self.instance_map:
|
for instance in instances:
|
||||||
raise ConfigError(
|
if instance != "master" and instance not in self.instance_map:
|
||||||
"Instance %r is configured to write %s but does not appear in `instance_map` config."
|
raise ConfigError(
|
||||||
% (instance, stream)
|
"Instance %r is configured to write %s but does not appear in `instance_map` config."
|
||||||
)
|
% (instance, stream)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
|
||||||
|
|
||||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||||
return """\
|
return """\
|
||||||
|
|
|
@ -923,7 +923,8 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
await self._handle_new_events(dest, ev_infos, backfilled=True)
|
if ev_infos:
|
||||||
|
await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
|
||||||
|
|
||||||
# Step 2: Persist the rest of the events in the chunk one by one
|
# Step 2: Persist the rest of the events in the chunk one by one
|
||||||
events.sort(key=lambda e: e.depth)
|
events.sort(key=lambda e: e.depth)
|
||||||
|
@ -1216,7 +1217,7 @@ class FederationHandler(BaseHandler):
|
||||||
event_infos.append(_NewEventInfo(event, None, auth))
|
event_infos.append(_NewEventInfo(event, None, auth))
|
||||||
|
|
||||||
await self._handle_new_events(
|
await self._handle_new_events(
|
||||||
destination, event_infos,
|
destination, room_id, event_infos,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _sanity_check_event(self, ev):
|
def _sanity_check_event(self, ev):
|
||||||
|
@ -1363,15 +1364,15 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
max_stream_id = await self._persist_auth_tree(
|
max_stream_id = await self._persist_auth_tree(
|
||||||
origin, auth_chain, state, event, room_version_obj
|
origin, room_id, auth_chain, state, event, room_version_obj
|
||||||
)
|
)
|
||||||
|
|
||||||
# We wait here until this instance has seen the events come down
|
# We wait here until this instance has seen the events come down
|
||||||
# replication (if we're using replication) as the below uses caches.
|
# replication (if we're using replication) as the below uses caches.
|
||||||
#
|
|
||||||
# TODO: Currently the events stream is written to from master
|
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.config.worker.writers.events, "events", max_stream_id
|
self.config.worker.events_shard_config.get_instance(room_id),
|
||||||
|
"events",
|
||||||
|
max_stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check whether this room is the result of an upgrade of a room we already know
|
# Check whether this room is the result of an upgrade of a room we already know
|
||||||
|
@ -1625,7 +1626,7 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
context = await self.state_handler.compute_event_context(event)
|
context = await self.state_handler.compute_event_context(event)
|
||||||
await self.persist_events_and_notify([(event, context)])
|
await self.persist_events_and_notify(event.room_id, [(event, context)])
|
||||||
|
|
||||||
return event
|
return event
|
||||||
|
|
||||||
|
@ -1652,7 +1653,9 @@ class FederationHandler(BaseHandler):
|
||||||
await self.federation_client.send_leave(host_list, event)
|
await self.federation_client.send_leave(host_list, event)
|
||||||
|
|
||||||
context = await self.state_handler.compute_event_context(event)
|
context = await self.state_handler.compute_event_context(event)
|
||||||
stream_id = await self.persist_events_and_notify([(event, context)])
|
stream_id = await self.persist_events_and_notify(
|
||||||
|
event.room_id, [(event, context)]
|
||||||
|
)
|
||||||
|
|
||||||
return event, stream_id
|
return event, stream_id
|
||||||
|
|
||||||
|
@ -1900,7 +1903,7 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
await self.persist_events_and_notify(
|
||||||
[(event, context)], backfilled=backfilled
|
event.room_id, [(event, context)], backfilled=backfilled
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
run_in_background(
|
run_in_background(
|
||||||
|
@ -1913,6 +1916,7 @@ class FederationHandler(BaseHandler):
|
||||||
async def _handle_new_events(
|
async def _handle_new_events(
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
|
room_id: str,
|
||||||
event_infos: Iterable[_NewEventInfo],
|
event_infos: Iterable[_NewEventInfo],
|
||||||
backfilled: bool = False,
|
backfilled: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -1944,6 +1948,7 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
await self.persist_events_and_notify(
|
||||||
|
room_id,
|
||||||
[
|
[
|
||||||
(ev_info.event, context)
|
(ev_info.event, context)
|
||||||
for ev_info, context in zip(event_infos, contexts)
|
for ev_info, context in zip(event_infos, contexts)
|
||||||
|
@ -1954,6 +1959,7 @@ class FederationHandler(BaseHandler):
|
||||||
async def _persist_auth_tree(
|
async def _persist_auth_tree(
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
|
room_id: str,
|
||||||
auth_events: List[EventBase],
|
auth_events: List[EventBase],
|
||||||
state: List[EventBase],
|
state: List[EventBase],
|
||||||
event: EventBase,
|
event: EventBase,
|
||||||
|
@ -1968,6 +1974,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
origin: Where the events came from
|
origin: Where the events came from
|
||||||
|
room_id,
|
||||||
auth_events
|
auth_events
|
||||||
state
|
state
|
||||||
event
|
event
|
||||||
|
@ -2042,17 +2049,20 @@ class FederationHandler(BaseHandler):
|
||||||
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
await self.persist_events_and_notify(
|
||||||
|
room_id,
|
||||||
[
|
[
|
||||||
(e, events_to_context[e.event_id])
|
(e, events_to_context[e.event_id])
|
||||||
for e in itertools.chain(auth_events, state)
|
for e in itertools.chain(auth_events, state)
|
||||||
]
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
new_event_context = await self.state_handler.compute_event_context(
|
new_event_context = await self.state_handler.compute_event_context(
|
||||||
event, old_state=state
|
event, old_state=state
|
||||||
)
|
)
|
||||||
|
|
||||||
return await self.persist_events_and_notify([(event, new_event_context)])
|
return await self.persist_events_and_notify(
|
||||||
|
room_id, [(event, new_event_context)]
|
||||||
|
)
|
||||||
|
|
||||||
async def _prep_event(
|
async def _prep_event(
|
||||||
self,
|
self,
|
||||||
|
@ -2903,6 +2913,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
async def persist_events_and_notify(
|
async def persist_events_and_notify(
|
||||||
self,
|
self,
|
||||||
|
room_id: str,
|
||||||
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
|
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
|
||||||
backfilled: bool = False,
|
backfilled: bool = False,
|
||||||
) -> int:
|
) -> int:
|
||||||
|
@ -2910,14 +2921,19 @@ class FederationHandler(BaseHandler):
|
||||||
necessary.
|
necessary.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
event_and_contexts:
|
room_id: The room ID of events being persisted.
|
||||||
|
event_and_contexts: Sequence of events with their associated
|
||||||
|
context that should be persisted. All events must belong to
|
||||||
|
the same room.
|
||||||
backfilled: Whether these events are a result of
|
backfilled: Whether these events are a result of
|
||||||
backfilling or not
|
backfilling or not
|
||||||
"""
|
"""
|
||||||
if self.config.worker.writers.events != self._instance_name:
|
instance = self.config.worker.events_shard_config.get_instance(room_id)
|
||||||
|
if instance != self._instance_name:
|
||||||
result = await self._send_events(
|
result = await self._send_events(
|
||||||
instance_name=self.config.worker.writers.events,
|
instance_name=instance,
|
||||||
store=self.store,
|
store=self.store,
|
||||||
|
room_id=room_id,
|
||||||
event_and_contexts=event_and_contexts,
|
event_and_contexts=event_and_contexts,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
)
|
)
|
||||||
|
|
|
@ -376,9 +376,8 @@ class EventCreationHandler(object):
|
||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.config = hs.config
|
self.config = hs.config
|
||||||
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
|
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
|
||||||
self._is_event_writer = (
|
self._events_shard_config = self.config.worker.events_shard_config
|
||||||
self.config.worker.writers.events == hs.get_instance_name()
|
self._instance_name = hs.get_instance_name()
|
||||||
)
|
|
||||||
|
|
||||||
self.room_invite_state_types = self.hs.config.room_invite_state_types
|
self.room_invite_state_types = self.hs.config.room_invite_state_types
|
||||||
|
|
||||||
|
@ -904,9 +903,10 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# If we're a worker we need to hit out to the master.
|
# If we're a worker we need to hit out to the master.
|
||||||
if not self._is_event_writer:
|
writer_instance = self._events_shard_config.get_instance(event.room_id)
|
||||||
|
if writer_instance != self._instance_name:
|
||||||
result = await self.send_event(
|
result = await self.send_event(
|
||||||
instance_name=self.config.worker.writers.events,
|
instance_name=writer_instance,
|
||||||
event_id=event.event_id,
|
event_id=event.event_id,
|
||||||
store=self.store,
|
store=self.store,
|
||||||
requester=requester,
|
requester=requester,
|
||||||
|
@ -974,7 +974,9 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
This should only be run on the instance in charge of persisting events.
|
This should only be run on the instance in charge of persisting events.
|
||||||
"""
|
"""
|
||||||
assert self._is_event_writer
|
assert self._events_shard_config.should_handle(
|
||||||
|
self._instance_name, event.room_id
|
||||||
|
)
|
||||||
|
|
||||||
if ratelimit:
|
if ratelimit:
|
||||||
# We check if this is a room admin redacting an event so that we
|
# We check if this is a room admin redacting an event so that we
|
||||||
|
|
|
@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
|
||||||
|
|
||||||
# Always wait for room creation to progate before returning
|
# Always wait for room creation to progate before returning
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.hs.config.worker.writers.events, "events", last_stream_id
|
self.hs.config.worker.events_shard_config.get_instance(room_id),
|
||||||
|
"events",
|
||||||
|
last_stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
return result, last_stream_id
|
return result, last_stream_id
|
||||||
|
@ -1260,10 +1262,10 @@ class RoomShutdownHandler(object):
|
||||||
# We now wait for the create room to come back in via replication so
|
# We now wait for the create room to come back in via replication so
|
||||||
# that we can assume that all the joins/invites have propogated before
|
# that we can assume that all the joins/invites have propogated before
|
||||||
# we try and auto join below.
|
# we try and auto join below.
|
||||||
#
|
|
||||||
# TODO: Currently the events stream is written to from master
|
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.hs.config.worker.writers.events, "events", stream_id
|
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
|
||||||
|
"events",
|
||||||
|
stream_id,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
new_room_id = None
|
new_room_id = None
|
||||||
|
@ -1293,7 +1295,9 @@ class RoomShutdownHandler(object):
|
||||||
|
|
||||||
# Wait for leave to come in over replication before trying to forget.
|
# Wait for leave to come in over replication before trying to forget.
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.hs.config.worker.writers.events, "events", stream_id
|
self.hs.config.worker.events_shard_config.get_instance(room_id),
|
||||||
|
"events",
|
||||||
|
stream_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.room_member_handler.forget(target_requester.user, room_id)
|
await self.room_member_handler.forget(target_requester.user, room_id)
|
||||||
|
|
|
@ -82,13 +82,6 @@ class RoomMemberHandler(object):
|
||||||
self._enable_lookup = hs.config.enable_3pid_lookup
|
self._enable_lookup = hs.config.enable_3pid_lookup
|
||||||
self.allow_per_room_profiles = self.config.allow_per_room_profiles
|
self.allow_per_room_profiles = self.config.allow_per_room_profiles
|
||||||
|
|
||||||
self._event_stream_writer_instance = hs.config.worker.writers.events
|
|
||||||
self._is_on_event_persistence_instance = (
|
|
||||||
self._event_stream_writer_instance == hs.get_instance_name()
|
|
||||||
)
|
|
||||||
if self._is_on_event_persistence_instance:
|
|
||||||
self.persist_event_storage = hs.get_storage().persistence
|
|
||||||
|
|
||||||
self._join_rate_limiter_local = Ratelimiter(
|
self._join_rate_limiter_local = Ratelimiter(
|
||||||
clock=self.clock,
|
clock=self.clock,
|
||||||
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
|
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
|
||||||
|
|
|
@ -65,10 +65,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||||
self.federation_handler = hs.get_handlers().federation_handler
|
self.federation_handler = hs.get_handlers().federation_handler
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _serialize_payload(store, event_and_contexts, backfilled):
|
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
store
|
store
|
||||||
|
room_id (str)
|
||||||
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
|
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
|
||||||
backfilled (bool): Whether or not the events are the result of
|
backfilled (bool): Whether or not the events are the result of
|
||||||
backfilling
|
backfilling
|
||||||
|
@ -88,7 +89,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
payload = {"events": event_payloads, "backfilled": backfilled}
|
payload = {
|
||||||
|
"events": event_payloads,
|
||||||
|
"backfilled": backfilled,
|
||||||
|
"room_id": room_id,
|
||||||
|
}
|
||||||
|
|
||||||
return payload
|
return payload
|
||||||
|
|
||||||
|
@ -96,6 +101,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||||
with Measure(self.clock, "repl_fed_send_events_parse"):
|
with Measure(self.clock, "repl_fed_send_events_parse"):
|
||||||
content = parse_json_object_from_request(request)
|
content = parse_json_object_from_request(request)
|
||||||
|
|
||||||
|
room_id = content["room_id"]
|
||||||
backfilled = content["backfilled"]
|
backfilled = content["backfilled"]
|
||||||
|
|
||||||
event_payloads = content["events"]
|
event_payloads = content["events"]
|
||||||
|
@ -120,7 +126,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
||||||
logger.info("Got %d events from federation", len(event_and_contexts))
|
logger.info("Got %d events from federation", len(event_and_contexts))
|
||||||
|
|
||||||
max_stream_id = await self.federation_handler.persist_events_and_notify(
|
max_stream_id = await self.federation_handler.persist_events_and_notify(
|
||||||
event_and_contexts, backfilled
|
room_id, event_and_contexts, backfilled
|
||||||
)
|
)
|
||||||
|
|
||||||
return 200, {"max_stream_id": max_stream_id}
|
return 200, {"max_stream_id": max_stream_id}
|
||||||
|
|
|
@ -109,7 +109,7 @@ class ReplicationCommandHandler:
|
||||||
if isinstance(stream, (EventsStream, BackfillStream)):
|
if isinstance(stream, (EventsStream, BackfillStream)):
|
||||||
# Only add EventStream and BackfillStream as a source on the
|
# Only add EventStream and BackfillStream as a source on the
|
||||||
# instance in charge of event persistence.
|
# instance in charge of event persistence.
|
||||||
if hs.config.worker.writers.events == hs.get_instance_name():
|
if hs.get_instance_name() in hs.config.worker.writers.events:
|
||||||
self._streams_to_replicate.append(stream)
|
self._streams_to_replicate.append(stream)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -19,7 +19,7 @@ from typing import List, Tuple, Type
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
|
from ._base import Stream, StreamUpdateResult, Token
|
||||||
|
|
||||||
"""Handling of the 'events' replication stream
|
"""Handling of the 'events' replication stream
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ class EventsStream(Stream):
|
||||||
self._store = hs.get_datastore()
|
self._store = hs.get_datastore()
|
||||||
super().__init__(
|
super().__init__(
|
||||||
hs.get_instance_name(),
|
hs.get_instance_name(),
|
||||||
current_token_without_instance(self._store.get_current_events_token),
|
self._store._stream_id_gen.get_current_token_for_writer,
|
||||||
self._update_function,
|
self._update_function,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -68,7 +68,7 @@ class Databases(object):
|
||||||
|
|
||||||
# If we're on a process that can persist events also
|
# If we're on a process that can persist events also
|
||||||
# instantiate a `PersistEventsStore`
|
# instantiate a `PersistEventsStore`
|
||||||
if hs.config.worker.writers.events == hs.get_instance_name():
|
if hs.get_instance_name() in hs.config.worker.writers.events:
|
||||||
persist_events = PersistEventsStore(hs, database, main)
|
persist_events = PersistEventsStore(hs, database, main)
|
||||||
|
|
||||||
if "state" in database_config.databases:
|
if "state" in database_config.databases:
|
||||||
|
|
|
@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if stream_ordering <= self.stream_ordering_month_ago:
|
if stream_ordering <= self.stream_ordering_month_ago:
|
||||||
raise StoreError(400, "stream_ordering too old")
|
raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
|
||||||
|
|
||||||
sql = """
|
sql = """
|
||||||
SELECT event_id FROM stream_ordering_to_exterm
|
SELECT event_id FROM stream_ordering_to_exterm
|
||||||
|
|
|
@ -97,6 +97,7 @@ class PersistEventsStore:
|
||||||
self.store = main_data_store
|
self.store = main_data_store
|
||||||
self.database_engine = db.engine
|
self.database_engine = db.engine
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
|
self._instance_name = hs.get_instance_name()
|
||||||
|
|
||||||
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
|
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
|
@ -108,7 +109,7 @@ class PersistEventsStore:
|
||||||
|
|
||||||
# This should only exist on instances that are configured to write
|
# This should only exist on instances that are configured to write
|
||||||
assert (
|
assert (
|
||||||
hs.config.worker.writers.events == hs.get_instance_name()
|
hs.get_instance_name() in hs.config.worker.writers.events
|
||||||
), "Can only instantiate EventsStore on master"
|
), "Can only instantiate EventsStore on master"
|
||||||
|
|
||||||
async def _persist_events_and_state_updates(
|
async def _persist_events_and_state_updates(
|
||||||
|
@ -800,6 +801,7 @@ class PersistEventsStore:
|
||||||
table="events",
|
table="events",
|
||||||
values=[
|
values=[
|
||||||
{
|
{
|
||||||
|
"instance_name": self._instance_name,
|
||||||
"stream_ordering": event.internal_metadata.stream_ordering,
|
"stream_ordering": event.internal_metadata.stream_ordering,
|
||||||
"topological_ordering": event.depth,
|
"topological_ordering": event.depth,
|
||||||
"depth": event.depth,
|
"depth": event.depth,
|
||||||
|
|
|
@ -42,7 +42,8 @@ from synapse.replication.tcp.streams import BackfillStream
|
||||||
from synapse.replication.tcp.streams.events import EventsStream
|
from synapse.replication.tcp.streams.events import EventsStream
|
||||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||||
from synapse.storage.database import DatabasePool
|
from synapse.storage.database import DatabasePool
|
||||||
from synapse.storage.util.id_generators import StreamIdGenerator
|
from synapse.storage.engines import PostgresEngine
|
||||||
|
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
||||||
from synapse.types import Collection, get_domain_from_id
|
from synapse.types import Collection, get_domain_from_id
|
||||||
from synapse.util.caches.descriptors import Cache, cached
|
from synapse.util.caches.descriptors import Cache, cached
|
||||||
from synapse.util.iterutils import batch_iter
|
from synapse.util.iterutils import batch_iter
|
||||||
|
@ -78,27 +79,54 @@ class EventsWorkerStore(SQLBaseStore):
|
||||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||||
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
|
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
|
||||||
|
|
||||||
if hs.config.worker.writers.events == hs.get_instance_name():
|
if isinstance(database.engine, PostgresEngine):
|
||||||
# We are the process in charge of generating stream ids for events,
|
# If we're using Postgres than we can use `MultiWriterIdGenerator`
|
||||||
# so instantiate ID generators based on the database
|
# regardless of whether this process writes to the streams or not.
|
||||||
self._stream_id_gen = StreamIdGenerator(
|
self._stream_id_gen = MultiWriterIdGenerator(
|
||||||
db_conn, "events", "stream_ordering",
|
db_conn=db_conn,
|
||||||
|
db=database,
|
||||||
|
instance_name=hs.get_instance_name(),
|
||||||
|
table="events",
|
||||||
|
instance_column="instance_name",
|
||||||
|
id_column="stream_ordering",
|
||||||
|
sequence_name="events_stream_seq",
|
||||||
)
|
)
|
||||||
self._backfill_id_gen = StreamIdGenerator(
|
self._backfill_id_gen = MultiWriterIdGenerator(
|
||||||
db_conn,
|
db_conn=db_conn,
|
||||||
"events",
|
db=database,
|
||||||
"stream_ordering",
|
instance_name=hs.get_instance_name(),
|
||||||
step=-1,
|
table="events",
|
||||||
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
instance_column="instance_name",
|
||||||
|
id_column="stream_ordering",
|
||||||
|
sequence_name="events_backfill_stream_seq",
|
||||||
|
positive=False,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# Another process is in charge of persisting events and generating
|
# We shouldn't be running in worker mode with SQLite, but its useful
|
||||||
# stream IDs: rely on the replication streams to let us know which
|
# to support it for unit tests.
|
||||||
# IDs we can process.
|
#
|
||||||
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
|
# If this process is the writer than we need to use
|
||||||
self._backfill_id_gen = SlavedIdTracker(
|
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
|
||||||
db_conn, "events", "stream_ordering", step=-1
|
# updated over replication. (Multiple writers are not supported for
|
||||||
)
|
# SQLite).
|
||||||
|
if hs.get_instance_name() in hs.config.worker.writers.events:
|
||||||
|
self._stream_id_gen = StreamIdGenerator(
|
||||||
|
db_conn, "events", "stream_ordering",
|
||||||
|
)
|
||||||
|
self._backfill_id_gen = StreamIdGenerator(
|
||||||
|
db_conn,
|
||||||
|
"events",
|
||||||
|
"stream_ordering",
|
||||||
|
step=-1,
|
||||||
|
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._stream_id_gen = SlavedIdTracker(
|
||||||
|
db_conn, "events", "stream_ordering"
|
||||||
|
)
|
||||||
|
self._backfill_id_gen = SlavedIdTracker(
|
||||||
|
db_conn, "events", "stream_ordering", step=-1
|
||||||
|
)
|
||||||
|
|
||||||
self._get_event_cache = Cache(
|
self._get_event_cache = Cache(
|
||||||
"*getEvent*",
|
"*getEvent*",
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
/* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ALTER TABLE events ADD COLUMN instance_name TEXT;
|
|
@ -0,0 +1,26 @@
|
||||||
|
/* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
|
||||||
|
|
||||||
|
SELECT setval('events_stream_seq', (
|
||||||
|
SELECT COALESCE(MAX(stream_ordering), 1) FROM events
|
||||||
|
));
|
||||||
|
|
||||||
|
CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
|
||||||
|
|
||||||
|
SELECT setval('events_backfill_stream_seq', (
|
||||||
|
SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
|
||||||
|
));
|
|
@ -231,8 +231,12 @@ class MultiWriterIdGenerator:
|
||||||
# gaps should be relatively rare it's still worth doing the book keeping
|
# gaps should be relatively rare it's still worth doing the book keeping
|
||||||
# that allows us to skip forwards when there are gapless runs of
|
# that allows us to skip forwards when there are gapless runs of
|
||||||
# positions.
|
# positions.
|
||||||
|
#
|
||||||
|
# We start at 1 here as a) the first generated stream ID will be 2, and
|
||||||
|
# b) other parts of the code assume that stream IDs are strictly greater
|
||||||
|
# than 0.
|
||||||
self._persisted_upto_position = (
|
self._persisted_upto_position = (
|
||||||
min(self._current_positions.values()) if self._current_positions else 0
|
min(self._current_positions.values()) if self._current_positions else 1
|
||||||
)
|
)
|
||||||
self._known_persisted_positions = [] # type: List[int]
|
self._known_persisted_positions = [] # type: List[int]
|
||||||
|
|
||||||
|
@ -362,9 +366,7 @@ class MultiWriterIdGenerator:
|
||||||
equal to it have been successfully persisted.
|
equal to it have been successfully persisted.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Currently we don't support this operation, as it's not obvious how to
|
return self.get_persisted_upto_position()
|
||||||
# condense the stream positions of multiple writers into a single int.
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def get_current_token_for_writer(self, instance_name: str) -> int:
|
def get_current_token_for_writer(self, instance_name: str) -> int:
|
||||||
"""Returns the position of the given writer.
|
"""Returns the position of the given writer.
|
||||||
|
|
Loading…
Reference in New Issue