;;; async-http-queue-fetch-urls.el --- Async HTTP queue with parallel fetching -*- lexical-binding: t -*-

;; Copyright (C) 2025 Andros Fenollosa

;; Author: Andros Fenollosa
;; Keywords: comm, processes, http
;; Version: 0.1.0
;; Package-Requires: ((emacs "28.1"))
;; URL: https://git.andros.dev/andros/async-http-queue-fetch-urls-el

;; This program is free software; you can redistribute it and/or modify
;; it under the terms of the GNU General Public License as published by
;; the Free Software Foundation, either version 3 of the License, or
;; (at your option) any later version.

;; This program is distributed in the hope that it will be useful,
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
;; GNU General Public License for more details.

;; You should have received a copy of the GNU General Public License
;; along with this program.  If not, see <https://www.gnu.org/licenses/>.

;;; Commentary:

;; A lightweight, parallel HTTP fetching library for Emacs that uses
;; `url-retrieve' to download multiple URLs asynchronously with
;; configurable concurrency limits.
;;
;; Features:
;; - Parallel downloads with configurable concurrency (default: 5)
;; - Automatic timeout handling (default: 10 seconds)
;; - Custom parser support (default: `json-parse-buffer')
;; - Progress tracking for large batches
;; - Per-URL error reporting via an optional error callback
;;
;; Example usage:
;;
;; (async-http-queue-fetch-urls
;;  '("https://api.example.com/posts/1"
;;    "https://api.example.com/posts/2"
;;    "https://api.example.com/posts/3")
;;  :max-concurrent 5
;;  :timeout 10
;;  :parser #'json-parse-buffer
;;  :callback (lambda (results)
;;              (message "Got %d results" (length results))))

;;; Code:

