any_storage/http/
mod.rs

1use std::borrow::Cow;
2use std::io::{Error, ErrorKind, Result};
3use std::ops::{Bound, RangeBounds};
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Poll;
8
9use bytes::Bytes;
10use futures::{Stream, StreamExt};
11use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE, LAST_MODIFIED};
12use reqwest::{StatusCode, Url, header};
13use time::OffsetDateTime;
14use time::format_description::well_known::Rfc2822;
15
16mod parser;
17
18/// Converts an HTTP status code into a `Result`, returning an `io::Error`
19/// for client or server errors, and `Ok(code)` otherwise.
20pub(crate) fn error_from_status(code: StatusCode) -> Result<StatusCode> {
21    if code.is_server_error() {
22        Err(Error::other(
23            code.canonical_reason().unwrap_or(code.as_str()),
24        ))
25    } else if code.is_client_error() {
26        let kind = match code {
27            StatusCode::NOT_FOUND => ErrorKind::NotFound,
28            StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => ErrorKind::PermissionDenied,
29            _ => ErrorKind::Other,
30        };
31        let msg = code.canonical_reason().unwrap_or(code.as_str());
32        Err(Error::new(kind, msg))
33    } else {
34        Ok(code)
35    }
36}
37
/// Helper to format an HTTP `Range` header value from a `RangeBounds<u64>`.
///
/// Rust ranges are usually half-open (`0..5` covers bytes 0 through 4), while
/// an HTTP byte range names its first and last byte *inclusively*. The
/// `Display` implementation performs that conversion, so `RangeHeader(0..5)`
/// renders as `bytes=0-4` and `RangeHeader(0..=5)` as `bytes=0-5`.
///
/// An empty range has no representation in the `bytes=first-last` form; an
/// exclusive end of `0` is clamped to `0` rather than underflowing.
pub(crate) struct RangeHeader<R: RangeBounds<u64>>(pub R);

