Files
async-http-queue-fetch-urls-el/async-http-queue-fetch-urls.el
Andros Fenollosa c8b175fc65 Initial commit: async HTTP queue fetching library
- Parallel HTTP fetching with configurable concurrency
- Automatic timeout handling
- Custom parser support (JSON by default)
- Error handling per request
- Progress tracking for large batches
2026-02-23 19:38:54 +01:00

321 lines
14 KiB
EmacsLisp

;;; async-http-queue-fetch-urls.el --- Async HTTP queue with parallel fetching -*- lexical-binding: t -*-
;; Copyright (C) 2025 Andros Fenollosa
;; Author: Andros Fenollosa <hi@andros.dev>
;; Keywords: comm, processes, http
;; Version: 0.1.0
;; Package-Requires: ((emacs "28.1"))
;; URL: https://git.andros.dev/andros/async-http-queue-fetch-urls-el
;; This program is free software; you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.
;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;; GNU General Public License for more details.
;; You should have received a copy of the GNU General Public License
;; along with this program. If not, see <https://www.gnu.org/licenses/>.
;;; Commentary:
;; A lightweight, parallel HTTP fetching library for Emacs that uses
;; `url-retrieve' to download multiple URLs asynchronously with
;; configurable concurrency limits.
;;
;; Features:
;; - Parallel downloads with configurable concurrency (default: 5)
;; - Automatic timeout handling (default: 10 seconds)
;; - Custom parser support (default: json-parse-buffer)
;; - Progress tracking for large batches
;; - Error handling and retry logic
;;
;; Example usage:
;;
;; (async-http-queue-fetch-urls
;; '("https://api.example.com/posts/1"
;; "https://api.example.com/posts/2"
;; "https://api.example.com/posts/3")
;; :max-concurrent 5
;; :timeout 10
;; :parser #'json-parse-buffer
;; :callback (lambda (results)
;; (message "Got %d results" (length results))))
;;; Code:
(require 'url)
(require 'json)
(require 'cl-lib)
(require 'seq)
;;;; Queue State Structure
(cl-defstruct (async-http-queue--state
(:constructor async-http-queue--state-create)
(:copier nil))
"State for an async HTTP queue fetch operation."
queue ; List of items: ((url . URL) (status . STATUS) (data . DATA))
active-workers ; Number of currently active downloads
max-concurrent ; Maximum concurrent downloads allowed
timeout ; Timeout in seconds for each request
parser ; Parser function for response bodies
completion-callback ; Callback to call when all items complete
error-callback) ; Optional callback for individual errors
;;;; Queue Operations
(defun async-http-queue--update-status (state url status)
"Update the status of queue item with URL to STATUS in STATE."
(setf (async-http-queue--state-queue state)
(mapcar (lambda (item)
(if (equal (alist-get 'url item) url)
(let ((new-item (copy-tree item)))
(setcdr (assoc 'status new-item) status)
new-item)
item))
(async-http-queue--state-queue state))))
(defun async-http-queue--update-data (state url data)
"Update the data of queue item with URL to DATA in STATE."
(setf (async-http-queue--state-queue state)
(mapcar (lambda (item)
(if (equal (alist-get 'url item) url)
(let ((new-item (copy-tree item)))
(setcdr (assoc 'data new-item) data)
new-item)
item))
(async-http-queue--state-queue state))))
(defun async-http-queue--fetch-url (state url success-callback error-callback)
"Fetch URL asynchronously using `url-retrieve' with STATE.
Call SUCCESS-CALLBACK with parsed data on success.
Call ERROR-CALLBACK on failure.
Includes automatic timeout handling."
(let ((timeout-timer nil)
(callback-called nil)
(url-buffer nil)
(timeout (async-http-queue--state-timeout state))
(parser (async-http-queue--state-parser state)))
(setq url-buffer
(url-retrieve
url
(lambda (status)
;; Cancel timeout timer if it exists
(when timeout-timer
(cancel-timer timeout-timer))
;; Only execute callback once
(unless callback-called
(setq callback-called t)
(let ((result nil))
(condition-case err
(progn
;; Check for errors first
(when (plist-get status :error)
(error "Download failed: %S" (plist-get status :error)))
;; Check HTTP status
(goto-char (point-min))
(if (re-search-forward "^HTTP/[0-9]\\.[0-9] \\([0-9]\\{3\\}\\)" nil t)
(let ((status-code (string-to-number (match-string 1))))
(if (and (>= status-code 200) (< status-code 300))
(progn
;; Success - extract content
(goto-char (point-min))
(when (re-search-forward "\r\n\r\n\\|\n\n" nil t)
(setq result (if parser
(funcall parser)
(buffer-substring-no-properties (point) (point-max))))))
;; HTTP error
(message "HTTP %d error fetching URL: %s" status-code url)
(setq result nil)))
;; No HTTP status found
(message "Invalid HTTP response for URL: %s" url)
(setq result nil)))
(error
(message "Error fetching URL %s: %s" url (error-message-string err))
(setq result nil)))
;; Kill buffer to avoid accumulation
(kill-buffer (current-buffer))
;; Call appropriate callback
(if result
(funcall success-callback result)
(funcall error-callback)))))
nil t))
;; Set up timeout timer
(setq timeout-timer
(run-at-time timeout nil
(lambda ()
(unless callback-called
(setq callback-called t)
(message "Timeout fetching URL %s (%d seconds)" url timeout)
;; Kill the url-retrieve buffer if it exists
(when (and url-buffer (buffer-live-p url-buffer))
;; First kill the process to avoid interactive prompt
(let ((proc (get-buffer-process url-buffer)))
(when (and proc (process-live-p proc))
(delete-process proc)))
;; Now kill the buffer safely
(kill-buffer url-buffer))
(funcall error-callback)))))))
(defun async-http-queue--process-next-pending (state)
"Process the next pending item in STATE's queue if worker slots available."
(when (< (async-http-queue--state-active-workers state)
(async-http-queue--state-max-concurrent state))
(let ((pending-item (seq-find (lambda (item)
(eq (alist-get 'status item) 'pending))
(async-http-queue--state-queue state))))
(when pending-item
(let ((url (alist-get 'url pending-item)))
;; Mark as processing and increment active workers
(async-http-queue--update-status state url 'processing)
(cl-incf (async-http-queue--state-active-workers state))
;; Start the download
(async-http-queue--fetch-url
state
url
;; Success callback
(lambda (data)
(async-http-queue--update-status state url 'done)
(async-http-queue--update-data state url data)
(cl-decf (async-http-queue--state-active-workers state))
;; Process next pending item with small delay
(run-at-time 0.05 nil #'async-http-queue--process-next-pending state)
(async-http-queue--check-completion state))
;; Error callback
(lambda ()
(async-http-queue--update-status state url 'error)
(cl-decf (async-http-queue--state-active-workers state))
(when (async-http-queue--state-error-callback state)
(funcall (async-http-queue--state-error-callback state) url))
;; Process next pending item with small delay
(run-at-time 0.05 nil #'async-http-queue--process-next-pending state)
(async-http-queue--check-completion state))))))))
(defun async-http-queue--process (state)
"Process STATE's queue asynchronously with limited concurrency."
;; Reset active workers counter
(setf (async-http-queue--state-active-workers state) 0)
;; Launch initial batch (up to max concurrent) with staggered start
(dotimes (i (async-http-queue--state-max-concurrent state))
(run-at-time (* i 0.05) nil #'async-http-queue--process-next-pending state)))
(defun async-http-queue--check-completion (state)
"Check if STATE's download queue is complete and call callback if done."
(let* ((total (length (async-http-queue--state-queue state)))
(done (length (seq-filter (lambda (i) (eq (alist-get 'status i) 'done))
(async-http-queue--state-queue state))))
(failed (length (seq-filter (lambda (i) (eq (alist-get 'status i) 'error))
(async-http-queue--state-queue state))))
(in-progress (seq-filter
(lambda (i) (or
(eq (alist-get 'status i) 'processing)
(eq (alist-get 'status i) 'pending)))
(async-http-queue--state-queue state))))
;; Show progress for longer downloads
(when (and (> total 10) (> (length in-progress) 0))
(message "Loading URLs... %d/%d completed%s"
done total
(if (> failed 0) (format " (%d failed)" failed) "")))
(when (= (length in-progress) 0)
;; All downloads complete - collect results in order
(let ((results (make-vector total nil))
(index 0))
;; Fill vector with results, maintaining original order
;; Use nil for failed items
(dolist (queue-item (async-http-queue--state-queue state))
(aset results index
(if (eq (alist-get 'status queue-item) 'done)
(alist-get 'data queue-item)
nil))
(setq index (1+ index)))
;; Final status message
(if (> failed 0)
(message "Loaded %d URLs (%d failed)" done failed)
(message "Loaded %d URLs" done))
;; Call completion callback
(when (async-http-queue--state-completion-callback state)
(funcall (async-http-queue--state-completion-callback state) results))))))
;;;; Public API
;;;###autoload
(cl-defun async-http-queue-fetch-urls (urls
&key
(callback nil)
(error-callback nil)
(max-concurrent 5)
(timeout 10)
(parser #'json-parse-buffer))
"Fetch URLS asynchronously in parallel and call CALLBACK with results.
URLS should be a list of URL strings to fetch.
CALLBACK is called with a vector of parsed results when all fetches complete.
Failed requests will be represented as nil in the result vector.
ERROR-CALLBACK is optionally called for each individual URL that fails,
with the URL as argument.
MAX-CONCURRENT controls the maximum number of parallel downloads (default: 5).
TIMEOUT is the maximum time in seconds to wait for each request (default: 10).
PARSER is a function to parse response bodies (default: `json-parse-buffer').
It will be called with point positioned at the start of the response body.
Set to nil to return raw text instead.
Returns immediately and processes URLs in parallel.
Example:
(async-http-queue-fetch-urls
\\='(\"https://api.example.com/posts/1\"
\"https://api.example.com/posts/2\"
\"https://api.example.com/posts/3\")
:max-concurrent 5
:timeout 10
:parser #\\='json-parse-buffer
:callback (lambda (results)
(message \"Got %d results\" (length results))))"
(if (null urls)
(progn
(message "No URLs provided")
(when callback
(funcall callback (make-vector 0 nil))))
(let* ((n (length urls))
(state (async-http-queue--state-create
:queue (mapcar (lambda (url)
`((url . ,url)
(status . pending)
(data . nil)))
urls)
:active-workers 0
:max-concurrent max-concurrent
:timeout timeout
:parser parser
:completion-callback callback
:error-callback error-callback)))
(message "Fetching %d URL%s..." n (if (> n 1) "s" ""))
(async-http-queue--process state))))
(provide 'async-http-queue-fetch-urls)
;;; async-http-queue-fetch-urls.el ends here