;; Features: parallel HTTP fetching with configurable concurrency,
;; automatic timeout handling, custom parser support (JSON by default),
;; per-request error handling, and progress tracking for large batches.
;;; async-http-queue-fetch-urls.el --- Async HTTP queue with parallel fetching -*- lexical-binding: t -*-

;; Copyright (C) 2025 Andros Fenollosa

;; Author: Andros Fenollosa <hi@andros.dev>
;; Keywords: comm, processes, http
;; Version: 0.1.0
;; Package-Requires: ((emacs "28.1"))
;; URL: https://git.andros.dev/andros/async-http-queue-fetch-urls-el

;; This program is free software; you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.

;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;; GNU General Public License for more details.

;; You should have received a copy of the GNU General Public License
;; along with this program.  If not, see <https://www.gnu.org/licenses/>.
;;; Commentary:

;; A lightweight, parallel HTTP fetching library for Emacs that uses
;; `url-retrieve' to download multiple URLs asynchronously with
;; configurable concurrency limits.
;;
;; Features:
;; - Parallel downloads with configurable concurrency (default: 5)
;; - Automatic timeout handling (default: 10 seconds)
;; - Custom parser support (default: json-parse-buffer)
;; - Progress tracking for large batches
;; - Per-request error handling (failed fetches yield nil results;
;;   there is no automatic retry)
;;
;; Example usage:
;;
;; (async-http-queue-fetch-urls
;;  '("https://api.example.com/posts/1"
;;    "https://api.example.com/posts/2"
;;    "https://api.example.com/posts/3")
;;  :max-concurrent 5
;;  :timeout 10
;;  :parser #'json-parse-buffer
;;  :callback (lambda (results)
;;              (message "Got %d results" (length results))))

;;; Code:
(require 'url)
|
|
(require 'json)
|
|
(require 'cl-lib)
|
|
(require 'seq)
|
|
|
|

;;;; Queue State Structure
(cl-defstruct (async-http-queue--state
|
|
(:constructor async-http-queue--state-create)
|
|
(:copier nil))
|
|
"State for an async HTTP queue fetch operation."
|
|
queue ; List of items: ((url . URL) (status . STATUS) (data . DATA))
|
|
active-workers ; Number of currently active downloads
|
|
max-concurrent ; Maximum concurrent downloads allowed
|
|
timeout ; Timeout in seconds for each request
|
|
parser ; Parser function for response bodies
|
|
completion-callback ; Callback to call when all items complete
|
|
error-callback) ; Optional callback for individual errors
|
|
|
|

;;;; Queue Operations
(defun async-http-queue--update-status (state url status)
|
|
"Update the status of queue item with URL to STATUS in STATE."
|
|
(setf (async-http-queue--state-queue state)
|
|
(mapcar (lambda (item)
|
|
(if (equal (alist-get 'url item) url)
|
|
(let ((new-item (copy-tree item)))
|
|
(setcdr (assoc 'status new-item) status)
|
|
new-item)
|
|
item))
|
|
(async-http-queue--state-queue state))))
|
|
|
|
(defun async-http-queue--update-data (state url data)
|
|
"Update the data of queue item with URL to DATA in STATE."
|
|
(setf (async-http-queue--state-queue state)
|
|
(mapcar (lambda (item)
|
|
(if (equal (alist-get 'url item) url)
|
|
(let ((new-item (copy-tree item)))
|
|
(setcdr (assoc 'data new-item) data)
|
|
new-item)
|
|
item))
|
|
(async-http-queue--state-queue state))))
|
|
|
|
(defun async-http-queue--fetch-url (state url success-callback error-callback)
|
|
"Fetch URL asynchronously using `url-retrieve' with STATE.
|
|
Call SUCCESS-CALLBACK with parsed data on success.
|
|
Call ERROR-CALLBACK on failure.
|
|
Includes automatic timeout handling."
|
|
(let ((timeout-timer nil)
|
|
(callback-called nil)
|
|
(url-buffer nil)
|
|
(timeout (async-http-queue--state-timeout state))
|
|
(parser (async-http-queue--state-parser state)))
|
|
(setq url-buffer
|
|
(url-retrieve
|
|
url
|
|
(lambda (status)
|
|
;; Cancel timeout timer if it exists
|
|
(when timeout-timer
|
|
(cancel-timer timeout-timer))
|
|
|
|
;; Only execute callback once
|
|
(unless callback-called
|
|
(setq callback-called t)
|
|
|
|
(let ((result nil))
|
|
(condition-case err
|
|
(progn
|
|
;; Check for errors first
|
|
(when (plist-get status :error)
|
|
(error "Download failed: %S" (plist-get status :error)))
|
|
|
|
;; Check HTTP status
|
|
(goto-char (point-min))
|
|
(if (re-search-forward "^HTTP/[0-9]\\.[0-9] \\([0-9]\\{3\\}\\)" nil t)
|
|
(let ((status-code (string-to-number (match-string 1))))
|
|
(if (and (>= status-code 200) (< status-code 300))
|
|
(progn
|
|
;; Success - extract content
|
|
(goto-char (point-min))
|
|
(when (re-search-forward "\r\n\r\n\\|\n\n" nil t)
|
|
(setq result (if parser
|
|
(funcall parser)
|
|
(buffer-substring-no-properties (point) (point-max))))))
|
|
;; HTTP error
|
|
(message "HTTP %d error fetching URL: %s" status-code url)
|
|
(setq result nil)))
|
|
;; No HTTP status found
|
|
(message "Invalid HTTP response for URL: %s" url)
|
|
(setq result nil)))
|
|
(error
|
|
(message "Error fetching URL %s: %s" url (error-message-string err))
|
|
(setq result nil)))
|
|
|
|
;; Kill buffer to avoid accumulation
|
|
(kill-buffer (current-buffer))
|
|
|
|
;; Call appropriate callback
|
|
(if result
|
|
(funcall success-callback result)
|
|
(funcall error-callback)))))
|
|
nil t))
|
|
|
|
;; Set up timeout timer
|
|
(setq timeout-timer
|
|
(run-at-time timeout nil
|
|
(lambda ()
|
|
(unless callback-called
|
|
(setq callback-called t)
|
|
(message "Timeout fetching URL %s (%d seconds)" url timeout)
|
|
;; Kill the url-retrieve buffer if it exists
|
|
(when (and url-buffer (buffer-live-p url-buffer))
|
|
;; First kill the process to avoid interactive prompt
|
|
(let ((proc (get-buffer-process url-buffer)))
|
|
(when (and proc (process-live-p proc))
|
|
(delete-process proc)))
|
|
;; Now kill the buffer safely
|
|
(kill-buffer url-buffer))
|
|
(funcall error-callback)))))))
|
|
|
|
(defun async-http-queue--process-next-pending (state)
|
|
"Process the next pending item in STATE's queue if worker slots available."
|
|
(when (< (async-http-queue--state-active-workers state)
|
|
(async-http-queue--state-max-concurrent state))
|
|
(let ((pending-item (seq-find (lambda (item)
|
|
(eq (alist-get 'status item) 'pending))
|
|
(async-http-queue--state-queue state))))
|
|
(when pending-item
|
|
(let ((url (alist-get 'url pending-item)))
|
|
;; Mark as processing and increment active workers
|
|
(async-http-queue--update-status state url 'processing)
|
|
(cl-incf (async-http-queue--state-active-workers state))
|
|
|
|
;; Start the download
|
|
(async-http-queue--fetch-url
|
|
state
|
|
url
|
|
;; Success callback
|
|
(lambda (data)
|
|
(async-http-queue--update-status state url 'done)
|
|
(async-http-queue--update-data state url data)
|
|
(cl-decf (async-http-queue--state-active-workers state))
|
|
;; Process next pending item with small delay
|
|
(run-at-time 0.05 nil #'async-http-queue--process-next-pending state)
|
|
(async-http-queue--check-completion state))
|
|
;; Error callback
|
|
(lambda ()
|
|
(async-http-queue--update-status state url 'error)
|
|
(cl-decf (async-http-queue--state-active-workers state))
|
|
(when (async-http-queue--state-error-callback state)
|
|
(funcall (async-http-queue--state-error-callback state) url))
|
|
;; Process next pending item with small delay
|
|
(run-at-time 0.05 nil #'async-http-queue--process-next-pending state)
|
|
(async-http-queue--check-completion state))))))))
|
|
|
|
(defun async-http-queue--process (state)
|
|
"Process STATE's queue asynchronously with limited concurrency."
|
|
;; Reset active workers counter
|
|
(setf (async-http-queue--state-active-workers state) 0)
|
|
|
|
;; Launch initial batch (up to max concurrent) with staggered start
|
|
(dotimes (i (async-http-queue--state-max-concurrent state))
|
|
(run-at-time (* i 0.05) nil #'async-http-queue--process-next-pending state)))
|
|
|
|
(defun async-http-queue--check-completion (state)
|
|
"Check if STATE's download queue is complete and call callback if done."
|
|
(let* ((total (length (async-http-queue--state-queue state)))
|
|
(done (length (seq-filter (lambda (i) (eq (alist-get 'status i) 'done))
|
|
(async-http-queue--state-queue state))))
|
|
(failed (length (seq-filter (lambda (i) (eq (alist-get 'status i) 'error))
|
|
(async-http-queue--state-queue state))))
|
|
(in-progress (seq-filter
|
|
(lambda (i) (or
|
|
(eq (alist-get 'status i) 'processing)
|
|
(eq (alist-get 'status i) 'pending)))
|
|
(async-http-queue--state-queue state))))
|
|
|
|
;; Show progress for longer downloads
|
|
(when (and (> total 10) (> (length in-progress) 0))
|
|
(message "Loading URLs... %d/%d completed%s"
|
|
done total
|
|
(if (> failed 0) (format " (%d failed)" failed) "")))
|
|
|
|
(when (= (length in-progress) 0)
|
|
;; All downloads complete - collect results in order
|
|
(let ((results (make-vector total nil))
|
|
(index 0))
|
|
;; Fill vector with results, maintaining original order
|
|
;; Use nil for failed items
|
|
(dolist (queue-item (async-http-queue--state-queue state))
|
|
(aset results index
|
|
(if (eq (alist-get 'status queue-item) 'done)
|
|
(alist-get 'data queue-item)
|
|
nil))
|
|
(setq index (1+ index)))
|
|
|
|
;; Final status message
|
|
(if (> failed 0)
|
|
(message "Loaded %d URLs (%d failed)" done failed)
|
|
(message "Loaded %d URLs" done))
|
|
|
|
;; Call completion callback
|
|
(when (async-http-queue--state-completion-callback state)
|
|
(funcall (async-http-queue--state-completion-callback state) results))))))
|
|
|
|

;;;; Public API
;;;###autoload
|
|
(cl-defun async-http-queue-fetch-urls (urls
|
|
&key
|
|
(callback nil)
|
|
(error-callback nil)
|
|
(max-concurrent 5)
|
|
(timeout 10)
|
|
(parser #'json-parse-buffer))
|
|
"Fetch URLS asynchronously in parallel and call CALLBACK with results.
|
|
|
|
URLS should be a list of URL strings to fetch.
|
|
|
|
CALLBACK is called with a vector of parsed results when all fetches complete.
|
|
Failed requests will be represented as nil in the result vector.
|
|
|
|
ERROR-CALLBACK is optionally called for each individual URL that fails,
|
|
with the URL as argument.
|
|
|
|
MAX-CONCURRENT controls the maximum number of parallel downloads (default: 5).
|
|
|
|
TIMEOUT is the maximum time in seconds to wait for each request (default: 10).
|
|
|
|
PARSER is a function to parse response bodies (default: `json-parse-buffer').
|
|
It will be called with point positioned at the start of the response body.
|
|
Set to nil to return raw text instead.
|
|
|
|
Returns immediately and processes URLs in parallel.
|
|
|
|
Example:
|
|
|
|
(async-http-queue-fetch-urls
|
|
\\='(\"https://api.example.com/posts/1\"
|
|
\"https://api.example.com/posts/2\"
|
|
\"https://api.example.com/posts/3\")
|
|
:max-concurrent 5
|
|
:timeout 10
|
|
:parser #\\='json-parse-buffer
|
|
:callback (lambda (results)
|
|
(message \"Got %d results\" (length results))))"
|
|
(if (null urls)
|
|
(progn
|
|
(message "No URLs provided")
|
|
(when callback
|
|
(funcall callback (make-vector 0 nil))))
|
|
(let* ((n (length urls))
|
|
(state (async-http-queue--state-create
|
|
:queue (mapcar (lambda (url)
|
|
`((url . ,url)
|
|
(status . pending)
|
|
(data . nil)))
|
|
urls)
|
|
:active-workers 0
|
|
:max-concurrent max-concurrent
|
|
:timeout timeout
|
|
:parser parser
|
|
:completion-callback callback
|
|
:error-callback error-callback)))
|
|
(message "Fetching %d URL%s..." n (if (> n 1) "s" ""))
|
|
(async-http-queue--process state))))
|
|
|
|
(provide 'async-http-queue-fetch-urls)
|
|
|
|
;;; async-http-queue-fetch-urls.el ends here
|