Batch upsert user ips

Erik Johnston 2017-06-27 13:37:04 +01:00
parent 036f439f53
commit ed3d0170d9
2 changed files with 39 additions and 21 deletions


@@ -23,7 +23,6 @@ from synapse import event_auth
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import AuthError, Codes
 from synapse.types import UserID
-from synapse.util import logcontext
 from synapse.util.metrics import Measure

 logger = logging.getLogger(__name__)
@@ -200,7 +199,7 @@ class Auth(object):
                 default=[""]
             )[0]
             if user and access_token and ip_addr:
-                logcontext.preserve_fn(self.store.insert_client_ip)(
+                self.store.insert_client_ip(
                     user=user,
                     access_token=access_token,
                     ip=ip_addr,
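
The call-site change above works because, after this commit, insert_client_ip no longer starts a database write of its own; it only records the row in an in-memory dict (see the second file below), so there is no Deferred left for the caller to fire off in the background via logcontext.preserve_fn. A minimal sketch of that before/after shape, using assumed stand-in names rather than Synapse's real helpers:

# Illustrative sketch only -- the class and helper names here are hypothetical
# stand-ins, not Synapse's actual code.
from twisted.internet import defer


def _write_row_to_db(row):
    # Stand-in for the old per-request upsert; the real store hit the database here.
    return defer.succeed(None)


class OldStyleStore(object):
    @defer.inlineCallbacks
    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
        # Returns a Deferred, so callers had to run it in the background
        # (e.g. wrapped in logcontext.preserve_fn) rather than block on it.
        yield _write_row_to_db((user_id, access_token, ip, user_agent, device_id))


class NewStyleStore(object):
    def __init__(self):
        self._batch_row_update = {}

    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id, now):
        # A plain synchronous dict update: cheap enough to call directly from
        # the request path, with the actual upsert left to a periodic flush.
        self._batch_row_update[(user_id, access_token, ip)] = (user_agent, device_id, now)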


@@ -15,7 +15,7 @@
 import logging

-from twisted.internet import defer
+from twisted.internet import defer, reactor

 from ._base import Cache
 from . import background_updates
@@ -50,7 +50,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
             columns=["user_id", "device_id", "last_seen"],
         )

-    @defer.inlineCallbacks
+        self._batch_row_update = {}
+
+        self._client_ip_looper = self._clock.looping_call(
+            self._update_client_ips_batch, 5 * 1000
+        )
+        reactor.addSystemEventTrigger("before", "shutdown", self._update_client_ips_batch)
+
     def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
         key = (user.to_string(), access_token, ip)
@@ -62,25 +68,38 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
         # Rate-limited inserts
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
-            defer.returnValue(None)
+            return

         self.client_ip_last_seen.prefill(key, now)

-        # It's safe not to lock here: a) no unique constraint,
-        # b) LAST_SEEN_GRANULARITY makes concurrent updates incredibly unlikely
-        yield self._simple_upsert(
-            "user_ips",
-            keyvalues={
-                "user_id": user.to_string(),
-                "access_token": access_token,
-                "ip": ip,
-                "user_agent": user_agent,
-                "device_id": device_id,
-            },
-            values={
-                "last_seen": now,
-            },
-            desc="insert_client_ip",
-            lock=False,
-        )
+        self._batch_row_update[key] = (user_agent, device_id, now)
+
+    def _update_client_ips_batch(self):
+        to_update = self._batch_row_update
+        self._batch_row_update = {}
+        return self.runInteraction(
+            "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+        )
+
+    def _update_client_ips_batch_txn(self, txn, to_update):
+        self.database_engine.lock_table(txn, "user_ips")
+
+        for entry in to_update.iteritems():
+            (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
+
+            self._simple_upsert_txn(
+                txn,
+                table="user_ips",
+                keyvalues={
+                    "user_id": user_id,
+                    "access_token": access_token,
+                    "ip": ip,
+                    "user_agent": user_agent,
+                    "device_id": device_id,
+                },
+                values={
+                    "last_seen": last_seen,
+                },
+                lock=False,
+            )
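
Taken together, the storage change turns one upsert per request into a dict of pending rows that is flushed roughly every five seconds (via the homeserver clock's looping_call), and once more on reactor shutdown, writing the whole batch inside a single transaction with the user_ips table locked. The standalone sketch below shows the same accumulate-then-flush pattern in plain Python with sqlite3; the class name, schema, and flush trigger are assumptions for illustration, not part of the commit.

# Standalone illustration of the batching pattern (assumed names/schema, not
# Synapse code): repeated hits for the same (user, token, ip) collapse into a
# single pending row, and the whole batch is written in one transaction.
import sqlite3
import time


class BatchedClientIpStore(object):
    def __init__(self, conn, flush_interval=5.0):
        self.conn = conn
        self.flush_interval = flush_interval
        self._batch_row_update = {}  # (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
        self._last_flush = time.time()
        conn.execute(
            "CREATE TABLE IF NOT EXISTS user_ips ("
            " user_id TEXT, access_token TEXT, ip TEXT,"
            " user_agent TEXT, device_id TEXT, last_seen INTEGER,"
            " PRIMARY KEY (user_id, access_token, ip))"
        )

    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
        # Cheap in-memory update; later calls for the same key overwrite the
        # pending entry, so at most one row per key reaches the database.
        now = int(time.time() * 1000)
        self._batch_row_update[(user_id, access_token, ip)] = (user_agent, device_id, now)
        # The commit drives the flush from a looping call and a shutdown hook;
        # here we simply flush opportunistically once the interval has passed.
        if time.time() - self._last_flush >= self.flush_interval:
            self.flush()

    def flush(self):
        to_update, self._batch_row_update = self._batch_row_update, {}
        self._last_flush = time.time()
        with self.conn:  # one transaction for the whole batch
            self.conn.executemany(
                "INSERT OR REPLACE INTO user_ips"
                " (user_id, access_token, ip, user_agent, device_id, last_seen)"
                " VALUES (?, ?, ?, ?, ?, ?)",
                [
                    (uid, token, ip, ua, dev, seen)
                    for (uid, token, ip), (ua, dev, seen) in to_update.items()
                ],
            )


store = BatchedClientIpStore(sqlite3.connect(":memory:"))
store.insert_client_ip("@alice:example.com", "token", "10.0.0.1", "curl/7.0", "DEV1")
store.flush()

INSERT OR REPLACE stands in here for the _simple_upsert_txn helper; a real upsert would use ON CONFLICT ... DO UPDATE, but the point of the sketch is the deduplicating dict plus the single batched transaction.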