parent
0fd1cd2400
commit
92e3071623
|
@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler):
|
||||||
def _handle_new_receipts(self, receipts):
|
def _handle_new_receipts(self, receipts):
|
||||||
"""Takes a list of receipts, stores them and informs the notifier.
|
"""Takes a list of receipts, stores them and informs the notifier.
|
||||||
"""
|
"""
|
||||||
|
min_batch_id = None
|
||||||
|
max_batch_id = None
|
||||||
|
|
||||||
for receipt in receipts:
|
for receipt in receipts:
|
||||||
room_id = receipt["room_id"]
|
room_id = receipt["room_id"]
|
||||||
receipt_type = receipt["receipt_type"]
|
receipt_type = receipt["receipt_type"]
|
||||||
|
@ -97,9 +100,19 @@ class ReceiptsHandler(BaseHandler):
|
||||||
|
|
||||||
stream_id, max_persisted_id = res
|
stream_id, max_persisted_id = res
|
||||||
|
|
||||||
|
if min_batch_id is None or stream_id < min_batch_id:
|
||||||
|
min_batch_id = stream_id
|
||||||
|
if max_batch_id is None or max_persisted_id > max_batch_id:
|
||||||
|
max_batch_id = max_persisted_id
|
||||||
|
|
||||||
|
affected_room_ids = list(set([r["room_id"] for r in receipts]))
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
"receipt_key", max_persisted_id, rooms=[room_id]
|
"receipt_key", max_batch_id, rooms=affected_room_ids
|
||||||
|
)
|
||||||
|
self.hs.get_pusherpool().on_new_receipts(
|
||||||
|
min_batch_id, max_batch_id, affected_room_ids
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
|
|
|
@ -76,15 +76,25 @@ class HttpPusher(object):
|
||||||
self.data_minus_url.update(self.data)
|
self.data_minus_url.update(self.data)
|
||||||
del self.data_minus_url['url']
|
del self.data_minus_url['url']
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def on_started(self):
|
def on_started(self):
|
||||||
self._process()
|
yield self._process()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
|
||||||
self.max_stream_ordering = max_stream_ordering
|
self.max_stream_ordering = max_stream_ordering
|
||||||
self._process()
|
yield self._process()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_new_receipts(self, min_stream_id, max_stream_id):
|
||||||
|
# We could check the receipts are actually m.read receipts here,
|
||||||
|
# but currently that's the only type of receipt anyway...
|
||||||
|
badge = yield push_tools.get_badge_count(self.hs, self.user_id)
|
||||||
|
yield self.send_badge(badge)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def on_timer(self):
|
def on_timer(self):
|
||||||
self._process()
|
yield self._process()
|
||||||
|
|
||||||
def on_stop(self):
|
def on_stop(self):
|
||||||
if self.timed_call:
|
if self.timed_call:
|
||||||
|
@ -106,12 +116,14 @@ class HttpPusher(object):
|
||||||
self.last_stream_ordering,
|
self.last_stream_ordering,
|
||||||
self.clock.time_msec()
|
self.clock.time_msec()
|
||||||
)
|
)
|
||||||
|
if self.failing_since:
|
||||||
self.failing_since = None
|
self.failing_since = None
|
||||||
yield self.store.update_pusher_failing_since(
|
yield self.store.update_pusher_failing_since(
|
||||||
self.app_id, self.pushkey, self.user_id,
|
self.app_id, self.pushkey, self.user_id,
|
||||||
self.failing_since
|
self.failing_since
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
if not self.failing_since:
|
||||||
self.failing_since = self.clock.time_msec()
|
self.failing_since = self.clock.time_msec()
|
||||||
yield self.store.update_pusher_failing_since(
|
yield self.store.update_pusher_failing_since(
|
||||||
self.app_id, self.pushkey, self.user_id,
|
self.app_id, self.pushkey, self.user_id,
|
||||||
|
@ -121,7 +133,7 @@ class HttpPusher(object):
|
||||||
if (
|
if (
|
||||||
self.failing_since and
|
self.failing_since and
|
||||||
self.failing_since <
|
self.failing_since <
|
||||||
self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER
|
self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
|
||||||
):
|
):
|
||||||
# we really only give up so that if the URL gets
|
# we really only give up so that if the URL gets
|
||||||
# fixed, we don't suddenly deliver a load
|
# fixed, we don't suddenly deliver a load
|
||||||
|
@ -148,7 +160,7 @@ class HttpPusher(object):
|
||||||
else:
|
else:
|
||||||
logger.info("Push failed: delaying for %ds", self.backoff_delay)
|
logger.info("Push failed: delaying for %ds", self.backoff_delay)
|
||||||
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
|
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
|
||||||
self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC)
|
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
|
||||||
break
|
break
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
|
@ -191,7 +203,8 @@ class HttpPusher(object):
|
||||||
|
|
||||||
d = {
|
d = {
|
||||||
'notification': {
|
'notification': {
|
||||||
'id': event.event_id,
|
'id': event.event_id, # deprecated: remove soon
|
||||||
|
'event_id': event.event_id,
|
||||||
'room_id': event.room_id,
|
'room_id': event.room_id,
|
||||||
'type': event.type,
|
'type': event.type,
|
||||||
'sender': event.user_id,
|
'sender': event.user_id,
|
||||||
|
|
|
@ -126,10 +126,28 @@ class PusherPool:
|
||||||
for u in users_affected:
|
for u in users_affected:
|
||||||
if u in self.pushers:
|
if u in self.pushers:
|
||||||
for p in self.pushers[u].values():
|
for p in self.pushers[u].values():
|
||||||
p.on_new_notifications(min_stream_id, max_stream_id)
|
yield p.on_new_notifications(min_stream_id, max_stream_id)
|
||||||
except:
|
except:
|
||||||
logger.exception("Exception in pusher on_new_notifications")
|
logger.exception("Exception in pusher on_new_notifications")
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
|
||||||
|
yield run_on_reactor()
|
||||||
|
try:
|
||||||
|
# Need to subtract 1 from the minimum because the lower bound here
|
||||||
|
# is not inclusive
|
||||||
|
updated_receipts = yield self.store.get_all_updated_receipts(
|
||||||
|
min_stream_id - 1, max_stream_id
|
||||||
|
)
|
||||||
|
# This returns a tuple, user_id is at index 3
|
||||||
|
users_affected = set([r[3] for r in updated_receipts])
|
||||||
|
for u in users_affected:
|
||||||
|
if u in self.pushers:
|
||||||
|
for p in self.pushers[u].values():
|
||||||
|
yield p.on_new_receipts(min_stream_id, max_stream_id)
|
||||||
|
except:
|
||||||
|
logger.exception("Exception in pusher on_new_receipts")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _refresh_pusher(self, app_id, pushkey, user_id):
|
def _refresh_pusher(self, app_id, pushkey, user_id):
|
||||||
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
|
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
|
||||||
|
|
|
@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore):
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_all_updated_receipts(self, last_id, current_id, limit):
|
def get_all_updated_receipts(self, last_id, current_id, limit=None):
|
||||||
def get_all_updated_receipts_txn(txn):
|
def get_all_updated_receipts_txn(txn):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
|
"SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
|
||||||
" FROM receipts_linearized"
|
" FROM receipts_linearized"
|
||||||
" WHERE ? < stream_id AND stream_id <= ?"
|
" WHERE ? < stream_id AND stream_id <= ?"
|
||||||
" ORDER BY stream_id ASC"
|
" ORDER BY stream_id ASC"
|
||||||
" LIMIT ?"
|
|
||||||
)
|
)
|
||||||
txn.execute(sql, (last_id, current_id, limit))
|
args = [last_id, current_id]
|
||||||
|
if limit is not None:
|
||||||
|
sql += " LIMIT ?"
|
||||||
|
args.append(limit)
|
||||||
|
txn.execute(sql, args)
|
||||||
|
|
||||||
return txn.fetchall()
|
return txn.fetchall()
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
|
|
Loading…
Reference in New Issue