1
0
Fork 0

Use asyncio for broadcasting

This commit is contained in:
Mouse Reeve 2022-11-10 15:41:56 -08:00
parent cbd14a69ea
commit ddcaf8e3b8
3 changed files with 58 additions and 27 deletions

View file

@ -1,14 +1,15 @@
""" activitypub model functionality """
import asyncio
from base64 import b64encode
from collections import namedtuple
from functools import reduce
import json
import operator
import logging
from typing import List
from uuid import uuid4
import requests
from requests.exceptions import RequestException
import aiohttp
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
@ -136,7 +137,7 @@ class ActivitypubMixin:
queue=queue,
)
def get_recipients(self, software=None):
def get_recipients(self, software=None) -> List[str]:
"""figure out which inbox urls to post to"""
# first we have to figure out who should receive this activity
privacy = self.privacy if hasattr(self, "privacy") else "public"
@ -506,19 +507,31 @@ def unfurl_related_field(related_field, sort_field=None):
@app.task(queue=MEDIUM)
def broadcast_task(sender_id, activity, recipients):
def broadcast_task(sender_id: int, activity: str, recipients: List[str]):
"""the celery task for broadcast"""
user_model = apps.get_model("bookwyrm.User", require_ready=True)
sender = user_model.objects.get(id=sender_id)
for recipient in recipients:
try:
sign_and_send(sender, activity, recipient)
except RequestException:
pass
sender = user_model.objects.select_related("key_pair").get(id=sender_id)
asyncio.run(async_broadcast(recipients, sender, activity))
def sign_and_send(sender, data, destination):
"""crpyto whatever and http junk"""
async def async_broadcast(recipients: List[str], sender, data: str):
"""Send all the broadcasts simultaneously"""
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
tasks = []
for recipient in recipients:
tasks.append(
asyncio.ensure_future(sign_and_send(session, sender, data, recipient))
)
results = await asyncio.gather(*tasks)
return results
async def sign_and_send(
session: aiohttp.ClientSession, sender, data: str, destination: str
):
"""Sign the messages and send them in an asynchronous bundle"""
now = http_date()
if not sender.key_pair.private_key:
@ -527,20 +540,25 @@ def sign_and_send(sender, data, destination):
digest = make_digest(data)
response = requests.post(
destination,
data=data,
headers={
"Date": now,
"Digest": digest,
"Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT,
},
)
if not response.ok:
response.raise_for_status()
return response
headers = {
"Date": now,
"Digest": digest,
"Signature": make_signature(sender, destination, now, digest),
"Content-Type": "application/activity+json; charset=utf-8",
"User-Agent": USER_AGENT,
}
try:
async with session.post(destination, data=data, headers=headers) as response:
if not response.ok:
logger.exception(
"Failed to send broadcast to %s: %s", destination, response.reason
)
return response
except asyncio.TimeoutError:
logger.info("Connection timed out for url: %s", destination)
except aiohttp.ClientError as err:
logger.exception(err)
# pylint: disable=unused-argument