Allow multiple pushers for a single app ID & pushkey, honouring the 'append' flag in the API.

This commit is contained in:
David Baker 2015-03-25 19:06:22 +00:00
parent ce2766d19c
commit c1a256cc4c
5 changed files with 96 additions and 28 deletions

View File

@ -253,7 +253,8 @@ class Pusher(object):
self.user_name, config, timeout=0) self.user_name, config, timeout=0)
self.last_token = chunk['end'] self.last_token = chunk['end']
self.store.update_pusher_last_token( self.store.update_pusher_last_token(
self.app_id, self.pushkey, self.last_token) self.app_id, self.pushkey, self.user_name, self.last_token
)
logger.info("Pusher %s for user %s starting from token %s", logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token) self.pushkey, self.user_name, self.last_token)
@ -314,7 +315,7 @@ class Pusher(object):
pk pk
) )
yield self.hs.get_pusherpool().remove_pusher( yield self.hs.get_pusherpool().remove_pusher(
self.app_id, pk self.app_id, pk, self.user_name
) )
if not self.alive: if not self.alive:
@ -326,6 +327,7 @@ class Pusher(object):
self.store.update_pusher_last_token_and_success( self.store.update_pusher_last_token_and_success(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_name,
self.last_token, self.last_token,
self.clock.time_msec() self.clock.time_msec()
) )
@ -334,6 +336,7 @@ class Pusher(object):
self.store.update_pusher_failing_since( self.store.update_pusher_failing_since(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_name,
self.failing_since) self.failing_since)
else: else:
if not self.failing_since: if not self.failing_since:
@ -341,6 +344,7 @@ class Pusher(object):
self.store.update_pusher_failing_since( self.store.update_pusher_failing_since(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_name,
self.failing_since self.failing_since
) )
@ -358,6 +362,7 @@ class Pusher(object):
self.store.update_pusher_last_token( self.store.update_pusher_last_token(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_name,
self.last_token self.last_token
) )
@ -365,6 +370,7 @@ class Pusher(object):
self.store.update_pusher_failing_since( self.store.update_pusher_failing_since(
self.app_id, self.app_id,
self.pushkey, self.pushkey,
self.user_name,
self.failing_since self.failing_since
) )
else: else:

View File

@ -84,6 +84,21 @@ class PusherPool:
pushkey, lang, data pushkey, lang, data
) )
@defer.inlineCallbacks
def remove_pushers_by_app_id_and_pushkey_not_user(self, app_id, pushkey,
not_user_id):
to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
)
for p in to_remove:
if p['user_name'] != not_user_id:
logger.info(
"Removing pusher for app id %s, pushkey %s, user %s",
app_id, pushkey, p['user_name']
)
self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
@defer.inlineCallbacks @defer.inlineCallbacks
def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind, def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind,
app_id, app_display_name, device_display_name, app_id, app_display_name, device_display_name,
@ -101,7 +116,7 @@ class PusherPool:
lang=lang, lang=lang,
data=encode_canonical_json(data).decode("UTF-8"), data=encode_canonical_json(data).decode("UTF-8"),
) )
self._refresh_pusher((app_id, pushkey)) self._refresh_pusher(app_id, pushkey, user_name)
def _create_pusher(self, pusherdict): def _create_pusher(self, pusherdict):
if pusherdict['kind'] == 'http': if pusherdict['kind'] == 'http':
@ -126,10 +141,16 @@ class PusherPool:
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def _refresh_pusher(self, app_id_pushkey): def _refresh_pusher(self, app_id, pushkey, user_name):
p = yield self.store.get_pushers_by_app_id_and_pushkey( resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id_pushkey app_id, pushkey
) )
p = None
for r in resultlist:
if r['user_name'] == user_name:
p = r
if p:
p['data'] = json.loads(p['data']) p['data'] = json.loads(p['data'])
self._start_pushers([p]) self._start_pushers([p])
@ -139,17 +160,23 @@ class PusherPool:
for pusherdict in pushers: for pusherdict in pushers:
p = self._create_pusher(pusherdict) p = self._create_pusher(pusherdict)
if p: if p:
fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey']) fullid = "%s:%s:%s" % (
pusherdict['app_id'],
pusherdict['pushkey'],
pusherdict['user_name']
)
if fullid in self.pushers: if fullid in self.pushers:
self.pushers[fullid].stop() self.pushers[fullid].stop()
self.pushers[fullid] = p self.pushers[fullid] = p
p.start() p.start()
@defer.inlineCallbacks @defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey): def remove_pusher(self, app_id, pushkey, user_name):
fullid = "%s:%s" % (app_id, pushkey) fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
if fullid in self.pushers: if fullid in self.pushers:
logger.info("Stopping pusher %s", fullid) logger.info("Stopping pusher %s", fullid)
self.pushers[fullid].stop() self.pushers[fullid].stop()
del self.pushers[fullid] del self.pushers[fullid]
yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey) yield self.store.delete_pusher_by_app_id_pushkey_user_name(
app_id, pushkey, user_name
)

View File

