Measure federation send transaction resources
This commit is contained in:
parent
d45489474d
commit
3bc9629be5
|
@ -26,6 +26,7 @@ from synapse.util.logcontext import PreserveLoggingContext
|
||||||
from synapse.util.retryutils import (
|
from synapse.util.retryutils import (
|
||||||
get_retry_limiter, NotRetryingDestination,
|
get_retry_limiter, NotRetryingDestination,
|
||||||
)
|
)
|
||||||
|
from synapse.util.metrics import measure_func
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
@ -51,7 +52,7 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
self.transport_layer = transport_layer
|
self.transport_layer = transport_layer
|
||||||
|
|
||||||
self._clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
# Is a mapping from destinations -> deferreds. Used to keep track
|
# Is a mapping from destinations -> deferreds. Used to keep track
|
||||||
# of which destinations have transactions in flight and when they are
|
# of which destinations have transactions in flight and when they are
|
||||||
|
@ -82,7 +83,7 @@ class TransactionQueue(object):
|
||||||
self.pending_failures_by_dest = {}
|
self.pending_failures_by_dest = {}
|
||||||
|
|
||||||
# HACK to get unique tx id
|
# HACK to get unique tx id
|
||||||
self._next_txn_id = int(self._clock.time_msec())
|
self._next_txn_id = int(self.clock.time_msec())
|
||||||
|
|
||||||
def can_send_to(self, destination):
|
def can_send_to(self, destination):
|
||||||
"""Can we send messages to the given server?
|
"""Can we send messages to the given server?
|
||||||
|
@ -197,6 +198,7 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
yield deferred
|
yield deferred
|
||||||
|
|
||||||
|
@measure_func("attempt_new_transaction")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def _attempt_new_transaction(self, destination):
|
def _attempt_new_transaction(self, destination):
|
||||||
|
@ -246,7 +248,7 @@ class TransactionQueue(object):
|
||||||
|
|
||||||
limiter = yield get_retry_limiter(
|
limiter = yield get_retry_limiter(
|
||||||
destination,
|
destination,
|
||||||
self._clock,
|
self.clock,
|
||||||
self.store,
|
self.store,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -262,7 +264,7 @@ class TransactionQueue(object):
|
||||||
logger.debug("TX [%s] Persisting transaction...", destination)
|
logger.debug("TX [%s] Persisting transaction...", destination)
|
||||||
|
|
||||||
transaction = Transaction.create_new(
|
transaction = Transaction.create_new(
|
||||||
origin_server_ts=int(self._clock.time_msec()),
|
origin_server_ts=int(self.clock.time_msec()),
|
||||||
transaction_id=txn_id,
|
transaction_id=txn_id,
|
||||||
origin=self.server_name,
|
origin=self.server_name,
|
||||||
destination=destination,
|
destination=destination,
|
||||||
|
@ -293,7 +295,7 @@ class TransactionQueue(object):
|
||||||
# keys work
|
# keys work
|
||||||
def json_data_cb():
|
def json_data_cb():
|
||||||
data = transaction.get_dict()
|
data = transaction.get_dict()
|
||||||
now = int(self._clock.time_msec())
|
now = int(self.clock.time_msec())
|
||||||
if "pdus" in data:
|
if "pdus" in data:
|
||||||
for p in data["pdus"]:
|
for p in data["pdus"]:
|
||||||
if "age_ts" in p:
|
if "age_ts" in p:
|
||||||
|
|
|
@ -13,10 +13,12 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
|
from functools import wraps
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
@ -47,6 +49,18 @@ block_db_txn_duration = metrics.register_distribution(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def measure_func(name):
|
||||||
|
def wrapper(func):
|
||||||
|
@wraps(func)
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def measured_func(self, *args, **kwargs):
|
||||||
|
with Measure(self.clock, name):
|
||||||
|
r = yield func(self, *args, **kwargs)
|
||||||
|
defer.returnValue(r)
|
||||||
|
return measured_func
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
class Measure(object):
|
class Measure(object):
|
||||||
__slots__ = [
|
__slots__ = [
|
||||||
"clock", "name", "start_context", "start", "new_context", "ru_utime",
|
"clock", "name", "start_context", "start", "new_context", "ru_utime",
|
||||||
|
|
Loading…
Reference in New Issue