Index sources in a nicer fashion.
This commit is contained in:
parent
05672a6a8c
commit
bfe9faad5a
|
@ -114,7 +114,7 @@ class MessageHandler(BaseHandler):
|
||||||
"""
|
"""
|
||||||
yield self.auth.check_joined_room(room_id, user_id)
|
yield self.auth.check_joined_room(room_id, user_id)
|
||||||
|
|
||||||
data_source = self.hs.get_event_sources().sources[0]
|
data_source = self.hs.get_event_sources().sources["room"]
|
||||||
|
|
||||||
if not pagin_config.from_token:
|
if not pagin_config.from_token:
|
||||||
pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
|
pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
|
||||||
|
@ -274,7 +274,7 @@ class MessageHandler(BaseHandler):
|
||||||
now_token = yield self.hs.get_event_sources().get_current_token()
|
now_token = yield self.hs.get_event_sources().get_current_token()
|
||||||
|
|
||||||
# FIXME (erikj): Fix this.
|
# FIXME (erikj): Fix this.
|
||||||
presence_stream = self.hs.get_event_sources().sources[1]
|
presence_stream = self.hs.get_event_sources().sources["presence"]
|
||||||
pagination_config = PaginationConfig(from_token=now_token)
|
pagination_config = PaginationConfig(from_token=now_token)
|
||||||
presence, _ = yield presence_stream.get_pagination_rows(
|
presence, _ = yield presence_stream.get_pagination_rows(
|
||||||
user, pagination_config, None
|
user, pagination_config, None
|
||||||
|
|
|
@ -69,7 +69,7 @@ class Notifier(object):
|
||||||
def on_new_room_event(self, event, extra_users=[]):
|
def on_new_room_event(self, event, extra_users=[]):
|
||||||
room_id = event.room_id
|
room_id = event.room_id
|
||||||
|
|
||||||
source = self.event_sources.sources[0]
|
source = self.event_sources.sources["room"]
|
||||||
|
|
||||||
listeners = self.rooms_to_listeners.get(room_id, set()).copy()
|
listeners = self.rooms_to_listeners.get(room_id, set()).copy()
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ class Notifier(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_new_user_event(self, users=[], rooms=[]):
|
def on_new_user_event(self, users=[], rooms=[]):
|
||||||
source = self.event_sources.sources[1]
|
source = self.event_sources.sources["presence"]
|
||||||
|
|
||||||
listeners = set()
|
listeners = set()
|
||||||
|
|
||||||
|
@ -176,7 +176,7 @@ class Notifier(object):
|
||||||
limit = listener.limit
|
limit = listener.limit
|
||||||
|
|
||||||
# TODO (erikj): DeferredList?
|
# TODO (erikj): DeferredList?
|
||||||
for source in self.event_sources.sources:
|
for source in self.event_sources.sources.values():
|
||||||
stuff, new_token = yield source.get_new_events_for_user(
|
stuff, new_token = yield source.get_new_events_for_user(
|
||||||
listener.user,
|
listener.user,
|
||||||
from_token,
|
from_token,
|
||||||
|
|
|
@ -20,8 +20,6 @@ from synapse.types import StreamToken
|
||||||
|
|
||||||
|
|
||||||
class RoomEventSource(object):
|
class RoomEventSource(object):
|
||||||
SIGNAL_NAME = "RoomEventSource"
|
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
|
@ -70,8 +68,6 @@ class RoomEventSource(object):
|
||||||
|
|
||||||
|
|
||||||
class PresenceSource(object):
|
class PresenceSource(object):
|
||||||
SIGNAL_NAME = "PresenceSource"
|
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
@ -150,13 +146,16 @@ class PresenceSource(object):
|
||||||
|
|
||||||
|
|
||||||
class EventSources(object):
|
class EventSources(object):
|
||||||
SOURCE_TYPES = [
|
SOURCE_TYPES = {
|
||||||
RoomEventSource,
|
"room": RoomEventSource,
|
||||||
PresenceSource,
|
"presence": PresenceSource,
|
||||||
]
|
}
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.sources = [t(hs) for t in EventSources.SOURCE_TYPES]
|
self.sources = {
|
||||||
|
name: cls(hs)
|
||||||
|
for name, cls in EventSources.SOURCE_TYPES.items()
|
||||||
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_token(events_key, presence_key):
|
def create_token(events_key, presence_key):
|
||||||
|
@ -164,8 +163,8 @@ class EventSources(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_current_token(self):
|
def get_current_token(self):
|
||||||
events_key = yield self.sources[0].get_current_token_part()
|
events_key = yield self.sources["room"].get_current_token_part()
|
||||||
presence_key = yield self.sources[1].get_current_token_part()
|
presence_key = yield self.sources["presence"].get_current_token_part()
|
||||||
token = EventSources.create_token(events_key, presence_key)
|
token = EventSources.create_token(events_key, presence_key)
|
||||||
defer.returnValue(token)
|
defer.returnValue(token)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue