Filter the recent events before applying the limit when doing an incremental sync with a gap
parent ece828a7b7
commit 22dd1cde2d
@@ -12,8 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer
-
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, RoomID
 
@@ -278,6 +278,40 @@ class SyncHandler(BaseHandler):
             next_batch=now_token,
         ))
 
+    @defer.inlineCallbacks
+    def load_filtered_recents(self, room_id, sync_config, since_token,
+                              now_token):
+        limited = True
+        recents = []
+        filtering_factor = 2
+        load_limit = max(sync_config.limit * filtering_factor, 100)
+        max_repeat = 3  # Only try a few times per room, otherwise
+        room_key = now_token.room_key
+
+        while limited and len(recents) < sync_config.limit and max_repeat:
+            events, room_key = yield self.store.get_recent_events_for_room(
+                room_id,
+                limit=load_limit + 1,
+                from_token=since_token.room_key,
+                end_token=room_key,
+            )
+            loaded_recents = sync_config.filter.filter_room_events(events)
+            loaded_recents.extend(recents)
+            recents = loaded_recents
+            if len(events) <= load_limit:
+                limited = False
+            max_repeat -= 1
+
+        if len(recents) > sync_config.limit:
+            recents = recents[-sync_config.limit:]
+            room_key = recents[0].internal_metadata.before
+
+        prev_batch_token = now_token.copy_and_replace(
+            "room_key", room_key
+        )
+
+        defer.returnValue((recents, prev_batch_token, limited))
+
     @defer.inlineCallbacks
     def incremental_sync_with_gap_for_room(self, room_id, sync_config,
                                            since_token, now_token,
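Note on the new helper: instead of fetching exactly limit + 1 events and trimming afterwards, load_filtered_recents over-fetches by a filtering_factor of 2 (at least 100 events), applies the client's event filter, and retries up to max_repeat times while the window is still short. A minimal synchronous sketch of the same technique, where the hypothetical fetch_events(end_key, limit) stands in for store.get_recent_events_for_room and a plain predicate stands in for sync_config.filter:

# Sketch of the over-fetch-and-filter loop. fetch_events(end_key, limit)
# is a hypothetical stand-in returning (events, older_end_key), with
# events ordered oldest first, like the store call in the diff above.
def load_filtered_recents(fetch_events, event_filter, limit, end_key):
    filtering_factor = 2
    load_limit = max(limit * filtering_factor, 100)
    max_repeat = 3            # bound the number of round-trips per room
    limited = True            # True while older matching events may remain
    recents = []

    while limited and len(recents) < limit and max_repeat:
        # Ask for one more than load_limit: if the store returns no more
        # than load_limit events, it has run out of older history.
        events, end_key = fetch_events(end_key, load_limit + 1)
        # Prepend the older batch, keeping only events the filter accepts.
        recents = [ev for ev in events if event_filter(ev)] + recents
        if len(events) <= load_limit:
            limited = False
        max_repeat -= 1

    # Trim to the requested window, keeping the most recent events.
    if len(recents) > limit:
        recents = recents[-limit:]
    return recents, limited

The load_limit + 1 over-read keeps the old truncation-detection trick, but applied before filtering rather than after, so a strict filter can no longer starve the window.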
@@ -288,28 +322,17 @@ class SyncHandler(BaseHandler):
         Returns:
             A Deferred RoomSyncResult
         """
 
         # TODO(mjark): Check if they have joined the room between
         # the previous sync and this one.
-        # TODO(mjark): Apply the event filter in sync_config taking care to get
-        # enough events to reach the limit
         # TODO(mjark): Check for redactions we might have missed.
-        recents, token = yield self.store.get_recent_events_for_room(
-            room_id,
-            limit=sync_config.limit + 1,
-            from_token=since_token.room_key,
-            end_token=now_token.room_key,
-        )
+        recents, prev_batch_token, limited = yield self.load_filtered_recents(
+            room_id, sync_config, since_token,
+            now_token,
+        )
 
         logging.debug("Recents %r", recents)
 
-        if len(recents) > sync_config.limit:
-            limited = True
-            recents = recents[1:]
-        else:
-            limited = False
-
-        prev_batch_token = now_token.copy_and_replace("room_key", token[0])
-
         # TODO(mjark): This seems racy since this isn't being passed a
         # token to indicate what point in the stream this is
         current_state_events = yield self.state_handler.get_current_state(
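incremental_sync_with_gap_for_room now receives recents, prev_batch_token and limited ready-made, so the hand-rolled recents[1:] trim and the token[0] indexing disappear along with the TODO this commit implements. The helper rewrites only the room component of the sync token via copy_and_replace. A sketch of that copy-and-replace pattern on an immutable token; the two-field token here is an assumption for illustration (synapse's real StreamToken carries more components):

from collections import namedtuple

# Illustrative two-part token; the real StreamToken has more fields.
class StreamToken(namedtuple("StreamToken", ["room_key", "presence_key"])):
    def copy_and_replace(self, key, new_value):
        # _replace returns a modified copy; the original stays intact.
        return self._replace(**{key: new_value})

now_token = StreamToken(room_key="s100", presence_key="p7")
prev_batch_token = now_token.copy_and_replace("room_key", "s42")

assert now_token.room_key == "s100"        # unchanged
assert prev_batch_token.room_key == "s42"  # only the room part moved back
assert prev_batch_token.presence_key == "p7"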
@@ -116,7 +116,7 @@ class SyncRestServlet(RestServlet):
                 user.localpart, filter_id
             )
         except:
             filter = Filter({})
         # filter = filter.apply_overrides(http_request)
         #if filter.matches(event):
         #    # stuff
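Context for the servlet hunk (presumably synapse/rest/client/v2_alpha/sync.py, given the class name): when the stored filter cannot be loaded, the bare except: falls back to Filter({}), so the sync handler can call filter_room_events unconditionally. A sketch of why an empty definition behaves as a pass-through; this Filter is a hypothetical stand-in, not synapse's implementation:

# Hypothetical stand-in showing why Filter({}) is a safe fallback: an
# empty definition imposes no constraints, so every event passes.
class Filter:
    def __init__(self, definition):
        self.definition = definition

    def filter_room_events(self, events):
        allowed_types = self.definition.get("types")
        if allowed_types is None:
            return list(events)  # no "types" clause: let everything through
        return [ev for ev in events if ev.type in allowed_types]

assert Filter({}).filter_room_events([]) == []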
@@ -181,15 +181,11 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
-            for event, row in zip(ret, rows):
-                stream = row["stream_ordering"]
-                topo = event.depth
-                internal = event.internal_metadata
-                internal.before = str(_StreamToken(topo, stream - 1))
-                internal.after = str(_StreamToken(topo, stream))
+            self._set_before_and_after(ret, rows)
 
             if rows:
                 key = "s%d" % max([r["stream_ordering"] for r in rows])
             else:
                 # Assume we didn't get anything because there was nothing to
                 # get.
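The inline loop is not deleted but promoted to a reusable _set_before_and_after helper (added at line 354 below), so every code path in synapse/storage/stream.py that returns events can stamp them with the tokens bracketing their stream position. A sketch of a token type with that shape; the t<topological>-<stream> rendering of __str__ is an assumption based on how topological stream tokens look on the wire:

from collections import namedtuple

# Assumed shape of the module's _StreamToken: topological position first
# (the event's depth in the DAG), then the stream ordering.
class _StreamToken(namedtuple("_StreamToken", ["topological", "stream"])):
    def __str__(self):
        return "t%d-%d" % (self.topological, self.stream)

# An event at depth 7 with stream ordering 42 is bracketed by:
assert str(_StreamToken(7, 41)) == "t7-41"  # internal_metadata.before
assert str(_StreamToken(7, 42)) == "t7-42"  # internal_metadata.after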
@@ -267,6 +263,8 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, next_token,
 
         return self.runInteraction("paginate_room_events", f)
@@ -328,6 +326,8 @@ class StreamStore(SQLBaseStore):
                 get_prev_content=True
             )
 
+            self._set_before_and_after(events, rows)
+
             return events, token
 
         return self.runInteraction(
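Both pagination paths follow the same shape: build the rows inside a nested f(txn) and hand it to runInteraction, which runs the callable on a database thread and returns a Deferred. A rough analogue using Twisted's stock adbapi pool (synapse's SQLBaseStore.runInteraction additionally takes a description string first); the table and columns here are illustrative:

from twisted.enterprise import adbapi

# A thread pool of DB connections; each interaction runs in a transaction.
pool = adbapi.ConnectionPool("sqlite3", ":memory:", check_same_thread=False)

def f(txn, room_id, limit):
    # Runs on a pool thread; txn is a DB-API cursor, committed on success.
    txn.execute(
        "SELECT event_id, stream_ordering FROM events"
        " WHERE room_id = ? ORDER BY stream_ordering DESC LIMIT ?",
        (room_id, limit),
    )
    return txn.fetchall()

# Returns a Deferred that fires with f's return value.
d = pool.runInteraction(f, "!room:example.com", 10)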
@@ -354,3 +354,12 @@ class StreamStore(SQLBaseStore):
 
         key = res[0]["m"]
         return "s%d" % (key,)
+
+    @staticmethod
+    def _set_before_and_after(events, rows):
+        for event, row in zip(events, rows):
+            stream = row["stream_ordering"]
+            topo = event.depth
+            internal = event.internal_metadata
+            internal.before = str(_StreamToken(topo, stream - 1))
+            internal.after = str(_StreamToken(topo, stream))
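A self-contained check of the helper's contract, using stub events and rows (illustrative stand-ins, not real persisted events). The first event's before annotation is exactly what load_filtered_recents above reads via recents[0].internal_metadata.before to seed the prev-batch room key:

from collections import namedtuple

class _StreamToken(namedtuple("_StreamToken", ["topological", "stream"])):
    def __str__(self):
        return "t%d-%d" % (self.topological, self.stream)

class StubEvent:
    # Illustrative stand-in for a persisted event.
    def __init__(self, depth):
        self.depth = depth
        self.internal_metadata = type("Metadata", (), {})()

def _set_before_and_after(events, rows):   # body copied from the hunk above
    for event, row in zip(events, rows):
        stream = row["stream_ordering"]
        topo = event.depth
        internal = event.internal_metadata
        internal.before = str(_StreamToken(topo, stream - 1))
        internal.after = str(_StreamToken(topo, stream))

events = [StubEvent(depth=7), StubEvent(depth=7)]
rows = [{"stream_ordering": 41}, {"stream_ordering": 42}]
_set_before_and_after(events, rows)

assert events[0].internal_metadata.before == "t7-40"
assert events[1].internal_metadata.after == "t7-42"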