impl<R: RangeBounds<u64>> std::fmt::Display for RangeHeader<R> {
    /// Formats the HTTP `Range` header value (e.g., "bytes=0-100").
    ///
    /// An unbounded start is written as `0`; an unbounded end leaves the last
    /// position empty (`bytes=10-`), which requests everything up to the end.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let first = match self.0.start_bound() {
            Bound::Unbounded => 0,
            Bound::Included(&v) => v,
            // The first requested byte is the one after an excluded start.
            Bound::Excluded(&v) => v.saturating_add(1),
        };
        write!(f, "bytes={first}-")?;

        match self.0.end_bound() {
            Bound::Unbounded => Ok(()),
            // The last position of an HTTP byte range is already inclusive.
            Bound::Included(&v) => write!(f, "{v}"),
            // An excluded end refers to the byte just before it.
            Bound::Excluded(&v) => write!(f, "{}", v.saturating_sub(1)),
        }
    }
}
62
63#[derive(Clone, Debug)]
64#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
65pub struct HttpStoreConfig {
66    pub base_url: String,
67}
68
69impl HttpStoreConfig {
70    pub fn build(&self) -> Result<HttpStore> {
71        HttpStore::new(&self.base_url)
72    }
73}
74
75/// Internal representation of the HTTP-backed store.
76struct InnerHttpStore {
77    base_url: Url,
78    parser: parser::Parser,
79    client: reqwest::Client,
80}
81
82impl InnerHttpStore {
83    /// Resolves a relative file or directory path into a full URL.
84    fn get_url(&self, path: &Path) -> Result<Url> {
85        let clean = crate::util::clean_path(path)?;
86        self.base_url
87            .join(&clean.to_string_lossy())
88            .map_err(|err| Error::new(ErrorKind::InvalidData, err))
89    }
90}
91
92/// Public HTTP-backed file store supporting asynchronous access to remote files
93/// and directories.
94#[derive(Clone)]
95pub struct HttpStore(Arc<InnerHttpStore>);
96
97impl std::fmt::Debug for HttpStore {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct(stringify!(HttpStore))
100            .field("base_url", &self.0.base_url)
101            .finish_non_exhaustive()
102    }
103}
104
105impl HttpStore {
106    /// Creates a new `HttpStore` from a base URL.
107    ///
108    /// Ensures the base URL ends with a trailing slash and initializes the HTTP
109    /// client and parser.
110    pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
111        let base_url = base_url.as_ref();
112        let base_url = if base_url.ends_with("/") {
113            Cow::Borrowed(base_url)
114        } else {
115            Cow::Owned(format!("{base_url}/"))
116        };
117        let base_url = Url::parse(base_url.as_ref())
118            .map_err(|err| Error::new(ErrorKind::InvalidInput, err))?;
119        Ok(Self(Arc::new(InnerHttpStore {
120            base_url,
121            parser: parser::Parser::default(),
122            client: reqwest::Client::new(),
123        })))
124    }
125}
126
127impl crate::Store for HttpStore {
128    type Directory = HttpStoreDirectory;
129    type File = HttpStoreFile;
130
131    /// Retrieves a file from the HTTP store at the given path.
132    async fn get_file<P: Into<std::path::PathBuf>>(&self, path: P) -> Result<Self::File> {
133        Ok(HttpStoreFile {
134            store: self.0.clone(),
135            path: path.into(),
136        })
137    }
138
139    /// Retrieves a directory from the HTTP store at the given path.
140    async fn get_dir<P: Into<PathBuf>>(&self, path: P) -> Result<Self::Directory> {
141        Ok(HttpStoreDirectory {
142            store: self.0.clone(),
143            path: path.into(),
144        })
145    }
146}
147
148/// Representation of a directory in the HTTP store.
149pub struct HttpStoreDirectory {
150    store: Arc<InnerHttpStore>,
151    path: PathBuf,
152}
153
154impl std::fmt::Debug for HttpStoreDirectory {
155    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156        f.debug_struct(stringify!(HttpStoreDirectory))
157            .field("path", &self.path)
158            .finish_non_exhaustive()
159    }
160}
161
162impl crate::StoreDirectory for HttpStoreDirectory {
163    type Entry = HttpStoreEntry;
164    type Reader = HttpStoreDirectoryReader;
165
166    fn path(&self) -> &std::path::Path {
167        &self.path
168    }
169
170    /// Checks if the HTTP directory exists via a HEAD request.
171    async fn exists(&self) -> Result<bool> {
172        let url = self.store.get_url(&self.path)?;
173        match self.store.client.head(url).send().await {
174            Ok(res) => match res.status() {
175                StatusCode::NOT_FOUND => Ok(false),
176                other => error_from_status(other).map(|_| true),
177            },
178            Err(err) => Err(Error::other(err)),
179        }
180    }
181
182    /// Lists the entries in the HTTP directory by fetching and parsing HTML.
183    async fn read(&self) -> Result<Self::Reader> {
184        let url = self.store.get_url(&self.path)?;
185        let res = self
186            .store
187            .client
188            .get(url)
189            .send()
190            .await
191            .map_err(Error::other)?;
192        error_from_status(res.status())?;
193        let html = res.text().await.map_err(Error::other)?;
194        let mut entries = self.store.parser.parse(&html).collect::<Vec<_>>();
195        entries.reverse();
196
197        Ok(HttpStoreDirectoryReader {
198            store: self.store.clone(),
199            path: self.path.clone(),
200            entries,
201        })
202    }
203
204    async fn delete(&self) -> Result<()> {
205        Err(Error::new(
206            ErrorKind::Unsupported,
207            "http store doesn't support write operations",
208        ))
209    }
210
211    async fn delete_recursive(&self) -> Result<()> {
212        Err(Error::new(
213            ErrorKind::Unsupported,
214            "http store doesn't support write operations",
215        ))
216    }
217}
218
219/// Stream reader over entries within an HTTP directory listing.
220pub struct HttpStoreDirectoryReader {
221    store: Arc<InnerHttpStore>,
222    path: PathBuf,
223    entries: Vec<String>,
224}
225
226impl std::fmt::Debug for HttpStoreDirectoryReader {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        f.debug_struct(stringify!(HttpStoreDirectoryReader))
229            .field("path", &self.path)
230            .field("entries", &self.entries)
231            .finish_non_exhaustive()
232    }
233}
234
235impl Stream for HttpStoreDirectoryReader {
236    type Item = Result<HttpStoreEntry>;
237
238    /// Returns the next directory entry from the parsed HTML listing.
239    fn poll_next(
240        mut self: Pin<&mut Self>,
241        _cx: &mut std::task::Context<'_>,
242    ) -> Poll<Option<Self::Item>> {
243        let mut this = self.as_mut();
244
245        if let Some(entry) = this.entries.pop() {
246            Poll::Ready(Some(HttpStoreEntry::new(
247                self.store.clone(),
248                self.path.clone(),
249                entry,
250            )))
251        } else {
252            Poll::Ready(None)
253        }
254    }
255}
256
257impl crate::StoreDirectoryReader<HttpStoreEntry> for HttpStoreDirectoryReader {}
258
259/// Representation of a file in the HTTP store.
260pub struct HttpStoreFile {
261    store: Arc<InnerHttpStore>,
262    path: PathBuf,
263}
264
265impl std::fmt::Debug for HttpStoreFile {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        f.debug_struct(stringify!(HttpStoreFile))
268            .field("path", &self.path)
269            .finish_non_exhaustive()
270    }
271}
272
273impl crate::StoreFile for HttpStoreFile {
274    type FileReader = HttpStoreFileReader;
275    type FileWriter = crate::noop::NoopStoreFileWriter;
276    type Metadata = HttpStoreFileMetadata;
277
278    fn path(&self) -> &std::path::Path {
279        &self.path
280    }
281
282    /// Checks if the HTTP file exists via a HEAD request.
283    async fn exists(&self) -> Result<bool> {
284        let url = self.store.get_url(&self.path)?;
285        let res = self
286            .store
287            .client
288            .head(url)
289            .send()
290            .await
291            .map_err(Error::other)?;
292        match res.status() {
293            StatusCode::NOT_FOUND => Ok(false),
294            other => error_from_status(other).map(|_| true),
295        }
296    }
297
298    /// Retrieves the HTTP file metadata (size and last modified).
299    async fn metadata(&self) -> Result<Self::Metadata> {
300        let url = self.store.get_url(&self.path)?;
301        let res = self
302            .store
303            .client
304            .head(url)
305            .send()
306            .await
307            .map_err(Error::other)?;
308        error_from_status(res.status())?;
309        let size = res
310            .headers()
311            .get(CONTENT_LENGTH)
312            .and_then(|value| value.to_str().ok())
313            .and_then(|value| value.parse::<u64>().ok())
314            .unwrap_or(0);
315        let modified = res
316            .headers()
317            .get(LAST_MODIFIED)
318            .and_then(|value| value.to_str().ok())
319            .and_then(|value| OffsetDateTime::parse(value, &Rfc2822).ok())
320            .map(|dt| dt.unix_timestamp() as u64)
321            .unwrap_or(0);
322        let content_type = res
323            .headers()
324            .get(CONTENT_TYPE)
325            .and_then(|value| value.to_str().ok().map(String::from));
326        Ok(HttpStoreFileMetadata {
327            size,
328            modified,
329            content_type,
330        })
331    }
332
333    /// Begins reading a file from the HTTP store for the given byte range.
334    async fn read<R: std::ops::RangeBounds<u64>>(&self, range: R) -> Result<Self::FileReader> {
335        let url = self.store.get_url(&self.path)?;
336        let res = self
337            .store
338            .client
339            .get(url)
340            .header(header::RANGE, RangeHeader(range).to_string())
341            .send()
342            .await
343            .map_err(Error::other)?;
344        HttpStoreFileReader::from_response(res)
345    }
346
347    async fn write(&self, _options: crate::WriteOptions) -> Result<Self::FileWriter> {
348        Err(Error::new(
349            ErrorKind::Unsupported,
350            "http store doesn't support write operations",
351        ))
352    }
353
354    async fn delete(&self) -> Result<()> {
355        Err(Error::new(
356            ErrorKind::Unsupported,
357            "http store doesn't support write operations",
358        ))
359    }
360}
361
362/// Metadata for an HTTP file, containing size and last modification time.
363#[derive(Clone, Debug)]
364pub struct HttpStoreFileMetadata {
365    size: u64,
366    modified: u64,
367    content_type: Option<String>,
368}
369
370impl super::StoreMetadata for HttpStoreFileMetadata {
371    /// Returns the file size in bytes.
372    fn size(&self) -> u64 {
373        self.size
374    }
375
376    /// Returns 0 as creation time is not available over HTTP.
377    fn created(&self) -> u64 {
378        0
379    }
380
381    /// Returns the last modified time (as a UNIX timestamp).
382    fn modified(&self) -> u64 {
383        self.modified
384    }
385
386    fn content_type(&self) -> Option<&str> {
387        self.content_type.as_deref()
388    }
389}
390
391/// Reader for streaming bytes from a remote HTTP file.
392pub struct HttpStoreFileReader {
393    stream: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + std::marker::Send>>,
394}
395
396impl std::fmt::Debug for HttpStoreFileReader {
397    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
398        f.debug_struct(stringify!(HttpStoreFileReader))
399            .finish_non_exhaustive()
400    }
401}
402
403impl HttpStoreFileReader {
404    /// Creates a `HttpStoreFileReader` from a `reqwest::Response`.
405    ///
406    /// Validates the response and initializes the byte stream.
407    pub(crate) fn from_response(res: reqwest::Response) -> Result<Self> {
408        crate::http::error_from_status(res.status())?;
409        // TODO handle when status code is not 206
410        let stream = res.bytes_stream().boxed();
411        Ok(Self { stream })
412    }
413}
414
415impl tokio::io::AsyncRead for HttpStoreFileReader {
416    /// Polls the next chunk of data from the HTTP byte stream.
417    ///
418    /// Copies bytes into the provided buffer.
419    fn poll_read(
420        self: std::pin::Pin<&mut Self>,
421        cx: &mut std::task::Context<'_>,
422        buf: &mut tokio::io::ReadBuf<'_>,
423    ) -> std::task::Poll<std::io::Result<()>> {
424        let stream = &mut self.get_mut().stream;
425
426        match Pin::new(stream).poll_next(cx) {
427            Poll::Ready(Some(Ok(chunk))) => {
428                let len = buf.remaining();
429                let to_read = chunk.len().min(len);
430                buf.put_slice(&chunk[..to_read]);
431                Poll::Ready(Ok(()))
432            }
433            // Stream has ended with an error, propagate it
434            Poll::Ready(Some(Err(err))) => Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
435            // No more data to read
436            Poll::Ready(None) => Poll::Ready(Ok(())),
437            Poll::Pending => Poll::Pending,
438        }
439    }
440}
441
442impl crate::StoreFileReader for HttpStoreFileReader {}
443
444/// Represents an entry in the HTTP store (file or directory).
445pub type HttpStoreEntry = crate::Entry<HttpStoreFile, HttpStoreDirectory>;
446
447impl HttpStoreEntry {
448    /// Constructs a new `HttpStoreEntry` (either file or directory) from a path
449    /// component.
450    ///
451    /// Assumes directory entries end with a `/`.
452    fn new(store: Arc<InnerHttpStore>, parent: PathBuf, entry: String) -> Result<Self> {
453        let path = parent.join(&entry);
454        Ok(if entry.ends_with('/') {
455            Self::Directory(HttpStoreDirectory { store, path })
456        } else {
457            Self::File(HttpStoreFile { store, path })
458        })
459    }
460}
461
#[cfg(test)]
mod tests {
    use std::io::ErrorKind;
    use std::path::PathBuf;

