org-social-relay/app/feeds/tasks.py
Andros Fenollosa b13cf2fad4 Add support for Org Social v1.6
New features from Org Social v1.6:
- Add LOCATION, BIRTHDAY, LANGUAGE, PINNED fields to Profile model
- Support post ID in header (** 2025-05-01T12:00:00+0100)
- Header ID takes priority over property drawer ID (see the sketch after this commit message)
- Parse and store all new v1.6 metadata fields

Changes:
- Updated Profile model with 4 new fields
- Updated parser to extract new metadata fields
- Updated parser to support ID in post headers
- Updated tasks.py to save new profile fields
- Added database migration 0010
- Added 3 new tests for v1.6 features
- Renamed SKILL.md to CLAUDE.MD

All tests passing (58/58)
2026-01-05 13:55:56 +01:00
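
The commit's new header-ID rule (an RFC 3339 timestamp in the post headline wins over an ID kept in the property drawer) can be pictured with a short sketch. This is illustrative only: the real parsing lives in app/feeds/parser.py, which is not part of this file, and the regexes and the resolve_post_id name below are assumptions made for the example.

import re

# Illustrative sketch only -- not part of tasks.py. Assumes v1.6 post headlines look like
# "** 2025-05-01T12:00:00+0100" and that older posts carry an ":ID:" line in their
# :PROPERTIES: drawer; both patterns are assumptions for this example.
HEADER_ID_RE = re.compile(r"^\*\*\s+(\d{4}-\d{2}-\d{2}T[0-9:+\-]+)\s*$")
DRAWER_ID_RE = re.compile(r"^:ID:\s*(\S+)\s*$", re.IGNORECASE)


def resolve_post_id(headline, drawer_lines):
    """Prefer an ID embedded in the headline (Org Social v1.6); fall back to the drawer."""
    match = HEADER_ID_RE.match(headline.strip())
    if match:
        return match.group(1)  # header ID takes priority
    for line in drawer_lines:
        drawer_match = DRAWER_ID_RE.match(line.strip())
        if drawer_match:
            return drawer_match.group(1)
    return None

For example, resolve_post_id("** 2025-05-01T12:00:00+0100", [":ID: 2024-01-01T00:00:00+0000"]) returns the header timestamp, while a headline without a timestamp falls back to the drawer value.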


from huey.contrib.djhuey import periodic_task
from huey import crontab
import logging
import requests
import hashlib
from django.utils import timezone
from datetime import timedelta

logger = logging.getLogger(__name__)


@periodic_task(crontab(hour="*/3"))  # Run every 3 hours
def discover_feeds_from_relay_nodes():
    """
    Periodic task to discover new feeds from other Org Social Relay nodes.

    This task:
    1. Fetches the list of relay nodes from the public URL
    2. Filters out our own domain to avoid self-discovery
    3. Calls each relay node's /feeds endpoint to get their registered feeds
    4. Stores newly discovered feeds in our local database
    """
    import django
    django.setup()

    from django.conf import settings
    from .models import Feed
    from .parser import validate_org_social_feed

    # URLs to fetch feeds from
    feed_sources = [
        {
            "name": "relay nodes",
            "url": "https://cdn.jsdelivr.net/gh/tanrax/org-social/org-social-relay-list.txt",
            "type": "relay_nodes",
        },
        {
            "name": "public register",
            "url": "https://raw.githubusercontent.com/tanrax/org-social/main/registers.txt",
            "type": "direct_feeds",
        },
    ]

    total_discovered = 0

    for source in feed_sources:
        logger.info(f"Fetching feeds from {source['name']}: {source['url']}")
        try:
            # Fetch the list
            response = requests.get(source["url"], timeout=5)
            response.raise_for_status()

            # The file might be empty or contain one URL per line
            urls = [line.strip() for line in response.text.split("\n") if line.strip()]

            if source["type"] == "direct_feeds":
                # For direct feeds (registers.txt), validate and add them directly
                logger.info(f"Found {len(urls)} direct feeds to validate")
                for feed_url in urls:
                    if not feed_url.strip():
                        continue
                    feed_url = feed_url.strip()

                    # Check if we already have this feed
                    if Feed.objects.filter(url=feed_url).exists():
                        continue

                    # Validate the feed before adding it
                    logger.info(f"Validating direct feed: {feed_url}")
                    is_valid, error_message = validate_org_social_feed(feed_url)
                    if not is_valid:
                        logger.warning(
                            f"Skipping invalid direct feed {feed_url}: {error_message}"
                        )
                        continue

                    # Create the feed
                    try:
                        Feed.objects.create(url=feed_url)
                        total_discovered += 1
                        logger.info(f"Added direct feed: {feed_url}")
                    except Exception as e:
                        logger.error(f"Failed to create direct feed {feed_url}: {e}")

            elif source["type"] == "relay_nodes":
                # For relay nodes, get their feeds endpoints
                relay_nodes = urls

                # Filter out our own domain to avoid self-discovery
                site_domain = settings.SITE_DOMAIN
                filtered_nodes = []
                for node_url in relay_nodes:
                    # Normalize the URL for comparison
                    normalized_node = (
                        node_url.replace("http://", "")
                        .replace("https://", "")
                        .strip("/")
                    )
                    normalized_site = site_domain.strip("/")
                    if normalized_node != normalized_site:
                        filtered_nodes.append(node_url)
                    else:
                        logger.info(f"Skipping own domain: {node_url}")
                relay_nodes = filtered_nodes

                if not relay_nodes:
                    logger.info(
                        "No relay nodes found in the list after filtering own domain"
                    )
                    continue

                logger.info(
                    f"Found {len(relay_nodes)} relay nodes to check (excluding own domain)"
                )

                for node_url in relay_nodes:
                    try:
                        # Ensure the URL has proper format
                        if not node_url.startswith(("http://", "https://")):
                            node_url = f"http://{node_url}"

                        # Call the /feeds endpoint on each relay node
                        feeds_url = f"{node_url}/feeds"
                        feeds_response = requests.get(feeds_url, timeout=10)
                        feeds_response.raise_for_status()
                        feeds_data = feeds_response.json()

                        # Check if response has expected format
                        if feeds_data.get("type") == "Success" and "data" in feeds_data:
                            feeds_list = feeds_data["data"]
                            for feed_url in feeds_list:
                                if isinstance(feed_url, str) and feed_url.strip():
                                    feed_url = feed_url.strip()

                                    # Check if we already have this feed
                                    if Feed.objects.filter(url=feed_url).exists():
                                        continue

                                    # Validate the feed before adding it
                                    logger.info(
                                        f"Validating discovered feed: {feed_url}"
                                    )
                                    is_valid, error_message = validate_org_social_feed(
                                        feed_url
                                    )
                                    if not is_valid:
                                        logger.warning(
                                            f"Skipping invalid feed {feed_url}: {error_message}"
                                        )
                                        continue

                                    # Create the feed
                                    try:
                                        Feed.objects.create(url=feed_url)
                                        total_discovered += 1
                                        logger.info(
                                            f"Discovered and validated new feed: {feed_url}"
                                        )
                                    except Exception as e:
                                        logger.error(
                                            f"Failed to create feed {feed_url}: {e}"
                                        )

                        logger.info(f"Successfully checked relay node: {node_url}")

                    except requests.RequestException as e:
                        logger.warning(
                            f"Failed to fetch feeds from relay node {node_url}: {e}"
                        )
                    except ValueError as e:
                        logger.warning(
                            f"Invalid JSON response from relay node {node_url}: {e}"
                        )
                    except Exception as e:
                        logger.error(
                            f"Unexpected error checking relay node {node_url}: {e}"
                        )

        except requests.RequestException as e:
            logger.error(f"Failed to fetch {source['name']} from {source['url']}: {e}")
        except Exception as e:
            logger.error(f"Unexpected error processing {source['name']}: {e}")

    logger.info(
        f"Feed discovery completed. Total new feeds discovered: {total_discovered}"
    )
@periodic_task(crontab(hour=0, minute=0))  # Run daily at midnight
def discover_new_feeds_from_follows():
    """
    Periodic task to discover new feeds by analyzing the feeds followed by registered users.

    This task:
    1. Gets all registered feeds from the database
    2. For each feed, fetches and parses the content
    3. Extracts URLs from #+FOLLOW: lines
    4. Adds newly discovered feeds to the database
    """
    import django
    django.setup()

    from .models import Feed, Profile, Follow
    from .parser import parse_org_social, validate_org_social_feed

    logger.info("Starting discovery of new feeds from user follows")

    # Get all registered feeds
    all_feeds = Feed.objects.all()
    total_feeds = all_feeds.count()

    if total_feeds == 0:
        logger.info("No registered feeds found")
        return

    logger.info(f"Analyzing {total_feeds} registered feeds for new follows")

    total_discovered = 0
    successful_parses = 0
    failed_parses = 0

    for feed in all_feeds:
        try:
            # Parse the org social file (may update feed URL if redirected)
            parsed_data = parse_org_social(feed.url)
            successful_parses += 1

            # Refresh feed from database (URL may have changed due to redirect)
            feed.refresh_from_db()

            # Extract follow URLs from metadata
            follows = parsed_data.get("metadata", {}).get("follows", [])
            if not follows:
                continue

            logger.info(f"Found {len(follows)} follows in {feed.url}")

            # Get or create profile for this feed
            profile = None
            try:
                metadata = parsed_data.get("metadata", {})
                content_for_hash = f"{metadata}{parsed_data.get('posts', [])}"
                content_hash = hashlib.md5(content_for_hash.encode()).hexdigest()
                profile, _ = Profile.objects.get_or_create(
                    feed=feed.url,
                    defaults={
                        "title": metadata.get("title", ""),
                        "nick": metadata.get("nick", ""),
                        "description": metadata.get("description", ""),
                        "avatar": metadata.get("avatar", ""),
                        "location": metadata.get("location", ""),
                        "birthday": metadata.get("birthday") or None,
                        "language": metadata.get("language", ""),
                        "pinned": metadata.get("pinned", ""),
                        "version": content_hash,
                    },
                )
            except Exception as e:
                logger.warning(f"Failed to create/update profile for {feed.url}: {e}")

            # Process each follow URL
            for follow_info in follows:
                follow_url = follow_info.get("url", "").strip()
                follow_nickname = follow_info.get("nickname", "").strip()

                # Basic URL validation
                if not follow_url or not follow_url.startswith(("http://", "https://")):
                    continue

                # Check if feed already exists
                existing_feed = Feed.objects.filter(url=follow_url).first()
                if not existing_feed:
                    # Validate the feed before adding it
                    logger.info(f"Validating discovered follow feed: {follow_url}")
                    is_valid, error_message = validate_org_social_feed(follow_url)
                    if not is_valid:
                        logger.warning(
                            f"Skipping invalid follow feed {follow_url}: {error_message}"
                        )
                        continue

                    # Create new feed
                    try:
                        Feed.objects.create(url=follow_url)
                        total_discovered += 1
                        logger.info(
                            f"Discovered and validated new feed from follows: {follow_url}"
                        )
                    except Exception as e:
                        logger.warning(f"Failed to create feed {follow_url}: {e}")

                # Create follow relationship if we have a profile
                if profile:
                    try:
                        # Get or create the followed profile
                        followed_profile, _ = Profile.objects.get_or_create(
                            feed=follow_url,
                            defaults={
                                "title": "",
                                "nick": follow_nickname or "",
                                "description": "",
                                "avatar": "",
                                "version": "",
                            },
                        )

                        # Create follow relationship
                        follow_obj, created = Follow.objects.get_or_create(
                            follower=profile,
                            followed=followed_profile,
                            defaults={"nickname": follow_nickname},
                        )
                        if not created and follow_obj.nickname != follow_nickname:
                            # Update nickname if changed
                            follow_obj.nickname = follow_nickname
                            follow_obj.save()
                    except Exception as e:
                        logger.warning(
                            f"Failed to create follow relationship {profile.nick} -> {follow_url}: {e}"
                        )

        except requests.RequestException as e:
            failed_parses += 1
            logger.warning(f"Failed to fetch/parse feed {feed.url}: {e}")
        except Exception as e:
            failed_parses += 1
            logger.error(f"Unexpected error parsing feed {feed.url}: {e}")

    logger.info(
        f"Feed discovery from follows completed. "
        f"Analyzed: {total_feeds} feeds, "
        f"Successful: {successful_parses}, "
        f"Failed: {failed_parses}, "
        f"New feeds discovered: {total_discovered}"
    )
@periodic_task(crontab(minute="*"))  # Run every minute
def scan_feeds():
    """
    Periodic task to scan all registered feeds for new posts and profile updates.

    This task:
    1. Clears the cache so next requests after scan will get fresh data
    2. Gets all registered feeds from the database
    3. For each feed, fetches and parses the content
    4. Creates or updates Profile data with version control
    5. Creates or updates Posts with their properties
    6. Manages relationships (follows, contacts, links)

    Note: Cache is cleared AFTER scanning so that:
    - During scan: users get old cached data (complete and consistent, even if outdated)
    - After scan: cache is cleared and next requests get fresh data from database

    This ensures data consistency: users either see complete old data or complete new data,
    never a mix of both during the scanning process.
    """
    import django
    django.setup()

    from .models import (
        Feed,
        Profile,
        Post,
        ProfileLink,
        ProfileContact,
        PollOption,
        PollVote,
        Mention,
    )
    from .parser import parse_org_social
    from dateutil import parser as date_parser

    logger.info("Starting feed scanning for posts and profile updates")

    # Get all registered feeds
    all_feeds = Feed.objects.all()
    total_feeds = all_feeds.count()

    if total_feeds == 0:
        logger.info("No registered feeds found")
        return

    logger.info(f"Scanning {total_feeds} feeds for updates")

    successful_scans = 0
    failed_scans = 0
    profiles_updated = 0
    profiles_created = 0
    posts_created = 0
    posts_updated = 0

    for feed in all_feeds:
        try:
            # Parse the org social file (may update feed URL if redirected)
            parsed_data = parse_org_social(feed.url)
            successful_scans += 1

            # Refresh feed from database (URL may have changed due to redirect)
            feed.refresh_from_db()

            metadata = parsed_data.get("metadata", {})
            posts_data = parsed_data.get("posts", [])

            # Generate version hash from content
            content_for_hash = f"{metadata}{posts_data}"
            content_hash = hashlib.md5(content_for_hash.encode()).hexdigest()

            # Get or create profile
            profile, profile_created = Profile.objects.get_or_create(
                feed=feed.url,
                defaults={
                    "title": metadata.get("title", ""),
                    "nick": metadata.get("nick", ""),
                    "description": metadata.get("description", ""),
                    "avatar": metadata.get("avatar", ""),
                    "location": metadata.get("location", ""),
                    "birthday": metadata.get("birthday") or None,
                    "language": metadata.get("language", ""),
                    "pinned": metadata.get("pinned", ""),
                    "version": content_hash,
                },
            )

            if profile_created:
                profiles_created += 1
                logger.info(f"Created new profile: {profile.nick} ({feed.url})")
            else:
                # Check if content has changed by comparing version
                if profile.version != content_hash:
                    # Update profile data
                    profile.title = metadata.get("title", "")
                    profile.nick = metadata.get("nick", "")
                    profile.description = metadata.get("description", "")
                    profile.avatar = metadata.get("avatar", "")
                    profile.location = metadata.get("location", "")
                    profile.birthday = metadata.get("birthday") or None
                    profile.language = metadata.get("language", "")
                    profile.pinned = metadata.get("pinned", "")
                    profile.version = content_hash
                    profile.save()
                    profiles_updated += 1
                    logger.info(f"Updated profile: {profile.nick} ({feed.url})")

            # Update profile relationships (clear and recreate)
            profile.links.all().delete()
            profile.contacts.all().delete()

            # Create profile links
            for link_url in metadata.get("links", []):
                if link_url.strip():
                    ProfileLink.objects.create(profile=profile, url=link_url.strip())

            # Create profile contacts
            for contact in metadata.get("contacts", []):
                if contact.strip():
                    contact_parts = contact.strip().split(":", 1)
                    if len(contact_parts) == 2:
                        contact_type = contact_parts[0].strip()
                        contact_value = contact_parts[1].strip()
                        ProfileContact.objects.create(
                            profile=profile,
                            contact_type=contact_type,
                            contact_value=contact_value,
                        )

            # Process posts
            for post_data in posts_data:
                post_id = post_data.get("id", "")
                if not post_id:
                    continue

                content = post_data.get("content", "")
                properties = post_data.get("properties", {})

                # Parse post_id as timestamp for created_at
                # post_id is in RFC 3339 format (e.g., "2025-01-01T12:00:00+00:00")
                post_created_at = timezone.now()  # Default fallback
                try:
                    post_created_at = date_parser.parse(post_id)
                except Exception as e:
                    logger.warning(
                        f"Failed to parse post_id {post_id} as timestamp: {e}. Using current time."
                    )

                # Extract group name from GROUP property
                # Format: "Emacs https://org-social-relay.andros.dev" or just "Emacs"
                # Group names can have spaces and capitals - we slugify them
                from django.conf import settings

                group_metadata = properties.get("group", "").strip()
                group_slug = ""
                if group_metadata:
                    # Extract group name (everything before the URL if present)
                    parts = group_metadata.split("http", 1)
                    raw_group_name = parts[0].strip()
                    if raw_group_name:
                        # Slugify the group name to match ENABLED_GROUPS format
                        from core.settings import slugify_group
                        group_slug = slugify_group(raw_group_name)
                        # Only save if it matches an enabled group
                        if group_slug not in settings.ENABLED_GROUPS:
                            group_slug = ""

                # Get or create post
                post, post_created = Post.objects.get_or_create(
                    profile=profile,
                    post_id=post_id,
                    defaults={
                        "content": content,
                        "language": properties.get("lang", ""),
                        "tags": properties.get("tags", ""),
                        "client": properties.get("client", ""),
                        "reply_to": properties.get("reply_to", ""),
                        "mood": properties.get("mood", ""),
                        "group": group_slug,
                        "include": properties.get("include", ""),
                        "poll_end": None,
                        "created_at": post_created_at,
                    },
                )

                if post_created:
                    posts_created += 1

                    # Publish notifications for NEW posts
                    from .notification_publisher import publish_notification

                    # Check if this is a reply (with or without mood/reaction)
                    if post.reply_to:
                        reply_to_parts = post.reply_to.split("#")
                        if len(reply_to_parts) == 2:
                            replied_feed_url = reply_to_parts[0]
                            # Determine if it's a reaction or a reply
                            if post.mood and post.mood.strip():
                                # It's a reaction
                                publish_notification(
                                    target_feed_url=replied_feed_url,
                                    notification_type="reaction",
                                    post_url=f"{feed.url}#{post_id}",
                                    emoji=post.mood,
                                    parent=post.reply_to,
                                )
                            else:
                                # It's a regular reply
                                publish_notification(
                                    target_feed_url=replied_feed_url,
                                    notification_type="reply",
                                    post_url=f"{feed.url}#{post_id}",
                                    parent=post.reply_to,
                                )

                    # Check if this is a boost
                    if post.include:
                        include_parts = post.include.split("#")
                        if len(include_parts) == 2:
                            boosted_feed_url = include_parts[0]
                            publish_notification(
                                target_feed_url=boosted_feed_url,
                                notification_type="boost",
                                post_url=f"{feed.url}#{post_id}",
                                boosted=post.include,
                            )
                else:
                    # Update existing post
                    post.content = content
                    post.language = properties.get("lang", "")
                    post.tags = properties.get("tags", "")
                    post.client = properties.get("client", "")
                    post.reply_to = properties.get("reply_to", "")
                    post.mood = properties.get("mood", "")
                    post.group = group_slug
                    post.include = properties.get("include", "")
                    post.save()
                    posts_updated += 1

                # Handle poll_end if present
                poll_end_str = properties.get("poll_end", "")
                if poll_end_str:
                    try:
                        poll_end_dt = date_parser.parse(poll_end_str)
                        post.poll_end = poll_end_dt
                        post.save()
                    except Exception as e:
                        logger.warning(f"Failed to parse poll_end {poll_end_str}: {e}")

                # Handle poll options
                poll_options = post_data.get("poll_options", [])
                if poll_options:
                    # Clear existing poll options
                    post.poll_options.all().delete()
                    # Create new poll options
                    for idx, option_text in enumerate(poll_options):
                        PollOption.objects.create(
                            post=post, option_text=option_text, order=idx
                        )

                # Handle poll votes
                poll_option = properties.get("poll_option", "")
                if poll_option and post.reply_to:
                    # This is a poll vote
                    try:
                        # Extract the poll post reference from reply_to
                        reply_parts = post.reply_to.split("#")
                        if len(reply_parts) == 2:
                            poll_feed_url = reply_parts[0]
                            poll_post_id = reply_parts[1]

                            # Find the poll post
                            poll_profile = Profile.objects.filter(
                                feed=poll_feed_url
                            ).first()
                            if poll_profile:
                                poll_post = Post.objects.filter(
                                    profile=poll_profile, post_id=poll_post_id
                                ).first()
                                if poll_post:
                                    # Create or update poll vote
                                    poll_vote, created = PollVote.objects.get_or_create(
                                        post=post,
                                        poll_post=poll_post,
                                        defaults={"poll_option": poll_option},
                                    )
                                    if not created:
                                        poll_vote.poll_option = poll_option
                                        poll_vote.save()
                    except Exception as e:
                        logger.warning(
                            f"Failed to process poll vote for post {post_id}: {e}"
                        )

                # Handle mentions - FIXED to detect new mentions
                mentions_data = post_data.get("mentions", [])
                if mentions_data:
                    # Get existing mentions for this post
                    existing_mentions = set(
                        post.mentions.values_list("mentioned_profile__feed", flat=True)
                    )

                    # Process each mention
                    for mention_info in mentions_data:
                        mention_url = mention_info.get("url", "").strip()
                        mention_nickname = mention_info.get("nickname", "").strip()

                        if not mention_url:
                            continue

                        # Extract the base URL (remove post ID after #)
                        base_mention_url = mention_url.split("#")[0]

                        # Try to find the mentioned profile
                        try:
                            mentioned_profile = Profile.objects.get(
                                feed=base_mention_url
                            )
                            # Only create if it doesn't exist (to detect new mentions)
                            if base_mention_url not in existing_mentions:
                                mention, mention_created = (
                                    Mention.objects.get_or_create(
                                        post=post,
                                        mentioned_profile=mentioned_profile,
                                        defaults={"nickname": mention_nickname},
                                    )
                                )
                                # If this is a NEW mention, publish notification
                                if mention_created:
                                    from .notification_publisher import (
                                        publish_notification,
                                    )
                                    publish_notification(
                                        target_feed_url=base_mention_url,
                                        notification_type="mention",
                                        post_url=f"{feed.url}#{post_id}",
                                    )
                        except Profile.DoesNotExist:
                            # The mentioned profile doesn't exist in our database
                            logger.debug(f"Mentioned profile not found: {mention_url}")
                            continue
                        except Exception as e:
                            logger.warning(
                                f"Failed to create mention for {mention_url} in post {post_id}: {e}"
                            )

            # Detect and remove deleted posts
            # Get all post IDs from the current feed scan
            current_post_ids = {post_data.get("id", "") for post_data in posts_data}
            current_post_ids.discard("")  # Remove empty IDs

            # Get all post IDs currently in database for this profile
            existing_posts = Post.objects.filter(profile=profile)
            existing_post_ids = set(existing_posts.values_list("post_id", flat=True))

            # Find posts that are in DB but not in current feed (deleted posts)
            deleted_post_ids = existing_post_ids - current_post_ids
            if deleted_post_ids:
                # Delete posts that no longer exist in the feed
                deleted_count = Post.objects.filter(
                    profile=profile, post_id__in=deleted_post_ids
                ).delete()[0]
                logger.info(
                    f"Removed {deleted_count} deleted post(s) from {feed.url}: {deleted_post_ids}"
                )

        except requests.RequestException as e:
            failed_scans += 1
            logger.warning(f"Failed to fetch/parse feed {feed.url}: {e}")
        except Exception as e:
            failed_scans += 1
            logger.error(f"Unexpected error scanning feed {feed.url}: {e}")

    logger.info(
        f"Feed scanning completed. "
        f"Scanned: {total_feeds} feeds, "
        f"Successful: {successful_scans}, "
        f"Failed: {failed_scans}, "
        f"Profiles created: {profiles_created}, "
        f"Profiles updated: {profiles_updated}, "
        f"Posts created: {posts_created}, "
        f"Posts updated: {posts_updated}"
    )

    # Update global relay metadata BEFORE clearing cache
    # This ensures the new ETag/Last-Modified are ready when cache is cleared
    from .models import RelayMetadata

    RelayMetadata.update_global_metadata()
    logger.info("Updated global relay metadata (ETag and Last-Modified)")

    # Clear cache AFTER scanning to ensure next requests get fresh data
    # This way during scan users see complete old cached data (consistent),
    # and after scan they see complete new data (also consistent)
    from django.core.cache import cache

    # Invalidate middleware cache for headers (will be recreated from DB on next request)
    cache.delete("relay_metadata_headers")
    # Clear all endpoint caches
    cache.clear()

    logger.info("Cache cleared after feed scanning - next requests will get fresh data")
def _cleanup_stale_feeds_impl():
    """
    Implementation of stale feed cleanup logic.
    This is separated from the periodic task to allow for easier testing.

    Returns:
        int: Number of feeds deleted
    """
    from .models import Feed

    logger.info("Starting cleanup of stale feeds")

    # Calculate the cutoff date (3 days ago)
    cutoff_date = timezone.now() - timedelta(days=3)

    # Find stale feeds (last_successful_fetch is older than 3 days)
    # Exclude feeds where last_successful_fetch is NULL (legacy feeds)
    stale_feeds = Feed.objects.filter(
        last_successful_fetch__lt=cutoff_date,
        last_successful_fetch__isnull=False,
    )

    stale_count = stale_feeds.count()
    if stale_count == 0:
        logger.info("No stale feeds found to clean up")
        return 0

    # Log the feeds being deleted
    logger.info(f"Found {stale_count} stale feeds to delete")
    for feed in stale_feeds[:10]:  # Log first 10 for reference
        days_since_fetch = (timezone.now() - feed.last_successful_fetch).days
        logger.info(
            f"Deleting stale feed: {feed.url} "
            f"(last successful fetch: {days_since_fetch} days ago)"
        )
    if stale_count > 10:
        logger.info(f"... and {stale_count - 10} more feeds")

    # Delete the stale feeds
    deleted_count, _ = stale_feeds.delete()

    logger.info(
        f"Stale feed cleanup completed. Deleted {deleted_count} feeds that "
        f"haven't been successfully fetched in the last 3 days"
    )
    return deleted_count
@periodic_task(crontab(day="*/3", hour=2, minute=0)) # Run every 3 days at 2 AM
def cleanup_stale_feeds():
"""
Periodic task to clean up feeds that haven't been successfully fetched in 3 days.
This task:
1. Finds all feeds with last_successful_fetch older than 3 days
2. Deletes those feeds from the database
3. Logs the cleanup results
Note: Feeds with last_successful_fetch = NULL are NOT deleted.
This protects feeds that existed before the field was added.
"""
return _cleanup_stale_feeds_impl()
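
Since _cleanup_stale_feeds_impl() is deliberately split out "to allow for easier testing", a minimal test sketch could look like the following. This assumes the project's usual Django test setup and that the module is importable as app.feeds.tasks; the import paths and the use of update() to backdate last_successful_fetch are assumptions, not something this file guarantees.

# Illustrative test sketch -- not part of tasks.py.
from datetime import timedelta

from django.test import TestCase
from django.utils import timezone

from app.feeds.models import Feed  # assumed import path
from app.feeds.tasks import _cleanup_stale_feeds_impl  # assumed import path


class CleanupStaleFeedsTests(TestCase):
    def test_only_stale_feeds_are_deleted(self):
        stale = Feed.objects.create(url="https://example.org/stale.org")
        fresh = Feed.objects.create(url="https://example.org/fresh.org")
        legacy = Feed.objects.create(url="https://example.org/legacy.org")

        # Backdate one feed past the 3-day cutoff; keep one fresh and one NULL (legacy).
        Feed.objects.filter(pk=stale.pk).update(
            last_successful_fetch=timezone.now() - timedelta(days=5)
        )
        Feed.objects.filter(pk=fresh.pk).update(last_successful_fetch=timezone.now())
        Feed.objects.filter(pk=legacy.pk).update(last_successful_fetch=None)

        deleted = _cleanup_stale_feeds_impl()

        # delete() counts cascaded rows too, so only assert a lower bound on the return value.
        self.assertGreaterEqual(deleted, 1)
        self.assertFalse(Feed.objects.filter(pk=stale.pk).exists())
        self.assertTrue(Feed.objects.filter(pk=fresh.pk).exists())
        self.assertTrue(Feed.objects.filter(pk=legacy.pk).exists())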