Add a module API to send an HTTP push notification (#15387)
Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
This commit is contained in:
parent
e2e9b545ff
commit
247e6a8a78
|
@ -0,0 +1 @@
|
|||
Add a module API to send an HTTP push notification.
|
|
@ -105,6 +105,7 @@ from synapse.module_api.callbacks.spamchecker_callbacks import (
|
|||
USER_MAY_SEND_3PID_INVITE_CALLBACK,
|
||||
SpamCheckerModuleApiCallbacks,
|
||||
)
|
||||
from synapse.push.httppusher import HttpPusher
|
||||
from synapse.rest.client.login import LoginResponse
|
||||
from synapse.storage import DataStore
|
||||
from synapse.storage.background_updates import (
|
||||
|
@ -248,6 +249,7 @@ class ModuleApi:
|
|||
self._registration_handler = hs.get_registration_handler()
|
||||
self._send_email_handler = hs.get_send_email_handler()
|
||||
self._push_rules_handler = hs.get_push_rules_handler()
|
||||
self._pusherpool = hs.get_pusherpool()
|
||||
self._device_handler = hs.get_device_handler()
|
||||
self.custom_template_dir = hs.config.server.custom_template_directory
|
||||
self._callbacks = hs.get_module_api_callbacks()
|
||||
|
@ -1225,6 +1227,50 @@ class ModuleApi:
|
|||
|
||||
await self._clock.sleep(seconds)
|
||||
|
||||
async def send_http_push_notification(
|
||||
self,
|
||||
user_id: str,
|
||||
device_id: Optional[str],
|
||||
content: JsonDict,
|
||||
tweaks: Optional[JsonMapping] = None,
|
||||
default_payload: Optional[JsonMapping] = None,
|
||||
) -> Dict[str, bool]:
|
||||
"""Send an HTTP push notification that is forwarded to the registered push gateway
|
||||
for the specified user/device.
|
||||
|
||||
Added in Synapse v1.82.0.
|
||||
|
||||
Args:
|
||||
user_id: The user ID to send the push notification to.
|
||||
device_id: The device ID of the device where to send the push notification. If `None`,
|
||||
the notification will be sent to all registered HTTP pushers of the user.
|
||||
content: A dict of values that will be put in the `notification` field of the push
|
||||
(cf Push Gateway spec). `devices` field will be overrided if included.
|
||||
tweaks: A dict of `tweaks` that will be inserted in the `devices` section, cf spec.
|
||||
default_payload: default payload to add in `devices[0].data.default_payload`.
|
||||
This will be merged (and override if some matching values already exist there)
|
||||
with existing `default_payload`.
|
||||
|
||||
Returns:
|
||||
a dict reprensenting the status of the push per device ID
|
||||
"""
|
||||
status = {}
|
||||
if user_id in self._pusherpool.pushers:
|
||||
for p in self._pusherpool.pushers[user_id].values():
|
||||
if isinstance(p, HttpPusher) and (
|
||||
not device_id or p.device_id == device_id
|
||||
):
|
||||
res = await p.dispatch_push(content, tweaks, default_payload)
|
||||
# Check if the push was successful and no pushers were rejected.
|
||||
sent = res is not False and not res
|
||||
|
||||
# This is mainly to accomodate mypy
|
||||
# device_id should never be empty after the `set_device_id_for_pushers`
|
||||
# background job has been properly run.
|
||||
if p.device_id:
|
||||
status[p.device_id] = sent
|
||||
return status
|
||||
|
||||
async def send_mail(
|
||||
self,
|
||||
recipient: str,
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
# limitations under the License.
|
||||
import logging
|
||||
import urllib.parse
|
||||
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Union
|
||||
|
||||
from prometheus_client import Counter
|
||||
|
||||
|
@ -27,6 +27,7 @@ from synapse.logging import opentracing
|
|||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.push import Pusher, PusherConfig, PusherConfigException
|
||||
from synapse.storage.databases.main.event_push_actions import HttpPushAction
|
||||
from synapse.types import JsonDict, JsonMapping
|
||||
|
||||
from . import push_tools
|
||||
|
||||
|
@ -56,7 +57,7 @@ http_badges_failed_counter = Counter(
|
|||
)
|
||||
|
||||
|
||||
def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]:
|
||||
def tweaks_for_actions(actions: List[Union[str, Dict]]) -> JsonMapping:
|
||||
"""
|
||||
Converts a list of actions into a `tweaks` dict (which can then be passed to
|
||||
the push gateway).
|
||||
|
@ -101,6 +102,7 @@ class HttpPusher(Pusher):
|
|||
self._storage_controllers = self.hs.get_storage_controllers()
|
||||
self.app_display_name = pusher_config.app_display_name
|
||||
self.device_display_name = pusher_config.device_display_name
|
||||
self.device_id = pusher_config.device_id
|
||||
self.pushkey_ts = pusher_config.ts
|
||||
self.data = pusher_config.data
|
||||
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
|
||||
|
@ -324,7 +326,7 @@ class HttpPusher(Pusher):
|
|||
event = await self.store.get_event(push_action.event_id, allow_none=True)
|
||||
if event is None:
|
||||
return True # It's been redacted
|
||||
rejected = await self.dispatch_push(event, tweaks, badge)
|
||||
rejected = await self.dispatch_push_event(event, tweaks, badge)
|
||||
if rejected is False:
|
||||
return False
|
||||
|
||||
|
@ -342,9 +344,83 @@ class HttpPusher(Pusher):
|
|||
await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id)
|
||||
return True
|
||||
|
||||
async def _build_notification_dict(
|
||||
self, event: EventBase, tweaks: Dict[str, bool], badge: int
|
||||
) -> Dict[str, Any]:
|
||||
async def dispatch_push(
|
||||
self,
|
||||
content: JsonDict,
|
||||
tweaks: Optional[JsonMapping] = None,
|
||||
default_payload: Optional[JsonMapping] = None,
|
||||
) -> Union[bool, List[str]]:
|
||||
"""Send a notification to the registered push gateway, with `content` being
|
||||
the content of the `notification` top property specified in the spec.
|
||||
Note that the `devices` property will be added with device-specific
|
||||
information for this pusher.
|
||||
|
||||
Args:
|
||||
content: the content
|
||||
tweaks: tweaks to add into the `devices` section
|
||||
default_payload: default payload to add in `devices[0].data.default_payload`.
|
||||
This will be merged (and override if some matching values already exist there)
|
||||
with existing `default_payload`.
|
||||
|
||||
Returns:
|
||||
False if an error occured when calling the push gateway, or an array of
|
||||
rejected push keys otherwise. If this array is empty, the push fully
|
||||
succeeded.
|
||||
"""
|
||||
content = content.copy()
|
||||
|
||||
data = self.data_minus_url.copy()
|
||||
if default_payload:
|
||||
data.setdefault("default_payload", {}).update(default_payload)
|
||||
|
||||
device = {
|
||||
"app_id": self.app_id,
|
||||
"pushkey": self.pushkey,
|
||||
"pushkey_ts": int(self.pushkey_ts / 1000),
|
||||
"data": data,
|
||||
}
|
||||
if tweaks:
|
||||
device["tweaks"] = tweaks
|
||||
|
||||
content["devices"] = [device]
|
||||
|
||||
try:
|
||||
resp = await self.http_client.post_json_get_json(
|
||||
self.url, {"notification": content}
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to push data to %s: %s %s",
|
||||
self.name,
|
||||
type(e),
|
||||
e,
|
||||
)
|
||||
return False
|
||||
rejected = []
|
||||
if "rejected" in resp:
|
||||
rejected = resp["rejected"]
|
||||
return rejected
|
||||
|
||||
async def dispatch_push_event(
|
||||
self,
|
||||
event: EventBase,
|
||||
tweaks: JsonMapping,
|
||||
badge: int,
|
||||
) -> Union[bool, List[str]]:
|
||||
"""Send a notification to the registered push gateway by building it
|
||||
from an event.
|
||||
|
||||
Args:
|
||||
event: the event
|
||||
tweaks: tweaks to add into the `devices` section, used to decide the
|
||||
push priority
|
||||
badge: unread count to send with the push notification
|
||||
|
||||
Returns:
|
||||
False if an error occured when calling the push gateway, or an array of
|
||||
rejected push keys otherwise. If this array is empty, the push fully
|
||||
succeeded.
|
||||
"""
|
||||
priority = "low"
|
||||
if (
|
||||
event.type == EventTypes.Encrypted
|
||||
|
@ -358,30 +434,20 @@ class HttpPusher(Pusher):
|
|||
# This was checked in the __init__, but mypy doesn't seem to know that.
|
||||
assert self.data is not None
|
||||
if self.data.get("format") == "event_id_only":
|
||||
d: Dict[str, Any] = {
|
||||
"notification": {
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
"counts": {"unread": badge},
|
||||
"prio": priority,
|
||||
"devices": [
|
||||
{
|
||||
"app_id": self.app_id,
|
||||
"pushkey": self.pushkey,
|
||||
"pushkey_ts": int(self.pushkey_ts / 1000),
|
||||
"data": self.data_minus_url,
|
||||
}
|
||||
],
|
||||
}
|
||||
content: JsonDict = {
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
"counts": {"unread": badge},
|
||||
"prio": priority,
|
||||
}
|
||||
return d
|
||||
# event_id_only doesn't include the tweaks, so override them.
|
||||
tweaks = {}
|
||||
else:
|
||||
ctx = await push_tools.get_context_for_event(
|
||||
self._storage_controllers, event, self.user_id
|
||||
)
|
||||
|
||||
ctx = await push_tools.get_context_for_event(
|
||||
self._storage_controllers, event, self.user_id
|
||||
)
|
||||
|
||||
d = {
|
||||
"notification": {
|
||||
content = {
|
||||
"id": event.event_id, # deprecated: remove soon
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
|
@ -392,57 +458,27 @@ class HttpPusher(Pusher):
|
|||
"unread": badge,
|
||||
# 'missed_calls': 2
|
||||
},
|
||||
"devices": [
|
||||
{
|
||||
"app_id": self.app_id,
|
||||
"pushkey": self.pushkey,
|
||||
"pushkey_ts": int(self.pushkey_ts / 1000),
|
||||
"data": self.data_minus_url,
|
||||
"tweaks": tweaks,
|
||||
}
|
||||
],
|
||||
}
|
||||
}
|
||||
if event.type == "m.room.member" and event.is_state():
|
||||
d["notification"]["membership"] = event.content["membership"]
|
||||
d["notification"]["user_is_target"] = event.state_key == self.user_id
|
||||
if self.hs.config.push.push_include_content and event.content:
|
||||
d["notification"]["content"] = event.content
|
||||
if event.type == "m.room.member" and event.is_state():
|
||||
content["membership"] = event.content["membership"]
|
||||
content["user_is_target"] = event.state_key == self.user_id
|
||||
if self.hs.config.push.push_include_content and event.content:
|
||||
content["content"] = event.content
|
||||
|
||||
# We no longer send aliases separately, instead, we send the human
|
||||
# readable name of the room, which may be an alias.
|
||||
if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
|
||||
d["notification"]["sender_display_name"] = ctx["sender_display_name"]
|
||||
if "name" in ctx and len(ctx["name"]) > 0:
|
||||
d["notification"]["room_name"] = ctx["name"]
|
||||
# We no longer send aliases separately, instead, we send the human
|
||||
# readable name of the room, which may be an alias.
|
||||
if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
|
||||
content["sender_display_name"] = ctx["sender_display_name"]
|
||||
if "name" in ctx and len(ctx["name"]) > 0:
|
||||
content["room_name"] = ctx["name"]
|
||||
|
||||
return d
|
||||
res = await self.dispatch_push(content, tweaks)
|
||||
|
||||
async def dispatch_push(
|
||||
self, event: EventBase, tweaks: Dict[str, bool], badge: int
|
||||
) -> Union[bool, Iterable[str]]:
|
||||
notification_dict = await self._build_notification_dict(event, tweaks, badge)
|
||||
if not notification_dict:
|
||||
return []
|
||||
try:
|
||||
resp = await self.http_client.post_json_get_json(
|
||||
self.url, notification_dict
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to push event %s to %s: %s %s",
|
||||
event.event_id,
|
||||
self.name,
|
||||
type(e),
|
||||
e,
|
||||
)
|
||||
return False
|
||||
rejected = []
|
||||
if "rejected" in resp:
|
||||
rejected = resp["rejected"]
|
||||
if not rejected:
|
||||
# If the push is successful and none are rejected, update the badge count.
|
||||
if res is not False and not res:
|
||||
self.badge_count_last_call = badge
|
||||
return rejected
|
||||
|
||||
return res
|
||||
|
||||
async def _send_badge(self, badge: int) -> None:
|
||||
"""
|
||||
|
|
Loading…
Reference in New Issue