Create a new stream_id per presence update
This commit is contained in:
parent
13f86c3489
commit
6451fcd085
|
@ -58,17 +58,20 @@ class UserPresenceState(namedtuple("UserPresenceState",
|
||||||
class PresenceStore(SQLBaseStore):
|
class PresenceStore(SQLBaseStore):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def update_presence(self, presence_states):
|
def update_presence(self, presence_states):
|
||||||
stream_id_manager = yield self._presence_id_gen.get_next(self)
|
stream_ordering_manager = yield self._presence_id_gen.get_next_mult(
|
||||||
with stream_id_manager as stream_id:
|
self, len(presence_states)
|
||||||
yield self.runInteraction(
|
|
||||||
"update_presence",
|
|
||||||
self._update_presence_txn, stream_id, presence_states,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((stream_id, self._presence_id_gen.get_max_token()))
|
with stream_ordering_manager as stream_orderings:
|
||||||
|
yield self.runInteraction(
|
||||||
|
"update_presence",
|
||||||
|
self._update_presence_txn, stream_orderings, presence_states,
|
||||||
|
)
|
||||||
|
|
||||||
def _update_presence_txn(self, txn, stream_id, presence_states):
|
defer.returnValue((stream_orderings[-1], self._presence_id_gen.get_max_token()))
|
||||||
for state in presence_states:
|
|
||||||
|
def _update_presence_txn(self, txn, stream_orderings, presence_states):
|
||||||
|
for stream_id, state in zip(stream_orderings, presence_states):
|
||||||
txn.call_after(
|
txn.call_after(
|
||||||
self.presence_stream_cache.entity_has_changed,
|
self.presence_stream_cache.entity_has_changed,
|
||||||
state.user_id, stream_id,
|
state.user_id, stream_id,
|
||||||
|
|
Loading…
Reference in New Issue