@ -37,7 +37,7 @@ class PusherRestServlet(ClientV1RestServlet):
and 'kind' in content and and 'kind' in content and
content['kind'] is None): content['kind'] is None):
yield pusher_pool.remove_pusher( yield pusher_pool.remove_pusher(
content['app_id'], content['pushkey'] content['app_id'], content['pushkey'], user_name=user.to_string()
) )
defer.returnValue((200, {})) defer.returnValue((200, {}))
@ -51,6 +51,17 @@ class PusherRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Missing parameters: "+','.join(missing), raise SynapseError(400, "Missing parameters: "+','.join(missing),
errcode=Codes.MISSING_PARAM) errcode=Codes.MISSING_PARAM)
append = False
if 'append' in content:
append = content['append']
if not append:
yield pusher_pool.remove_pushers_by_app_id_and_pushkey_not_user(
app_id=content['app_id'],
pushkey=content['pushkey'],
not_user_id=user.to_string()
)
try: try:
yield pusher_pool.add_pusher( yield pusher_pool.add_pusher(
user_name=user.to_string(), user_name=user.to_string(),

View File

@ -27,7 +27,7 @@ logger = logging.getLogger(__name__)
class PusherStore(SQLBaseStore): class PusherStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey): def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
sql = ( sql = (
"SELECT id, user_name, kind, profile_tag, app_id," "SELECT id, user_name, kind, profile_tag, app_id,"
"app_display_name, device_display_name, pushkey, ts, data, " "app_display_name, device_display_name, pushkey, ts, data, "
@ -38,7 +38,7 @@ class PusherStore(SQLBaseStore):
rows = yield self._execute( rows = yield self._execute(
"get_pushers_by_app_id_and_pushkey", None, sql, "get_pushers_by_app_id_and_pushkey", None, sql,
app_id_and_pushkey[0], app_id_and_pushkey[1] app_id, pushkey
) )
ret = [ ret = [
@ -60,7 +60,7 @@ class PusherStore(SQLBaseStore):
for r in rows for r in rows
] ]
defer.returnValue(ret[0]) defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_all_pushers(self): def get_all_pushers(self):
@ -104,9 +104,9 @@ class PusherStore(SQLBaseStore):
dict( dict(
app_id=app_id, app_id=app_id,
pushkey=pushkey, pushkey=pushkey,
user_name=user_name,
), ),
dict( dict(
user_name=user_name,
access_token=access_token, access_token=access_token,
kind=kind, kind=kind,
profile_tag=profile_tag, profile_tag=profile_tag,
@ -123,37 +123,38 @@ class PusherStore(SQLBaseStore):
raise StoreError(500, "Problem creating pusher.") raise StoreError(500, "Problem creating pusher.")
@defer.inlineCallbacks @defer.inlineCallbacks
def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): def delete_pusher_by_app_id_pushkey_user_name(self, app_id, pushkey, user_name):
yield self._simple_delete_one( yield self._simple_delete_one(
PushersTable.table_name, PushersTable.table_name,
{"app_id": app_id, "pushkey": pushkey}, {"app_id": app_id, "pushkey": pushkey, 'user_name': user_name},
desc="delete_pusher_by_app_id_pushkey", desc="delete_pusher_by_app_id_pushkey_user_name",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def update_pusher_last_token(self, app_id, pushkey, last_token): def update_pusher_last_token(self, app_id, pushkey, user_name, last_token):
yield self._simple_update_one( yield self._simple_update_one(
PushersTable.table_name, PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey}, {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
{'last_token': last_token}, {'last_token': last_token},
desc="update_pusher_last_token", desc="update_pusher_last_token",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def update_pusher_last_token_and_success(self, app_id, pushkey, def update_pusher_last_token_and_success(self, app_id, pushkey, user_name,
last_token, last_success): last_token, last_success):
yield self._simple_update_one( yield self._simple_update_one(
PushersTable.table_name, PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey}, {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
{'last_token': last_token, 'last_success': last_success}, {'last_token': last_token, 'last_success': last_success},
desc="update_pusher_last_token_and_success", desc="update_pusher_last_token_and_success",
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def update_pusher_failing_since(self, app_id, pushkey, failing_since): def update_pusher_failing_since(self, app_id, pushkey, user_name,
failing_since):
yield self._simple_update_one( yield self._simple_update_one(
PushersTable.table_name, PushersTable.table_name,
{'app_id': app_id, 'pushkey': pushkey}, {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_name},
{'failing_since': failing_since}, {'failing_since': failing_since},
desc="update_pusher_failing_since", desc="update_pusher_failing_since",
) )

View File

@ -1,2 +1,25 @@
ALTER TABLE pushers ADD COLUMN access_token INTEGER DEFAULT NULL; -- Drop, copy & recreate pushers table to change unique key
-- Also add access_token column at the same time
CREATE TABLE IF NOT EXISTS pushers2 (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_name TEXT NOT NULL,
access_token INTEGER DEFAULT NULL,
profile_tag varchar(32) NOT NULL,
kind varchar(8) NOT NULL,
app_id varchar(64) NOT NULL,
app_display_name varchar(64) NOT NULL,
device_display_name varchar(128) NOT NULL,
pushkey blob NOT NULL,
ts BIGINT NOT NULL,
lang varchar(8),
data blob,
last_token TEXT,
last_success BIGINT,
failing_since BIGINT,
FOREIGN KEY(user_name) REFERENCES users(name),
UNIQUE (app_id, pushkey, user_name)
);
INSERT INTO pushers2 (id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since)
SELECT id, user_name, profile_tag, kind, app_id, app_display_name, device_display_name, pushkey, ts, lang, data, last_token, last_success, failing_since FROM pushers;
DROP TABLE pushers;
ALTER TABLE pushers2 RENAME TO pushers;