Merge pull request #1680 from matrix-org/erikj/joined_rooms
Add new room membership APIs
This commit is contained in:
commit
1697f6a323
|
@ -87,12 +87,12 @@ class BulkPushRuleEvaluator:
|
||||||
condition_cache = {}
|
condition_cache = {}
|
||||||
|
|
||||||
for uid, rules in self.rules_by_user.items():
|
for uid, rules in self.rules_by_user.items():
|
||||||
display_name = None
|
display_name = room_members.get(uid, {}).get("display_name", None)
|
||||||
member_ev_id = context.current_state_ids.get((EventTypes.Member, uid))
|
if not display_name:
|
||||||
if member_ev_id:
|
# Handle the case where we are pushing a membership event to
|
||||||
member_ev = yield self.store.get_event(member_ev_id, allow_none=True)
|
# that user, as they might not be already joined.
|
||||||
if member_ev:
|
if event.type == EventTypes.Member and event.state_key == uid:
|
||||||
display_name = member_ev.content.get("displayname", None)
|
display_name = event.content.get("displayname", None)
|
||||||
|
|
||||||
filtered = filtered_by_user[uid]
|
filtered = filtered_by_user[uid]
|
||||||
if len(filtered) == 0:
|
if len(filtered) == 0:
|
||||||
|
|
|
@ -369,6 +369,24 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
|
||||||
}))
|
}))
|
||||||
|
|
||||||
|
|
||||||
|
class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
|
||||||
|
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/joined_members$")
|
||||||
|
|
||||||
|
def __init__(self, hs):
|
||||||
|
super(JoinedRoomMemberListRestServlet, self).__init__(hs)
|
||||||
|
self.state = hs.get_state_handler()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_GET(self, request, room_id):
|
||||||
|
yield self.auth.get_user_by_req(request)
|
||||||
|
|
||||||
|
users_with_profile = yield self.state.get_current_user_in_room(room_id)
|
||||||
|
|
||||||
|
defer.returnValue((200, {
|
||||||
|
"joined": users_with_profile
|
||||||
|
}))
|
||||||
|
|
||||||
|
|
||||||
# TODO: Needs better unit testing
|
# TODO: Needs better unit testing
|
||||||
class RoomMessageListRestServlet(ClientV1RestServlet):
|
class RoomMessageListRestServlet(ClientV1RestServlet):
|
||||||
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
|
PATTERNS = client_path_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
|
||||||
|
@ -692,6 +710,22 @@ class SearchRestServlet(ClientV1RestServlet):
|
||||||
defer.returnValue((200, results))
|
defer.returnValue((200, results))
|
||||||
|
|
||||||
|
|
||||||
|
class JoinedRoomsRestServlet(ClientV1RestServlet):
|
||||||
|
PATTERNS = client_path_patterns("/joined_rooms$")
|
||||||
|
|
||||||
|
def __init__(self, hs):
|
||||||
|
super(JoinedRoomsRestServlet, self).__init__(hs)
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def on_GET(self, request):
|
||||||
|
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||||
|
|
||||||
|
rooms = yield self.store.get_rooms_for_user(requester.user.to_string())
|
||||||
|
room_ids = set(r.room_id for r in rooms) # Ensure they're unique.
|
||||||
|
defer.returnValue((200, {"joined_rooms": list(room_ids)}))
|
||||||
|
|
||||||
|
|
||||||
def register_txn_path(servlet, regex_string, http_server, with_get=False):
|
def register_txn_path(servlet, regex_string, http_server, with_get=False):
|
||||||
"""Registers a transaction-based path.
|
"""Registers a transaction-based path.
|
||||||
|
|
||||||
|
@ -727,6 +761,7 @@ def register_servlets(hs, http_server):
|
||||||
RoomStateEventRestServlet(hs).register(http_server)
|
RoomStateEventRestServlet(hs).register(http_server)
|
||||||
RoomCreateRestServlet(hs).register(http_server)
|
RoomCreateRestServlet(hs).register(http_server)
|
||||||
RoomMemberListRestServlet(hs).register(http_server)
|
RoomMemberListRestServlet(hs).register(http_server)
|
||||||
|
JoinedRoomMemberListRestServlet(hs).register(http_server)
|
||||||
RoomMessageListRestServlet(hs).register(http_server)
|
RoomMessageListRestServlet(hs).register(http_server)
|
||||||
JoinRoomAliasServlet(hs).register(http_server)
|
JoinRoomAliasServlet(hs).register(http_server)
|
||||||
RoomForgetRestServlet(hs).register(http_server)
|
RoomForgetRestServlet(hs).register(http_server)
|
||||||
|
@ -738,4 +773,5 @@ def register_servlets(hs, http_server):
|
||||||
RoomRedactEventRestServlet(hs).register(http_server)
|
RoomRedactEventRestServlet(hs).register(http_server)
|
||||||
RoomTypingRestServlet(hs).register(http_server)
|
RoomTypingRestServlet(hs).register(http_server)
|
||||||
SearchRestServlet(hs).register(http_server)
|
SearchRestServlet(hs).register(http_server)
|
||||||
|
JoinedRoomsRestServlet(hs).register(http_server)
|
||||||
RoomEventContext(hs).register(http_server)
|
RoomEventContext(hs).register(http_server)
|
||||||
|
|
|
@ -222,6 +222,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
self._stream_order_on_start = self.get_room_max_stream_ordering()
|
||||||
|
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
|
||||||
|
|
||||||
super(DataStore, self).__init__(hs)
|
super(DataStore, self).__init__(hs)
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import ujson as json
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -34,7 +35,15 @@ RoomsForUser = namedtuple(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
|
||||||
|
|
||||||
|
|
||||||
class RoomMemberStore(SQLBaseStore):
|
class RoomMemberStore(SQLBaseStore):
|
||||||
|
def __init__(self, hs):
|
||||||
|
super(RoomMemberStore, self).__init__(hs)
|
||||||
|
self.register_background_update_handler(
|
||||||
|
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
|
||||||
|
)
|
||||||
|
|
||||||
def _store_room_members_txn(self, txn, events, backfilled):
|
def _store_room_members_txn(self, txn, events, backfilled):
|
||||||
"""Store a room member in the database.
|
"""Store a room member in the database.
|
||||||
|
@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
"sender": event.user_id,
|
"sender": event.user_id,
|
||||||
"room_id": event.room_id,
|
"room_id": event.room_id,
|
||||||
"membership": event.membership,
|
"membership": event.membership,
|
||||||
|
"display_name": event.content.get("displayname", None),
|
||||||
|
"avatar_url": event.content.get("avatar_url", None),
|
||||||
}
|
}
|
||||||
for event in events
|
for event in events
|
||||||
]
|
]
|
||||||
|
@ -398,7 +409,7 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
table="room_memberships",
|
table="room_memberships",
|
||||||
column="event_id",
|
column="event_id",
|
||||||
iterable=member_event_ids,
|
iterable=member_event_ids,
|
||||||
retcols=['user_id'],
|
retcols=['user_id', 'display_name', 'avatar_url'],
|
||||||
keyvalues={
|
keyvalues={
|
||||||
"membership": Membership.JOIN,
|
"membership": Membership.JOIN,
|
||||||
},
|
},
|
||||||
|
@ -406,11 +417,21 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
desc="_get_joined_users_from_context",
|
desc="_get_joined_users_from_context",
|
||||||
)
|
)
|
||||||
|
|
||||||
users_in_room = set(row["user_id"] for row in rows)
|
users_in_room = {
|
||||||
|
row["user_id"]: {
|
||||||
|
"display_name": row["display_name"],
|
||||||
|
"avatar_url": row["avatar_url"],
|
||||||
|
}
|
||||||
|
for row in rows
|
||||||
|
}
|
||||||
|
|
||||||
if event is not None and event.type == EventTypes.Member:
|
if event is not None and event.type == EventTypes.Member:
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
if event.event_id in member_event_ids:
|
if event.event_id in member_event_ids:
|
||||||
users_in_room.add(event.state_key)
|
users_in_room[event.state_key] = {
|
||||||
|
"display_name": event.content.get("displayname", None),
|
||||||
|
"avatar_url": event.content.get("avatar_url", None),
|
||||||
|
}
|
||||||
|
|
||||||
defer.returnValue(users_in_room)
|
defer.returnValue(users_in_room)
|
||||||
|
|
||||||
|
@ -448,3 +469,78 @@ class RoomMemberStore(SQLBaseStore):
|
||||||
defer.returnValue(True)
|
defer.returnValue(True)
|
||||||
|
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _background_add_membership_profile(self, progress, batch_size):
|
||||||
|
target_min_stream_id = progress.get(
|
||||||
|
"target_min_stream_id_inclusive", self._min_stream_order_on_start
|
||||||
|
)
|
||||||
|
max_stream_id = progress.get(
|
||||||
|
"max_stream_id_exclusive", self._stream_order_on_start + 1
|
||||||
|
)
|
||||||
|
|
||||||
|
INSERT_CLUMP_SIZE = 1000
|
||||||
|
|
||||||
|
def add_membership_profile_txn(txn):
|
||||||
|
sql = ("""
|
||||||
|
SELECT stream_ordering, event_id, room_id, content
|
||||||
|
FROM events
|
||||||
|
INNER JOIN room_memberships USING (room_id, event_id)
|
||||||
|
WHERE ? <= stream_ordering AND stream_ordering < ?
|
||||||
|
AND type = 'm.room.member'
|
||||||
|
ORDER BY stream_ordering DESC
|
||||||
|
LIMIT ?
|
||||||
|
""")
|
||||||
|
|
||||||
|
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
|
||||||
|
|
||||||
|
rows = self.cursor_to_dict(txn)
|
||||||
|
if not rows:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
min_stream_id = rows[-1]["stream_ordering"]
|
||||||
|
|
||||||
|
to_update = []
|
||||||
|
for row in rows:
|
||||||
|
event_id = row["event_id"]
|
||||||
|
room_id = row["room_id"]
|
||||||
|
try:
|
||||||
|
content = json.loads(row["content"])
|
||||||
|
except:
|
||||||
|
continue
|
||||||
|
|
||||||
|
display_name = content.get("displayname", None)
|
||||||
|
avatar_url = content.get("avatar_url", None)
|
||||||
|
|
||||||
|
if display_name or avatar_url:
|
||||||
|
to_update.append((
|
||||||
|
display_name, avatar_url, event_id, room_id
|
||||||
|
))
|
||||||
|
|
||||||
|
to_update_sql = ("""
|
||||||
|
UPDATE room_memberships SET display_name = ?, avatar_url = ?
|
||||||
|
WHERE event_id = ? AND room_id = ?
|
||||||
|
""")
|
||||||
|
for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
|
||||||
|
clump = to_update[index:index + INSERT_CLUMP_SIZE]
|
||||||
|
txn.executemany(to_update_sql, clump)
|
||||||
|
|
||||||
|
progress = {
|
||||||
|
"target_min_stream_id_inclusive": target_min_stream_id,
|
||||||
|
"max_stream_id_exclusive": min_stream_id,
|
||||||
|
}
|
||||||
|
|
||||||
|
self._background_update_progress_txn(
|
||||||
|
txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
|
||||||
|
)
|
||||||
|
|
||||||
|
return len(to_update)
|
||||||
|
|
||||||
|
result = yield self.runInteraction(
|
||||||
|
_MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
|
||||||
|
)
|
||||||
|
|
||||||
|
if not result:
|
||||||
|
yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
|
||||||
|
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
|
@ -0,0 +1,20 @@
|
||||||
|
/* Copyright 2016 OpenMarket Ltd
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
ALTER TABLE room_memberships ADD COLUMN display_name TEXT;
|
||||||
|
ALTER TABLE room_memberships ADD COLUMN avatar_url TEXT;
|
||||||
|
|
||||||
|
INSERT into background_updates (update_name, progress_json)
|
||||||
|
VALUES ('room_membership_profile_update', '{}');
|
|
@ -541,6 +541,9 @@ class StreamStore(SQLBaseStore):
|
||||||
def get_room_max_stream_ordering(self):
|
def get_room_max_stream_ordering(self):
|
||||||
return self._stream_id_gen.get_current_token()
|
return self._stream_id_gen.get_current_token()
|
||||||
|
|
||||||
|
def get_room_min_stream_ordering(self):
|
||||||
|
return self._backfill_id_gen.get_current_token()
|
||||||
|
|
||||||
def get_stream_token_for_event(self, event_id):
|
def get_stream_token_for_event(self, event_id):
|
||||||
"""The stream token for an event
|
"""The stream token for an event
|
||||||
Args:
|
Args:
|
||||||
|
|
Loading…
Reference in New Issue