(require 'url)
(require 'json)
(require 'cl-lib)
(require 'seq)

;;;; Queue State Structure

(cl-defstruct (async-http-queue--state
               (:constructor async-http-queue--state-create)
               (:copier nil))
  "State for an async HTTP queue fetch operation."
  queue                ; List of alists: ((url . URL) (status . STATUS) (data . DATA))
  active-workers       ; Number of currently active downloads
  max-concurrent       ; Maximum concurrent downloads allowed
  timeout              ; Timeout in seconds for each request
  parser               ; Parser function for response bodies (nil = raw text)
  completion-callback  ; Called with a results vector when all items settle
  error-callback)      ; Optional; called with the URL of each failed request

;;;; Queue Operations

(defun async-http-queue--queue-put (state url key value)
  "Set KEY to VALUE in every STATE queue item whose `url' equals URL.
Matching items are replaced by fresh copies (via `copy-tree') so any
code still holding a reference to the previous queue list never sees
in-place mutation.  NOTE(review): if the same URL appears more than
once in the queue, all of its items are updated together."
  (setf (async-http-queue--state-queue state)
        (mapcar (lambda (item)
                  (if (equal (alist-get 'url item) url)
                      (let ((new-item (copy-tree item)))
                        (setcdr (assoc key new-item) value)
                        new-item)
                    item))
                (async-http-queue--state-queue state))))

(defun async-http-queue--update-status (state url status)
  "Update the status of queue item with URL to STATUS in STATE."
  (async-http-queue--queue-put state url 'status status))

(defun async-http-queue--update-data (state url data)
  "Update the data of queue item with URL to DATA in STATE."
  (async-http-queue--queue-put state url 'data data))

(defun async-http-queue--fetch-url (state url success-callback error-callback)
  "Fetch URL asynchronously using `url-retrieve' with STATE.
Call SUCCESS-CALLBACK with parsed data on success.
Call ERROR-CALLBACK (with no arguments) on failure.
A watchdog timer aborts the request after STATE's timeout, killing the
connection process and its buffer so no interactive prompt appears."
  (let ((timer nil)       ; watchdog timer, cancelled if the reply arrives first
        (finished nil)    ; set once either the reply or the timeout wins
        (buffer nil)      ; buffer returned by `url-retrieve'
        (timeout (async-http-queue--state-timeout state))
        (parser (async-http-queue--state-parser state)))
    (setq buffer
          (url-retrieve
           url
           (lambda (status)
             ;; The reply arrived: the watchdog must never fire now.
             (when timer
               (cancel-timer timer))
             ;; Guard against running twice (reply racing the timeout).
             (unless finished
               (setq finished t)
               (let ((payload nil))
                 (condition-case err
                     (progn
                       ;; Transport-level failure reported by url.el.
                       (when (plist-get status :error)
                         (error "Download failed: %S" (plist-get status :error)))
                       ;; Parse the HTTP status line at the top of the buffer.
                       (goto-char (point-min))
                       (let ((code (and (re-search-forward
                                         "^HTTP/[0-9]\\.[0-9] \\([0-9]\\{3\\}\\)"
                                         nil t)
                                        (string-to-number (match-string 1)))))
                         (cond
                          ((null code)
                           (message "Invalid HTTP response for URL: %s" url))
                          ((not (<= 200 code 299))
                           (message "HTTP %d error fetching URL: %s" code url))
                          (t
                           ;; 2xx: skip past the header/body separator, then
                           ;; hand the body to PARSER (or return raw text).
                           ;; NOTE(review): a nil parser result (e.g. JSON
                           ;; `null') is indistinguishable from failure here
                           ;; and routes to ERROR-CALLBACK.
                           (goto-char (point-min))
                           (when (re-search-forward "\r\n\r\n\\|\n\n" nil t)
                             (setq payload
                                   (if parser
                                       (funcall parser)
                                     (buffer-substring-no-properties
                                      (point) (point-max)))))))))
                   (error
                    (message "Error fetching URL %s: %s"
                             url (error-message-string err))
                    (setq payload nil)))
                 ;; Never leak response buffers.
                 (kill-buffer (current-buffer))
                 (if payload
                     (funcall success-callback payload)
                   (funcall error-callback)))))
           nil t))
    ;; Watchdog: give up after TIMEOUT seconds if the reply never came.
    (setq timer
          (run-at-time
           timeout nil
           (lambda ()
             (unless finished
               (setq finished t)
               (message "Timeout fetching URL %s (%d seconds)" url timeout)
               (when (and buffer (buffer-live-p buffer))
                 ;; Kill the connection process first so `kill-buffer'
                 ;; does not pop an interactive confirmation prompt.
                 (let ((proc (get-buffer-process buffer)))
                   (when (and proc (process-live-p proc))
                     (delete-process proc)))
                 (kill-buffer buffer))
               (funcall error-callback)))))))

(defun async-http-queue--process-next-pending (state)
  "Process the next pending item in STATE's queue if worker slots available.
Starts at most one download per invocation; each completion schedules
another invocation, so the pool stays at the concurrency limit."
  (when (< (async-http-queue--state-active-workers state)
           (async-http-queue--state-max-concurrent state))
    (let ((next (seq-find (lambda (item)
                            (eq (alist-get 'status item) 'pending))
                          (async-http-queue--state-queue state))))
      (when next
        (let ((url (alist-get 'url next)))
          ;; Claim the slot before the download starts.
          (async-http-queue--update-status state url 'processing)
          (cl-incf (async-http-queue--state-active-workers state))
          (async-http-queue--fetch-url
           state url
           ;; Success: record the data, free the slot, keep the queue moving.
           (lambda (data)
             (async-http-queue--update-status state url 'done)
             (async-http-queue--update-data state url data)
             (cl-decf (async-http-queue--state-active-workers state))
             (run-at-time 0.05 nil #'async-http-queue--process-next-pending state)
             (async-http-queue--check-completion state))
           ;; Failure: mark it, notify the per-URL error callback, move on.
           (lambda ()
             (async-http-queue--update-status state url 'error)
             (cl-decf (async-http-queue--state-active-workers state))
             (let ((on-error (async-http-queue--state-error-callback state)))
               (when on-error
                 (funcall on-error url)))
             (run-at-time 0.05 nil #'async-http-queue--process-next-pending state)
             (async-http-queue--check-completion state))))))))

(defun async-http-queue--process (state)
  "Process STATE's queue asynchronously with limited concurrency."
  ;; Start from a clean slate in case STATE is reused.
  (setf (async-http-queue--state-active-workers state) 0)
  ;; Stagger the initial batch so the first requests do not all fire at once.
  (dotimes (slot (async-http-queue--state-max-concurrent state))
    (run-at-time (* slot 0.05) nil
                 #'async-http-queue--process-next-pending state)))

(defun async-http-queue--check-completion (state)
  "Check if STATE's download queue is complete and call callback if done.
Also emits a progress message for batches larger than ten URLs."
  (let* ((queue (async-http-queue--state-queue state))
         (total (length queue))
         (done (cl-count-if (lambda (item)
                              (eq (alist-get 'status item) 'done))
                            queue))
         (failed (cl-count-if (lambda (item)
                                (eq (alist-get 'status item) 'error))
                              queue))
         (remaining (cl-count-if (lambda (item)
                                   (memq (alist-get 'status item)
                                         '(processing pending)))
                                 queue)))
    ;; Progress feedback for larger batches still in flight.
    (when (and (> total 10) (> remaining 0))
      (message "Loading URLs... %d/%d completed%s"
               done total
               (if (> failed 0) (format " (%d failed)" failed) "")))
    (when (zerop remaining)
      ;; Everything settled: build the results vector in original queue
      ;; order, with nil standing in for failed URLs.
      (let ((results (vconcat
                      (mapcar (lambda (item)
                                (when (eq (alist-get 'status item) 'done)
                                  (alist-get 'data item)))
                              queue))))
        (if (> failed 0)
            (message "Loaded %d URLs (%d failed)" done failed)
          (message "Loaded %d URLs" done))
        (when (async-http-queue--state-completion-callback state)
          (funcall (async-http-queue--state-completion-callback state)
                   results))))))

;;;; Public API

;;;###autoload
(cl-defun async-http-queue-fetch-urls (urls
                                       &key
                                       (callback nil)
                                       (error-callback nil)
                                       (max-concurrent 5)
                                       (timeout 10)
                                       (parser #'json-parse-buffer))
  "Fetch URLS asynchronously in parallel and call CALLBACK with results.

URLS should be a list of URL strings to fetch.

CALLBACK is called with a vector of parsed results when all fetches
complete.  Failed requests will be represented as nil in the result
vector.

ERROR-CALLBACK is optionally called for each individual URL that
fails, with the URL as argument.

MAX-CONCURRENT controls the maximum number of parallel downloads
\(default: 5).

TIMEOUT is the maximum time in seconds to wait for each request
\(default: 10).

PARSER is a function to parse response bodies (default:
`json-parse-buffer').  It will be called with point positioned at the
start of the response body.  Set to nil to return raw text instead.

Returns immediately and processes URLs in parallel.

Example:

  (async-http-queue-fetch-urls
   \\='(\"https://api.example.com/posts/1\"
     \"https://api.example.com/posts/2\"
     \"https://api.example.com/posts/3\")
   :max-concurrent 5
   :timeout 10
   :parser #\\='json-parse-buffer
   :callback (lambda (results)
               (message \"Got %d results\" (length results))))"
  (if (null urls)
      ;; Degenerate case: nothing to do, but still honor the callback
      ;; contract with an empty results vector.
      (progn
        (message "No URLs provided")
        (when callback
          (funcall callback (make-vector 0 nil))))
    (let* ((n (length urls))
           (state (async-http-queue--state-create
                   :queue (mapcar (lambda (url)
                                    (list (cons 'url url)
                                          (cons 'status 'pending)
                                          (cons 'data nil)))
                                  urls)
                   :active-workers 0
                   :max-concurrent max-concurrent
                   :timeout timeout
                   :parser parser
                   :completion-callback callback
                   :error-callback error-callback)))
      (message "Fetching %d URL%s..." n (if (> n 1) "s" ""))
      (async-http-queue--process state))))

(provide 'async-http-queue-fetch-urls)

;;; async-http-queue-fetch-urls.el ends here