    use futures::StreamExt;
    use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
    use tokio::io::AsyncReadExt;

    use crate::http::HttpStore;
    use crate::{Store, StoreDirectory, StoreFile, StoreMetadata};

    /// URL resolution must give the same result whether or not the base URL
    /// and the path carry leading or trailing slashes.
    #[test_case::test_case("http://localhost", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with prefix")]
    #[test_case::test_case("http://localhost", "foo.txt", "http://localhost/foo.txt"; "root with simple path without prefix")]
    #[test_case::test_case("http://localhost/", "foo.txt", "http://localhost/foo.txt"; "root with simple path with slash on base")]
    #[test_case::test_case("http://localhost/", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with slashes")]
    #[test_case::test_case("http://localhost/foo", "/bar/baz.txt", "http://localhost/foo/bar/baz.txt"; "with more children")]
    #[test_case::test_case("http://localhost/foo", "/bar/with space.txt", "http://localhost/foo/bar/with%20space.txt"; "with spaces")]
    fn building_path(base_url: &str, path: &str, expected: &str) {
        let store = HttpStore::new(base_url).unwrap();
        let resolved = store.0.get_url(&PathBuf::from(path)).unwrap();
        assert_eq!(resolved.as_str(), expected);
    }

    /// A base URL that already ends with `/` must not yield a doubled slash.
    #[tokio::test]
    async fn file_should_handle_base_with_ending_slash() {
        let mut server = mockito::Server::new_async().await;
        let head = server
            .mock("HEAD", "/foo/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let base = format!("{}/foo/", server.url());
        let store = HttpStore::new(base).unwrap();
        let file = store.get_file("/not-found.txt").await.unwrap();
        let found = file.exists().await.unwrap();
        assert!(!found);
        head.assert_async().await;
    }

    /// A `404` answer to the HEAD request means the file does not exist.
    #[tokio::test]
    async fn file_should_check_if_file_exists() {
        let mut server = mockito::Server::new_async().await;
        let head = server
            .mock("HEAD", "/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/not-found.txt").await.unwrap();
        let found = file.exists().await.unwrap();
        assert!(!found);
        head.assert_async().await;
    }

    /// The filename is the last component of the path.
    #[tokio::test]
    async fn file_should_get_filename() {
        let server = mockito::Server::new_async().await;
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file.txt").await.unwrap();
        assert_eq!(file.filename().unwrap(), "file.txt");
    }

    /// Spaces in the filename are kept as they are.
    #[tokio::test]
    async fn file_should_get_filename_with_space() {
        let server = mockito::Server::new_async().await;
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/with space.txt").await.unwrap();
        assert_eq!(file.filename().unwrap(), "with space.txt");
    }

    /// Size and modification time are read from the HEAD response headers.
    #[tokio::test]
    async fn file_meta_should_give_all() {
        let mut server = mockito::Server::new_async().await;
        let head = server
            .mock("HEAD", "/test/file.txt")
            .with_status(200)
            .with_header(CONTENT_LENGTH, "1234")
            .with_header(LAST_MODIFIED, "Thu, 01 May 2025 09:57:28 GMT")
            .create_async()
            .await;
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file.txt").await.unwrap();
        let meta = file.metadata().await.unwrap();
        assert_eq!(meta.size, 1234);
        assert_eq!(meta.created(), 0);
        assert_eq!(meta.modified(), 1746093448);
        head.assert_async().await;
    }

    /// Reading still works when the server answers `200` with the full body.
    #[tokio::test]
    async fn file_reader_should_read_entire_file() {
        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/test/file")
            .with_status(200)
            .with_header("Content-Type", "application/octet-stream")
            .with_body("Hello, world!")
            .create();
        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let source = file.read(0..5).await.unwrap();
        let mut buffered = tokio::io::BufReader::new(source);
        let mut out = vec![0u8; 5];
        let read = buffered.read(&mut out).await.unwrap();

        assert_eq!(read, 5);
        assert_eq!(&out, b"Hello");
    }

    /// Reading works when the server answers `206 Partial Content`.
    #[tokio::test]
    async fn file_reader_should_read_single_range() {
        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/test/file")
            .with_status(206) // Partial content status for range requests
            .with_header("Content-Type", "application/octet-stream")
            .with_header("Content-Range", "bytes 0-4/12")
            .with_body("Hello, world!")
            .create();

        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let source = file.read(0..5).await.unwrap();
        let mut buffered = tokio::io::BufReader::new(source);
        let mut out = vec![0u8; 5];
        let read = buffered.read(&mut out).await.unwrap();

        assert_eq!(read, 5);
        assert_eq!(&out, b"Hello");
    }

    /// A `404` answer to the GET request surfaces as `ErrorKind::NotFound`.
    #[tokio::test]
    async fn file_reader_should_fail_with_not_found() {
        let mut server = mockito::Server::new_async().await;
        let _mock = server.mock("GET", "/test/file").with_status(404).create();

        let store = HttpStore::new(server.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let Err(err) = file.read(0..5).await else {
            panic!("should fail");
        };
        assert_eq!(err.kind(), ErrorKind::NotFound);
    }

    /// An Apache-style index page is parsed into files and directories.
    #[tokio::test]
    async fn dir_should_list_entries() {
        let mut server = mockito::Server::new_async().await;
        let _mock = server
            .mock("GET", "/NEH")
            .with_status(200)
            .with_body(include_str!("../../assets/apache.html"))
            .create();

        let store = HttpStore::new(server.url()).unwrap();
        let dir = store.get_dir("/NEH").await.unwrap();
        let listing = dir.read().await.unwrap();

        let entries: Vec<_> = listing.map(|entry| entry.unwrap()).collect().await;
        assert_eq!(entries.len(), 46);

        let directories = entries.iter().filter(|item| item.is_directory()).count();
        let files = entries.iter().filter(|item| item.is_file()).count();
        assert_eq!(directories, 41);
        assert_eq!(files, 5);
    }
}