Merge pull request #120 from mouse-reeve/celery-tasks
Create celery tasks
This commit is contained in:
commit
5d022f5f91
16 changed files with 311 additions and 79 deletions
|
@ -1,9 +0,0 @@
|
|||
''' we need this file to initialize celery '''
|
||||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
# This will make sure the app is always imported when
|
||||
# Django starts so that shared_task will use this app.
|
||||
from .celery import app as celery_app
|
||||
|
||||
__all__ = ('celery_app',)
|
||||
|
|
@ -9,6 +9,7 @@ import requests
|
|||
from urllib.parse import urlparse
|
||||
|
||||
from fedireads import models
|
||||
from fedireads.tasks import app
|
||||
|
||||
|
||||
def get_recipients(user, post_privacy, direct_recipients=None, limit=False):
|
||||
|
@ -39,6 +40,9 @@ def get_recipients(user, post_privacy, direct_recipients=None, limit=False):
|
|||
fedireads_user = limit == 'fedireads'
|
||||
followers = user.followers.filter(fedireads_user=fedireads_user).all()
|
||||
|
||||
# we don't need to broadcast to ourself
|
||||
followers = followers.filter(local=False)
|
||||
|
||||
# TODO I don't think this is actually accomplishing pubic/followers only?
|
||||
if post_privacy == 'public':
|
||||
# post to public shared inboxes
|
||||
|
@ -58,6 +62,13 @@ def get_recipients(user, post_privacy, direct_recipients=None, limit=False):
|
|||
|
||||
def broadcast(sender, activity, recipients):
|
||||
''' send out an event '''
|
||||
broadcast_task.delay(sender.id, activity, recipients)
|
||||
|
||||
|
||||
@app.task
|
||||
def broadcast_task(sender_id, activity, recipients):
|
||||
''' the celery task for broadcast '''
|
||||
sender = models.User.objects.get(id=sender_id)
|
||||
errors = []
|
||||
for recipient in recipients:
|
||||
try:
|
||||
|
|
|
@ -1,24 +0,0 @@
|
|||
from __future__ import absolute_import, unicode_literals
|
||||
|
||||
import os
|
||||
|
||||
from celery import Celery
|
||||
|
||||
# set the default Django settings module for the 'celery' program.
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fedireads.settings')
|
||||
|
||||
app = Celery('fedireads')
|
||||
|
||||
# Using a string here means the worker doesn't have to serialize
|
||||
# the configuration object to child processes.
|
||||
# - namespace='CELERY' means all celery-related configuration keys
|
||||
# should have a `CELERY_` prefix.
|
||||
app.config_from_object('django.conf:settings', namespace='CELERY')
|
||||
|
||||
# Load task modules from all registered Django app configs.
|
||||
app.autodiscover_tasks()
|
||||
|
||||
|
||||
@app.task(bind=True)
|
||||
def debug_task(self):
|
||||
print('Request: {0!r}'.format(self.request))
|
|
@ -13,6 +13,21 @@ import requests
|
|||
from fedireads import models, outgoing
|
||||
from fedireads import status as status_builder
|
||||
from fedireads.remote_user import get_or_create_remote_user
|
||||
from fedireads.tasks import app
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
def inbox(request, username):
|
||||
''' incoming activitypub events '''
|
||||
# TODO: should do some kind of checking if the user accepts
|
||||
# this action from the sender probably? idk
|
||||
# but this will just throw a 404 if the user doesn't exist
|
||||
try:
|
||||
models.User.objects.get(localname=username)
|
||||
except models.User.DoesNotExist:
|
||||
return HttpResponseNotFound()
|
||||
|
||||
return shared_inbox(request)
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
|
@ -40,7 +55,7 @@ def shared_inbox(request):
|
|||
'Like': handle_favorite,
|
||||
'Announce': handle_boost,
|
||||
'Add': {
|
||||
'Tag': handle_add,
|
||||
'Tag': handle_tag,
|
||||
},
|
||||
'Undo': {
|
||||
'Follow': handle_unfollow,
|
||||
|
@ -57,10 +72,11 @@ def shared_inbox(request):
|
|||
if isinstance(handler, dict):
|
||||
handler = handler.get(activity['object']['type'], None)
|
||||
|
||||
if handler:
|
||||
return handler(activity)
|
||||
if not handler:
|
||||
return HttpResponseNotFound()
|
||||
|
||||
return HttpResponseNotFound()
|
||||
handler.delay(activity)
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
def verify_signature(request):
|
||||
|
@ -109,20 +125,7 @@ def verify_signature(request):
|
|||
return True
|
||||
|
||||
|
||||
@csrf_exempt
|
||||
def inbox(request, username):
|
||||
''' incoming activitypub events '''
|
||||
# TODO: should do some kind of checking if the user accepts
|
||||
# this action from the sender probably? idk
|
||||
# but this will just throw a 404 if the user doesn't exist
|
||||
try:
|
||||
models.User.objects.get(localname=username)
|
||||
except models.User.DoesNotExist:
|
||||
return HttpResponseNotFound()
|
||||
|
||||
return shared_inbox(request)
|
||||
|
||||
|
||||
@app.task
|
||||
def handle_follow(activity):
|
||||
''' someone wants to follow a local user '''
|
||||
# figure out who they want to follow
|
||||
|
@ -141,7 +144,7 @@ def handle_follow(activity):
|
|||
# Duplicate follow request. Not sure what the correct behaviour is, but
|
||||
# just dropping it works for now. We should perhaps generate the
|
||||
# Accept, but then do we need to match the activity id?
|
||||
return HttpResponse()
|
||||
return
|
||||
|
||||
if not to_follow.manually_approves_followers:
|
||||
status_builder.create_notification(
|
||||
|
@ -156,9 +159,9 @@ def handle_follow(activity):
|
|||
'FOLLOW_REQUEST',
|
||||
related_user=user
|
||||
)
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
@app.task
|
||||
def handle_unfollow(activity):
|
||||
''' unfollow a local user '''
|
||||
obj = activity['object']
|
||||
|
@ -172,9 +175,9 @@ def handle_unfollow(activity):
|
|||
return HttpResponseNotFound()
|
||||
|
||||
to_unfollow.followers.remove(requester)
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
@app.task
|
||||
def handle_follow_accept(activity):
|
||||
''' hurray, someone remote accepted a follow request '''
|
||||
# figure out who they want to follow
|
||||
|
@ -191,9 +194,9 @@ def handle_follow_accept(activity):
|
|||
except models.UserFollowRequest.DoesNotExist:
|
||||
pass
|
||||
accepter.followers.add(requester)
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
@app.task
|
||||
def handle_follow_reject(activity):
|
||||
''' someone is rejecting a follow request '''
|
||||
requester = models.User.objects.get(actor=activity['object']['actor'])
|
||||
|
@ -208,8 +211,8 @@ def handle_follow_reject(activity):
|
|||
except models.UserFollowRequest.DoesNotExist:
|
||||
pass
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
@app.task
|
||||
def handle_create(activity):
|
||||
''' someone did something, good on them '''
|
||||
user = get_or_create_remote_user(activity['actor'])
|
||||
|
@ -219,7 +222,7 @@ def handle_create(activity):
|
|||
|
||||
if user.local:
|
||||
# we really oughtn't even be sending in this case
|
||||
return HttpResponse()
|
||||
return
|
||||
|
||||
if activity['object'].get('fedireadsType') in ['Review', 'Comment'] and \
|
||||
'inReplyToBook' in activity['object']:
|
||||
|
@ -251,9 +254,9 @@ def handle_create(activity):
|
|||
except ValueError:
|
||||
return HttpResponseBadRequest()
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
@app.task
|
||||
def handle_favorite(activity):
|
||||
''' approval of your good good post '''
|
||||
try:
|
||||
|
@ -261,7 +264,7 @@ def handle_favorite(activity):
|
|||
status = models.Status.objects.get(id=status_id)
|
||||
liker = get_or_create_remote_user(activity['actor'])
|
||||
except (models.Status.DoesNotExist, models.User.DoesNotExist):
|
||||
return HttpResponseNotFound()
|
||||
return
|
||||
|
||||
if not liker.local:
|
||||
status_builder.create_favorite_from_activity(liker, activity)
|
||||
|
@ -272,21 +275,20 @@ def handle_favorite(activity):
|
|||
related_user=liker,
|
||||
related_status=status,
|
||||
)
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
@app.task
|
||||
def handle_unfavorite(activity):
|
||||
''' approval of your good good post '''
|
||||
try:
|
||||
favorite_id = activity['object']['id']
|
||||
fav = status_builder.get_favorite(favorite_id)
|
||||
except models.Favorite.DoesNotExist:
|
||||
favorite_id = activity['object']['id']
|
||||
fav = status_builder.get_favorite(favorite_id)
|
||||
if not fav:
|
||||
return HttpResponseNotFound()
|
||||
|
||||
fav.delete()
|
||||
return HttpResponse()
|
||||
|
||||
|
||||
@app.task
|
||||
def handle_boost(activity):
|
||||
''' someone gave us a boost! '''
|
||||
try:
|
||||
|
@ -306,16 +308,12 @@ def handle_boost(activity):
|
|||
related_status=status,
|
||||
)
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
def handle_add(activity):
|
||||
@app.task
|
||||
def handle_tag(activity):
|
||||
''' someone is tagging or shelving a book '''
|
||||
if activity['object']['type'] == 'Tag':
|
||||
user = get_or_create_remote_user(activity['actor'])
|
||||
if not user.local:
|
||||
book = activity['target']['id'].split('/')[-1]
|
||||
status_builder.create_tag(user, book, activity['object']['name'])
|
||||
return HttpResponse()
|
||||
return HttpResponse()
|
||||
return HttpResponseNotFound()
|
||||
user = get_or_create_remote_user(activity['actor'])
|
||||
if not user.local:
|
||||
book = activity['target']['id'].split('/')[-1]
|
||||
status_builder.create_tag(user, book, activity['object']['name'])
|
||||
|
||||
|
|
|
@ -81,9 +81,7 @@ def handle_account_search(query):
|
|||
def handle_follow(user, to_follow):
|
||||
''' someone local wants to follow someone '''
|
||||
activity = activitypub.get_follow_request(user, to_follow)
|
||||
errors = broadcast(user, activity, [to_follow.inbox])
|
||||
for error in errors:
|
||||
raise(error['error'])
|
||||
broadcast(user, activity, [to_follow.inbox])
|
||||
|
||||
|
||||
def handle_unfollow(user, to_unfollow):
|
||||
|
@ -93,10 +91,8 @@ def handle_unfollow(user, to_unfollow):
|
|||
user_object=to_unfollow
|
||||
)
|
||||
activity = activitypub.get_unfollow(relationship)
|
||||
errors = broadcast(user, activity, [to_unfollow.inbox])
|
||||
broadcast(user, activity, [to_unfollow.inbox])
|
||||
to_unfollow.followers.remove(user)
|
||||
for error in errors:
|
||||
raise(error['error'])
|
||||
|
||||
|
||||
def handle_accept(user, to_follow, follow_request):
|
||||
|
@ -187,7 +183,8 @@ def handle_import_books(user, items):
|
|||
|
||||
create_activity = activitypub.get_create(
|
||||
user, activitypub.get_status(status))
|
||||
broadcast(user, create_activity, get_recipients(user, 'public'))
|
||||
recipients = get_recipients(user, 'public')
|
||||
broadcast(user, create_activity, recipients)
|
||||
|
||||
|
||||
def handle_review(user, book, name, content, rating):
|
||||
|
|
|
@ -5,6 +5,13 @@ from environs import Env
|
|||
|
||||
env = Env()
|
||||
|
||||
# celery
|
||||
CELERY_BROKER = env('CELERY_BROKER')
|
||||
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND')
|
||||
CELERY_ACCEPT_CONTENT = ['application/json']
|
||||
CELERY_TASK_SERIALIZER = 'json'
|
||||
CELERY_RESULT_SERIALIZER = 'json'
|
||||
|
||||
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
|
||||
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
|
@ -21,12 +28,6 @@ DOMAIN = env('DOMAIN')
|
|||
ALLOWED_HOSTS = env.list('ALLOWED_HOSTS', ['*'])
|
||||
OL_URL = env('OL_URL')
|
||||
|
||||
# celery/rebbitmq
|
||||
CELERY_BROKER_URL = env('CELERY_BROKER')
|
||||
CELERY_ACCEPT_CONTENT = ['json']
|
||||
CELERY_TASK_SERIALIZER = 'json'
|
||||
CELERY_RESULT_BACKEND = 'amqp'
|
||||
|
||||
# Application definition
|
||||
|
||||
INSTALLED_APPS = [
|
||||
|
|
14
fedireads/tasks.py
Normal file
14
fedireads/tasks.py
Normal file
|
@ -0,0 +1,14 @@
|
|||
''' background tasks '''
|
||||
from celery import Celery
|
||||
import os
|
||||
|
||||
from fedireads import settings
|
||||
|
||||
# set the default Django settings module for the 'celery' program.
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'fr_celery.settings')
|
||||
app = Celery(
|
||||
'tasks',
|
||||
broker=settings.CELERY_BROKER,
|
||||
)
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue