org-social-relay/app/feeds/parser.py
Andros Fenollosa b13cf2fad4 Add support for Org Social v1.6
New features from Org Social v1.6:
- Add LOCATION, BIRTHDAY, LANGUAGE, PINNED fields to Profile model
- Support post ID in header (** 2025-05-01T12:00:00+0100)
- Header ID takes priority over property drawer ID
- Parse and store all new v1.6 metadata fields

Changes:
- Updated Profile model with 4 new fields
- Updated parser to extract new metadata fields
- Updated parser to support ID in post headers
- Updated tasks.py to save new profile fields
- Added database migration 0010
- Added 3 new tests for v1.6 features
- Renamed SKILL.md to CLAUDE.MD

All tests passing (58/58)
2026-01-05 13:55:56 +01:00
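The header ID is easiest to see with a small example. Below is a minimal sketch (the sample content is invented, and the import path is an assumption based on this repository's layout) that runs a v1.6 post through the parse_org_social_content function defined in this file:

from app.feeds.parser import parse_org_social_content  # import path assumed from the repo layout

sample = """\
#+TITLE: Example feed
#+NICK: alice
#+LANGUAGE: en

* Posts
** 2025-05-01T12:00:00+0100
Hello from Org Social v1.6!
"""

data = parse_org_social_content(sample)
print(data["metadata"]["nick"])  # "alice"
print(data["posts"][0]["id"])    # "2025-05-01T12:00:00+0100", taken from the post header

Posts whose ID appears only in a :PROPERTIES: drawer (the pre-1.6 form) still parse as before; the header ID simply wins when both are present.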


import re
import requests
from typing import Dict, Any, Tuple
from django.utils import timezone
import logging
logger = logging.getLogger(__name__)

def _update_feed_last_successful_fetch(url: str):
    """
    Update the last_successful_fetch field for a feed URL.

    This is called when a feed is successfully fetched with HTTP 200.
    """
    try:
        from .models import Feed

        Feed.objects.filter(url=url).update(last_successful_fetch=timezone.now())
    except Exception:
        # Silently fail if Feed model is not available or update fails
        # This prevents breaking existing code during migrations or in tests
        pass

def _handle_feed_redirect(old_url: str, new_url: str):
    """
    Handle feed URL redirect by updating or merging feeds.

    If a feed redirects to a new URL:
    1. Check if new URL already exists as a feed
    2. If yes: merge data and delete old URL
    3. If no: update old URL to new URL

    Args:
        old_url: Original URL that redirected
        new_url: Final URL after redirect
    """
    try:
        from .models import Feed, Profile, Post, Follow, Mention, PollVote
        from django.db import transaction

        old_feed = Feed.objects.filter(url=old_url).first()
        new_feed = Feed.objects.filter(url=new_url).first()

        if old_feed and new_feed:
            # Both URLs exist - merge them
            logger.info(f"Feed redirect detected: {old_url} -> {new_url}")
            logger.info("Both feeds exist. Merging old feed into new feed.")
            with transaction.atomic():
                # Get profiles for both feeds
                old_profile = Profile.objects.filter(feed=old_url).first()
                new_profile = Profile.objects.filter(feed=new_url).first()
                if old_profile and new_profile:
                    # Merge profiles - keep the new one, migrate relationships
                    logger.info(
                        f"Merging profile data: {old_profile.nick} -> {new_profile.nick}"
                    )

                    # Migrate Follow relationships where old_profile is followed
                    follows_as_followed = Follow.objects.filter(followed=old_profile)
                    for follow in follows_as_followed:
                        # Check if this relationship already exists with new_profile
                        existing = Follow.objects.filter(
                            follower=follow.follower, followed=new_profile
                        ).first()
                        if not existing:
                            # Update to point to new_profile
                            follow.followed = new_profile
                            try:
                                follow.save()
                                logger.debug(
                                    f"Migrated follow relationship: {follow.follower.nick} -> {new_profile.nick}"
                                )
                            except Exception as e:
                                logger.warning(
                                    f"Could not migrate follow relationship, deleting: {e}"
                                )
                                follow.delete()
                        else:
                            # Relationship already exists, delete duplicate
                            follow.delete()
                            logger.debug("Deleted duplicate follow relationship")

                    # Migrate Follow relationships where old_profile is follower
                    follows_as_follower = Follow.objects.filter(follower=old_profile)
                    for follow in follows_as_follower:
                        # Check if this relationship already exists with new_profile
                        existing = Follow.objects.filter(
                            follower=new_profile, followed=follow.followed
                        ).first()
                        if not existing:
                            # Update to point to new_profile
                            follow.follower = new_profile
                            try:
                                follow.save()
                                logger.debug(
                                    f"Migrated follow relationship: {new_profile.nick} -> {follow.followed.nick}"
                                )
                            except Exception as e:
                                logger.warning(
                                    f"Could not migrate follow relationship, deleting: {e}"
                                )
                                follow.delete()
                        else:
                            # Relationship already exists, delete duplicate
                            follow.delete()
                            logger.debug("Deleted duplicate follow relationship")

                    # Update all Mentions pointing to old_profile
                    # Mentions don't have unique constraints, so bulk update is safe
                    Mention.objects.filter(mentioned_profile=old_profile).update(
                        mentioned_profile=new_profile
                    )

                    # Migrate posts from old_profile to new_profile (avoid duplicates)
                    old_posts = Post.objects.filter(profile=old_profile)
                    for old_post in old_posts:
                        # Check if post already exists in new profile
                        existing_post = Post.objects.filter(
                            profile=new_profile, post_id=old_post.post_id
                        ).first()
                        if not existing_post:
                            # Migrate post to new profile
                            old_post.profile = new_profile
                            old_post.save()
                            logger.debug(
                                f"Migrated post {old_post.post_id} to new profile"
                            )
                        else:
                            # Post already exists, handle poll votes carefully
                            # Get all poll votes pointing to old_post
                            poll_votes = PollVote.objects.filter(poll_post=old_post)
                            for poll_vote in poll_votes:
                                try:
                                    # Check if this vote already exists for the existing_post
                                    existing_vote = PollVote.objects.filter(
                                        post=poll_vote.post, poll_post=existing_post
                                    ).first()
                                    if not existing_vote:
                                        # Update to point to existing_post
                                        poll_vote.poll_post = existing_post
                                        poll_vote.save()
                                        logger.debug(
                                            "Migrated poll vote to existing post"
                                        )
                                    else:
                                        # Vote already exists, delete duplicate
                                        poll_vote.delete()
                                        logger.debug("Deleted duplicate poll vote")
                                except Exception as e:
                                    # If there's any constraint error, just delete the vote
                                    logger.warning(
                                        f"Error migrating poll vote, deleting: {e}"
                                    )
                                    poll_vote.delete()
                            # Delete duplicate post
                            old_post.delete()
                            logger.debug(f"Removed duplicate post {old_post.post_id}")

                    # Delete old profile
                    old_profile.delete()
                    logger.info(f"Deleted old profile: {old_url}")
                elif old_profile and not new_profile:
                    # Only old profile exists - update its feed URL
                    logger.info(f"Updating profile feed URL: {old_url} -> {new_url}")
                    old_profile.feed = new_url
                    old_profile.save()

                # Delete the old feed
                old_feed.delete()
                logger.info(f"Deleted old feed: {old_url}")
        elif old_feed and not new_feed:
            # Only old URL exists - update it to new URL
            logger.info(f"Feed redirect detected: {old_url} -> {new_url}")
            logger.info(f"Updating feed URL to: {new_url}")
            with transaction.atomic():
                # Update the feed URL
                old_feed.url = new_url
                old_feed.save()
                # Update all profiles pointing to old URL
                profiles_updated = Profile.objects.filter(feed=old_url).update(
                    feed=new_url
                )
                logger.info(f"Updated {profiles_updated} profile(s) to new URL")
        # If new_feed exists but not old_feed, nothing to do
        # This can happen if the redirect was already processed
    except Exception as e:
        # Don't break parsing if redirect handling fails
        logger.error(
            f"Failed to handle redirect {old_url} -> {new_url}: {e}", exc_info=True
        )
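# Illustrative sketch of the two redirect outcomes handled above (the URLs are
# hypothetical; this block is commentary, not executed code):
#
#     _handle_feed_redirect("http://old.example/social.org",
#                           "https://new.example/social.org")
#
# - Only the old URL exists as a Feed: its url field is rewritten in place, and any
#   Profile rows whose feed still points at the old URL are updated to the new one.
# - Both URLs already exist as Feeds: follows, mentions, posts and poll votes are
#   re-pointed at the new URL's profile, duplicates are deleted, and the old
#   profile is removed.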

def parse_org_social(url: str) -> Dict[str, Any]:
    """
    Parse an Org Social file from a URL and return structured data.

    Args:
        url: The URL to the social.org file

    Returns:
        Dictionary containing parsed metadata and posts
    """
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()

        # Decode content as UTF-8 explicitly to avoid encoding issues
        # when the server doesn't specify charset in Content-Type header
        content = response.content.decode("utf-8")

        # Check if URL was redirected
        final_url = response.url
        if final_url != url and response.history:
            # URL was redirected - handle the redirect
            logger.info(
                f"Redirect detected: {url} -> {final_url} (status: {response.history[0].status_code})"
            )
            _handle_feed_redirect(url, final_url)
            # Use final URL for further operations
            url = final_url

        # Update last_successful_fetch if we got a 200 response
        if response.status_code == 200:
            _update_feed_last_successful_fetch(url)
    except requests.RequestException as e:
        raise Exception(f"Failed to fetch URL {url}: {str(e)}")

    # Initialize result structure
    result: Dict[str, Any] = {
        "metadata": {
            "title": "",
            "nick": "",
            "description": "",
            "avatar": "",
            "location": "",
            "birthday": "",
            "language": "",
            "pinned": "",
            "links": [],
            "follows": [],
            "contacts": [],
        },
        "posts": [],
    }

    # Parse metadata with regex (case insensitive)
    title_match = re.search(
        r"^\s*\#\+TITLE:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE
    )
    result["metadata"]["title"] = title_match.group(1).strip() if title_match else ""

    nick_match = re.search(
        r"^\s*\#\+NICK:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE
    )
    result["metadata"]["nick"] = nick_match.group(1).strip() if nick_match else ""

    description_match = re.search(
        r"^\s*\#\+DESCRIPTION:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE
    )
    result["metadata"]["description"] = (
        description_match.group(1).strip() if description_match else ""
    )

    avatar_match = re.search(r"^\s*\#\+AVATAR:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["avatar"] = avatar_match.group(1).strip() if avatar_match else ""

    # Parse new v1.6 fields
    location_match = re.search(r"^\s*\#\+LOCATION:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["location"] = (
        location_match.group(1).strip() if location_match else ""
    )

    birthday_match = re.search(r"^\s*\#\+BIRTHDAY:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["birthday"] = (
        birthday_match.group(1).strip() if birthday_match else ""
    )

    language_match = re.search(r"^\s*\#\+LANGUAGE:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["language"] = (
        language_match.group(1).strip() if language_match else ""
    )

    pinned_match = re.search(r"^\s*\#\+PINNED:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["pinned"] = pinned_match.group(1).strip() if pinned_match else ""

    # Parse multiple values
    result["metadata"]["links"] = [
        match.group(1).strip()
        for match in re.finditer(r"^\s*\#\+LINK:\s*(.+)$", content, re.MULTILINE)
    ]
    result["metadata"]["contacts"] = [
        match.group(1).strip()
        for match in re.finditer(r"^\s*\#\+CONTACT:\s*(.+)$", content, re.MULTILINE)
    ]

    # Parse follows (can have nickname)
    follow_matches = re.finditer(r"^\s*\#\+FOLLOW:\s*(.+)$", content, re.MULTILINE)
    for match in follow_matches:
        follow_data = match.group(1).strip()
        parts = follow_data.split()
        if len(parts) == 1:
            result["metadata"]["follows"].append({"url": parts[0], "nickname": ""})
        elif len(parts) >= 2:
            result["metadata"]["follows"].append(
                {"nickname": parts[0], "url": parts[1]}
            )

    # Parse posts - find everything after * Posts
    posts_pattern = r"\*\s+Posts\s*\n(.*)"
    posts_match = re.search(posts_pattern, content, re.DOTALL)
    if posts_match:
        posts_content = posts_match.group(1)

        # Split posts by ** headers (exactly 2 asterisks, not 3+)
        # Use negative lookahead (?!\*) to ensure we don't match *** or ****
        # Use ^ anchor to match ** only at start of line
        # Capture group 1: header content (can contain ID in v1.6)
        # Capture group 2: properties text
        # Capture group 3: post content
        post_pattern = r"^\*\*(?!\*)([^\n]*)\n(?::PROPERTIES:\s*\n((?::[^:\n]+:[^\n]*\n)*):END:\s*\n)?(.*?)(?=^\*\*(?!\*)|\Z)"
        post_matches = re.finditer(
            post_pattern, posts_content, re.DOTALL | re.MULTILINE
        )

        for post_match in post_matches:
            header_text = post_match.group(1).strip() if post_match.group(1) else ""
            properties_text = post_match.group(2) or ""
            content_text = post_match.group(3).strip() if post_match.group(3) else ""

            post: Dict[str, Any] = {
                "id": "",
                "content": content_text,
                "properties": {},
                "mentions": [],
                "poll_options": [],
            }

            # First check if ID is in header (v1.6 feature)
            # Header ID takes priority over property drawer ID
            if header_text:
                # RFC 3339 format: ####-##-##T##:##:##[+-]####
                header_id_match = re.match(
                    r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:?\d{2}$",
                    header_text,
                )
                if header_id_match:
                    post["id"] = header_text

            # Parse properties
            if properties_text:
                # Use [ \t]* instead of \s* to avoid capturing newlines
                prop_matches = re.finditer(r":([^:]+):[ \t]*([^\n]*)", properties_text)
                for prop_match in prop_matches:
                    prop_name = prop_match.group(1).lower().strip()
                    prop_value = prop_match.group(2).strip()
                    # Only add non-empty properties
                    if prop_value:
                        post["properties"][prop_name] = prop_value
                        # If ID not already set from header, use property ID
                        if prop_name == "id" and not post["id"]:
                            post["id"] = prop_value

            # Extract mentions from content
            mention_matches = re.finditer(
                r"\[\[org-social:([^\]]+)\]\[([^\]]+)\]\]", content_text
            )
            post["mentions"] = [
                {"url": m.group(1), "nickname": m.group(2)} for m in mention_matches
            ]

            # Extract poll options from content
            poll_matches = re.finditer(
                r"^\s*-\s*\[\s*\]\s*(.+)$", content_text, re.MULTILINE
            )
            post["poll_options"] = [m.group(1).strip() for m in poll_matches]

            if post["id"]:  # Only add posts with valid ID
                result["posts"].append(post)

    return result
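# Typical use of parse_org_social (sketch; the URL is hypothetical, fetching it needs
# network access, and the redirect/fetch bookkeeping above also needs a configured
# database):
#
#     data = parse_org_social("https://example.org/social.org")
#     data["metadata"]["title"]           # profile fields as plain strings
#     [p["id"] for p in data["posts"]]    # timestamp IDs; posts without an ID are dropped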

def parse_org_social_content(content: str) -> Dict[str, Any]:
    """
    Parse Org Social content directly and return structured data.

    Args:
        content: The raw content of the social.org file

    Returns:
        Dictionary containing parsed metadata and posts
    """
    # Initialize result structure
    result: Dict[str, Any] = {
        "metadata": {
            "title": "",
            "nick": "",
            "description": "",
            "avatar": "",
            "location": "",
            "birthday": "",
            "language": "",
            "pinned": "",
            "links": [],
            "follows": [],
            "contacts": [],
        },
        "posts": [],
    }

    # Parse metadata with regex (case insensitive)
    title_match = re.search(
        r"^\s*\#\+TITLE:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE
    )
    result["metadata"]["title"] = title_match.group(1).strip() if title_match else ""

    nick_match = re.search(
        r"^\s*\#\+NICK:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE
    )
    result["metadata"]["nick"] = nick_match.group(1).strip() if nick_match else ""

    description_match = re.search(
        r"^\s*\#\+DESCRIPTION:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE
    )
    result["metadata"]["description"] = (
        description_match.group(1).strip() if description_match else ""
    )

    avatar_match = re.search(r"^\s*\#\+AVATAR:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["avatar"] = avatar_match.group(1).strip() if avatar_match else ""

    # Parse new v1.6 fields
    location_match = re.search(r"^\s*\#\+LOCATION:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["location"] = (
        location_match.group(1).strip() if location_match else ""
    )

    birthday_match = re.search(r"^\s*\#\+BIRTHDAY:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["birthday"] = (
        birthday_match.group(1).strip() if birthday_match else ""
    )

    language_match = re.search(r"^\s*\#\+LANGUAGE:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["language"] = (
        language_match.group(1).strip() if language_match else ""
    )

    pinned_match = re.search(r"^\s*\#\+PINNED:\s*(.+)$", content, re.MULTILINE)
    result["metadata"]["pinned"] = pinned_match.group(1).strip() if pinned_match else ""

    # Parse multiple values
    result["metadata"]["links"] = [
        match.group(1).strip()
        for match in re.finditer(r"^\s*\#\+LINK:\s*(.+)$", content, re.MULTILINE)
    ]
    result["metadata"]["contacts"] = [
        match.group(1).strip()
        for match in re.finditer(r"^\s*\#\+CONTACT:\s*(.+)$", content, re.MULTILINE)
    ]

    # Parse follows (can have nickname)
    follow_matches = re.finditer(r"^\s*\#\+FOLLOW:\s*(.+)$", content, re.MULTILINE)
    for match in follow_matches:
        follow_data = match.group(1).strip()
        parts = follow_data.split()
        if len(parts) == 1:
            result["metadata"]["follows"].append({"url": parts[0], "nickname": ""})
        elif len(parts) >= 2:
            result["metadata"]["follows"].append(
                {"nickname": parts[0], "url": parts[1]}
            )

    # Parse posts - find everything after * Posts
    posts_pattern = r"\*\s+Posts\s*\n(.*)"
    posts_match = re.search(posts_pattern, content, re.DOTALL)
    if posts_match:
        posts_content = posts_match.group(1)

        # Split posts by ** headers (exactly 2 asterisks, not 3+)
        # Use negative lookahead (?!\*) to ensure we don't match *** or ****
        # Use ^ anchor to match ** only at start of line
        # Capture group 1: header content (can contain ID in v1.6)
        # Capture group 2: properties text
        # Capture group 3: post content
        post_pattern = r"^\*\*(?!\*)([^\n]*)\n(?::PROPERTIES:\s*\n((?::[^:\n]+:[^\n]*\n)*):END:\s*\n)?(.*?)(?=^\*\*(?!\*)|\Z)"
        post_matches = re.finditer(
            post_pattern, posts_content, re.DOTALL | re.MULTILINE
        )

        for post_match in post_matches:
            header_text = post_match.group(1).strip() if post_match.group(1) else ""
            properties_text = post_match.group(2) or ""
            content_text = post_match.group(3).strip() if post_match.group(3) else ""

            post: Dict[str, Any] = {
                "id": "",
                "content": content_text,
                "properties": {},
                "mentions": [],
                "poll_options": [],
            }

            # First check if ID is in header (v1.6 feature)
            # Header ID takes priority over property drawer ID
            if header_text:
                # RFC 3339 format: ####-##-##T##:##:##[+-]####
                header_id_match = re.match(
                    r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:?\d{2}$",
                    header_text,
                )
                if header_id_match:
                    post["id"] = header_text

            # Parse properties
            if properties_text:
                # Use [ \t]* instead of \s* to avoid capturing newlines
                prop_matches = re.finditer(r":([^:]+):[ \t]*([^\n]*)", properties_text)
                for prop_match in prop_matches:
                    prop_name = prop_match.group(1).lower().strip()
                    prop_value = prop_match.group(2).strip()
                    # Only add non-empty properties
                    if prop_value:
                        post["properties"][prop_name] = prop_value
                        # If ID not already set from header, use property ID
                        if prop_name == "id" and not post["id"]:
                            post["id"] = prop_value

            # Extract mentions from content
            mention_matches = re.finditer(
                r"\[\[org-social:([^\]]+)\]\[([^\]]+)\]\]", content_text
            )
            post["mentions"] = [
                {"url": m.group(1), "nickname": m.group(2)} for m in mention_matches
            ]

            # Extract poll options from content
            poll_matches = re.finditer(
                r"^\s*-\s*\[\s*\]\s*(.+)$", content_text, re.MULTILINE
            )
            post["poll_options"] = [m.group(1).strip() for m in poll_matches]

            if post["id"]:  # Only add posts with valid ID
                result["posts"].append(post)

    return result
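# The #+FOLLOW handling above accepts both the bare-URL and the nickname forms.
# A sketch of the entries it produces (values are illustrative):
#
#     #+FOLLOW: https://bob.example/social.org
#         -> {"url": "https://bob.example/social.org", "nickname": ""}
#     #+FOLLOW: bob https://bob.example/social.org
#         -> {"nickname": "bob", "url": "https://bob.example/social.org"}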

def validate_org_social_feed(url: str) -> Tuple[bool, str]:
    """
    Validate if a URL returns a valid Org Social feed.

    Args:
        url: The URL to validate

    Returns:
        Tuple of (is_valid, error_message)
    """
    try:
        # Check if URL responds with 200
        response = requests.get(url, timeout=5)
        if response.status_code != 200:
            return False, f"URL returned status code {response.status_code}"

        # Check if URL was redirected
        final_url = response.url
        if final_url != url and response.history:
            logger.info(f"Validation: Redirect detected: {url} -> {final_url}")
            _handle_feed_redirect(url, final_url)
            # Use final URL for validation
            url = final_url

        # Update last_successful_fetch since we got a 200 response
        _update_feed_last_successful_fetch(url)

        # Decode content as UTF-8 explicitly to avoid encoding issues
        content = response.content.decode("utf-8")

        # Check if content has basic Org Social structure
        # At minimum should have at least one #+TITLE, #+NICK, or #+DESCRIPTION (case insensitive)
        has_title = bool(
            re.search(r"^\s*\#\+TITLE:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE)
        )
        has_nick = bool(
            re.search(r"^\s*\#\+NICK:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE)
        )
        has_description = bool(
            re.search(
                r"^\s*\#\+DESCRIPTION:\s*(.+)$", content, re.MULTILINE | re.IGNORECASE
            )
        )

        if not (has_title or has_nick or has_description):
            return (
                False,
                "Content does not appear to be a valid Org Social file (missing basic metadata)",
            )

        # Try to parse the content to ensure it's valid
        try:
            parsed_data = parse_org_social_content(content)
            # Check that we have at least some metadata
            metadata = parsed_data.get("metadata", {})
            if not any(
                [
                    metadata.get("title"),
                    metadata.get("nick"),
                    metadata.get("description"),
                ]
            ):
                return False, "Parsed content lacks required metadata"
        except Exception as e:
            return False, f"Failed to parse Org Social content: {str(e)}"

        return True, ""
    except requests.RequestException as e:
        return False, f"Failed to fetch URL: {str(e)}"
    except Exception as e:
        return False, f"Validation error: {str(e)}"