Merge pull request #2103 from matrix-org/erikj/no-double-encode
Don't double encode replication data
This commit is contained in:
commit
98ce212093
|
@ -51,7 +51,6 @@ from daemonize import Daemonize
|
||||||
import sys
|
import sys
|
||||||
import logging
|
import logging
|
||||||
import gc
|
import gc
|
||||||
import ujson as json
|
|
||||||
|
|
||||||
logger = logging.getLogger("synapse.app.appservice")
|
logger = logging.getLogger("synapse.app.appservice")
|
||||||
|
|
||||||
|
@ -290,8 +289,7 @@ class FederationSenderHandler(object):
|
||||||
# Parse the rows in the stream
|
# Parse the rows in the stream
|
||||||
for row in rows:
|
for row in rows:
|
||||||
typ = row.type
|
typ = row.type
|
||||||
content_js = row.data
|
content = row.data
|
||||||
content = json.loads(content_js)
|
|
||||||
|
|
||||||
if typ == send_queue.PRESENCE_TYPE:
|
if typ == send_queue.PRESENCE_TYPE:
|
||||||
destination = content["destination"]
|
destination = content["destination"]
|
||||||
|
|
|
@ -62,7 +62,6 @@ import sys
|
||||||
import logging
|
import logging
|
||||||
import contextlib
|
import contextlib
|
||||||
import gc
|
import gc
|
||||||
import ujson as json
|
|
||||||
|
|
||||||
logger = logging.getLogger("synapse.app.synchrotron")
|
logger = logging.getLogger("synapse.app.synchrotron")
|
||||||
|
|
||||||
|
@ -254,9 +253,8 @@ class SynchrotronTyping(object):
|
||||||
self._latest_room_serial = token
|
self._latest_room_serial = token
|
||||||
|
|
||||||
for row in rows:
|
for row in rows:
|
||||||
typing = json.loads(row.user_ids)
|
|
||||||
self._room_serials[row.room_id] = token
|
self._room_serials[row.room_id] = token
|
||||||
self._room_typing[row.room_id] = typing
|
self._room_typing[row.room_id] = row.user_ids
|
||||||
|
|
||||||
|
|
||||||
class SynchrotronApplicationService(object):
|
class SynchrotronApplicationService(object):
|
||||||
|
|
|
@ -35,7 +35,6 @@ from synapse.util.metrics import Measure
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
from blist import sorteddict
|
from blist import sorteddict
|
||||||
import ujson
|
|
||||||
|
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
@ -258,10 +257,10 @@ class FederationRemoteSendQueue(object):
|
||||||
)
|
)
|
||||||
|
|
||||||
for (key, (dest, user_id)) in dest_user_ids:
|
for (key, (dest, user_id)) in dest_user_ids:
|
||||||
rows.append((key, PRESENCE_TYPE, ujson.dumps({
|
rows.append((key, PRESENCE_TYPE, {
|
||||||
"destination": dest,
|
"destination": dest,
|
||||||
"state": self.presence_map[user_id].as_dict(),
|
"state": self.presence_map[user_id].as_dict(),
|
||||||
})))
|
}))
|
||||||
|
|
||||||
# Fetch changes keyed edus
|
# Fetch changes keyed edus
|
||||||
keys = self.keyed_edu_changed.keys()
|
keys = self.keyed_edu_changed.keys()
|
||||||
|
@ -271,10 +270,10 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
for (pos, (destination, edu_key)) in keyed_edus:
|
for (pos, (destination, edu_key)) in keyed_edus:
|
||||||
rows.append(
|
rows.append(
|
||||||
(pos, KEYED_EDU_TYPE, ujson.dumps({
|
(pos, KEYED_EDU_TYPE, {
|
||||||
"key": edu_key,
|
"key": edu_key,
|
||||||
"edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
|
"edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(),
|
||||||
}))
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
# Fetch changed edus
|
# Fetch changed edus
|
||||||
|
@ -284,7 +283,7 @@ class FederationRemoteSendQueue(object):
|
||||||
edus = set((k, self.edus[k]) for k in keys[i:j])
|
edus = set((k, self.edus[k]) for k in keys[i:j])
|
||||||
|
|
||||||
for (pos, edu) in edus:
|
for (pos, edu) in edus:
|
||||||
rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict())))
|
rows.append((pos, EDU_TYPE, edu.get_internal_dict()))
|
||||||
|
|
||||||
# Fetch changed failures
|
# Fetch changed failures
|
||||||
keys = self.failures.keys()
|
keys = self.failures.keys()
|
||||||
|
@ -293,10 +292,10 @@ class FederationRemoteSendQueue(object):
|
||||||
failures = set((k, self.failures[k]) for k in keys[i:j])
|
failures = set((k, self.failures[k]) for k in keys[i:j])
|
||||||
|
|
||||||
for (pos, (destination, failure)) in failures:
|
for (pos, (destination, failure)) in failures:
|
||||||
rows.append((pos, FAILURE_TYPE, ujson.dumps({
|
rows.append((pos, FAILURE_TYPE, {
|
||||||
"destination": destination,
|
"destination": destination,
|
||||||
"failure": failure,
|
"failure": failure,
|
||||||
})))
|
}))
|
||||||
|
|
||||||
# Fetch changed device messages
|
# Fetch changed device messages
|
||||||
keys = self.device_messages.keys()
|
keys = self.device_messages.keys()
|
||||||
|
@ -305,9 +304,9 @@ class FederationRemoteSendQueue(object):
|
||||||
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
|
device_messages = set((k, self.device_messages[k]) for k in keys[i:j])
|
||||||
|
|
||||||
for (pos, destination) in device_messages:
|
for (pos, destination) in device_messages:
|
||||||
rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({
|
rows.append((pos, DEVICE_MESSAGE_TYPE, {
|
||||||
"destination": destination,
|
"destination": destination,
|
||||||
})))
|
}))
|
||||||
|
|
||||||
# Sort rows based on pos
|
# Sort rows based on pos
|
||||||
rows.sort()
|
rows.sort()
|
||||||
|
|
|
@ -24,7 +24,6 @@ from synapse.types import UserID, get_domain_from_id
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
import ujson as json
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -288,8 +287,7 @@ class TypingHandler(object):
|
||||||
for room_id, serial in self._room_serials.items():
|
for room_id, serial in self._room_serials.items():
|
||||||
if last_id < serial and serial <= current_id:
|
if last_id < serial and serial <= current_id:
|
||||||
typing = self._room_typing[room_id]
|
typing = self._room_typing[room_id]
|
||||||
typing_bytes = json.dumps(list(typing), ensure_ascii=False)
|
rows.append((serial, room_id, list(typing)))
|
||||||
rows.append((serial, room_id, typing_bytes))
|
|
||||||
rows.sort()
|
rows.sort()
|
||||||
return rows
|
return rows
|
||||||
|
|
||||||
|
|
|
@ -36,34 +36,82 @@ logger = logging.getLogger(__name__)
|
||||||
MAX_EVENTS_BEHIND = 10000
|
MAX_EVENTS_BEHIND = 10000
|
||||||
|
|
||||||
|
|
||||||
EventStreamRow = namedtuple("EventStreamRow",
|
EventStreamRow = namedtuple("EventStreamRow", (
|
||||||
("event_id", "room_id", "type", "state_key", "redacts"))
|
"event_id", # str
|
||||||
BackfillStreamRow = namedtuple("BackfillStreamRow",
|
"room_id", # str
|
||||||
("event_id", "room_id", "type", "state_key", "redacts"))
|
"type", # str
|
||||||
PresenceStreamRow = namedtuple("PresenceStreamRow",
|
"state_key", # str, optional
|
||||||
("user_id", "state", "last_active_ts",
|
"redacts", # str, optional
|
||||||
"last_federation_update_ts", "last_user_sync_ts",
|
))
|
||||||
"status_msg", "currently_active"))
|
BackfillStreamRow = namedtuple("BackfillStreamRow", (
|
||||||
TypingStreamRow = namedtuple("TypingStreamRow",
|
"event_id", # str
|
||||||
("room_id", "user_ids"))
|
"room_id", # str
|
||||||
ReceiptsStreamRow = namedtuple("ReceiptsStreamRow",
|
"type", # str
|
||||||
("room_id", "receipt_type", "user_id", "event_id",
|
"state_key", # str, optional
|
||||||
"data"))
|
"redacts", # str, optional
|
||||||
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",))
|
))
|
||||||
PushersStreamRow = namedtuple("PushersStreamRow",
|
PresenceStreamRow = namedtuple("PresenceStreamRow", (
|
||||||
("user_id", "app_id", "pushkey", "deleted",))
|
"user_id", # str
|
||||||
CachesStreamRow = namedtuple("CachesStreamRow",
|
"state", # str
|
||||||
("cache_func", "keys", "invalidation_ts",))
|
"last_active_ts", # int
|
||||||
PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow",
|
"last_federation_update_ts", # int
|
||||||
("room_id", "visibility", "appservice_id",
|
"last_user_sync_ts", # int
|
||||||
"network_id",))
|
"status_msg", # str
|
||||||
DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",))
|
"currently_active", # bool
|
||||||
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",))
|
))
|
||||||
FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",))
|
TypingStreamRow = namedtuple("TypingStreamRow", (
|
||||||
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow",
|
"room_id", # str
|
||||||
("user_id", "room_id", "data"))
|
"user_ids", # list(str)
|
||||||
AccountDataStreamRow = namedtuple("AccountDataStream",
|
))
|
||||||
("user_id", "room_id", "data_type", "data"))
|
ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
|
||||||
|
"room_id", # str
|
||||||
|
"receipt_type", # str
|
||||||
|
"user_id", # str
|
||||||
|
"event_id", # str
|
||||||
|
"data", # dict
|
||||||
|
))
|
||||||
|
PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
|
||||||
|
"user_id", # str
|
||||||
|
))
|
||||||
|
PushersStreamRow = namedtuple("PushersStreamRow", (
|
||||||
|
"user_id", # str
|
||||||
|
"app_id", # str
|
||||||
|
"pushkey", # str
|
||||||
|
"deleted", # bool
|
||||||
|
))
|
||||||
|
CachesStreamRow = namedtuple("CachesStreamRow", (
|
||||||
|
"cache_func", # str
|
||||||
|
"keys", # list(str)
|
||||||
|
"invalidation_ts", # int
|
||||||
|
))
|
||||||
|
PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
|
||||||
|
"room_id", # str
|
||||||
|
"visibility", # str
|
||||||
|
"appservice_id", # str, optional
|
||||||
|
"network_id", # str, optional
|
||||||
|
))
|
||||||
|
DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
|
||||||
|
"user_id", # str
|
||||||
|
"destination", # str
|
||||||
|
))
|
||||||
|
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
|
||||||
|
"entity", # str
|
||||||
|
))
|
||||||
|
FederationStreamRow = namedtuple("FederationStreamRow", (
|
||||||
|
"type", # str
|
||||||
|
"data", # dict
|
||||||
|
))
|
||||||
|
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
|
||||||
|
"user_id", # str
|
||||||
|
"room_id", # str
|
||||||
|
"data", # dict
|
||||||
|
))
|
||||||
|
AccountDataStreamRow = namedtuple("AccountDataStream", (
|
||||||
|
"user_id", # str
|
||||||
|
"room_id", # str
|
||||||
|
"data_type", # str
|
||||||
|
"data", # dict
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
class Stream(object):
|
class Stream(object):
|
||||||
|
|
Loading…
Reference in New Issue