1use std::borrow::Cow;
2use std::io::{Error, ErrorKind, Result};
3use std::ops::{Bound, RangeBounds};
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Poll;
8
9use bytes::Bytes;
10use futures::{Stream, StreamExt};
11use reqwest::header::{CONTENT_LENGTH, CONTENT_TYPE, LAST_MODIFIED};
12use reqwest::{StatusCode, Url, header};
13use time::OffsetDateTime;
14use time::format_description::well_known::Rfc2822;
15
16mod parser;
17
18pub(crate) fn error_from_status(code: StatusCode) -> Result<StatusCode> {
21 if code.is_server_error() {
22 Err(Error::other(
23 code.canonical_reason().unwrap_or(code.as_str()),
24 ))
25 } else if code.is_client_error() {
26 let kind = match code {
27 StatusCode::NOT_FOUND => ErrorKind::NotFound,
28 StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => ErrorKind::PermissionDenied,
29 _ => ErrorKind::Other,
30 };
31 let msg = code.canonical_reason().unwrap_or(code.as_str());
32 Err(Error::new(kind, msg))
33 } else {
34 Ok(code)
35 }
36}
37
/// Renders a byte range as the value of an HTTP `Range` request header: `bytes=first-last`.
///
/// HTTP byte ranges are inclusive on both ends. So a half-open Rust range `a..b` is rendered as
/// `bytes=a-(b-1)` and an inclusive range `a..=b` as `bytes=a-b`. An unbounded start is rendered
/// as `0`. An unbounded end is omitted, which means "through the end of the resource".
///
/// An empty range (e.g. `..0`) cannot be expressed in HTTP. Its exclusive end is clamped to `0`
/// rather than underflowing, so callers should not request empty ranges.
pub(crate) struct RangeHeader<R: RangeBounds<u64>>(pub R);

impl<R: RangeBounds<u64>> std::fmt::Display for RangeHeader<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let first = match self.0.start_bound() {
            Bound::Unbounded => 0,
            Bound::Included(&v) => v,
            // Saturate so a pathological `Excluded(u64::MAX)` start cannot overflow.
            Bound::Excluded(&v) => v.saturating_add(1),
        };
        write!(f, "bytes={first}-")?;
        match self.0.end_bound() {
            Bound::Unbounded => Ok(()),
            // Inclusive end maps directly onto HTTP's inclusive last-byte-pos.
            Bound::Included(&v) => write!(f, "{v}"),
            // Exclusive end: the last byte wanted is the one before `v`.
            Bound::Excluded(&v) => write!(f, "{}", v.saturating_sub(1)),
        }
    }
}
62
63#[derive(Clone, Debug)]
64#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
65pub struct HttpStoreConfig {
66 pub base_url: String,
67}
68
69impl HttpStoreConfig {
70 pub fn build(&self) -> Result<HttpStore> {
71 HttpStore::new(&self.base_url)
72 }
73}
74
75struct InnerHttpStore {
77 base_url: Url,
78 parser: parser::Parser,
79 client: reqwest::Client,
80}
81
82impl InnerHttpStore {
83 fn get_url(&self, path: &Path) -> Result<Url> {
85 let clean = crate::util::clean_path(path)?;
86 self.base_url
87 .join(&clean.to_string_lossy())
88 .map_err(|err| Error::new(ErrorKind::InvalidData, err))
89 }
90}
91
92#[derive(Clone)]
95pub struct HttpStore(Arc<InnerHttpStore>);
96
97impl std::fmt::Debug for HttpStore {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct(stringify!(HttpStore))
100 .field("base_url", &self.0.base_url)
101 .finish_non_exhaustive()
102 }
103}
104
105impl HttpStore {
106 pub fn new(base_url: impl AsRef<str>) -> Result<Self> {
111 let base_url = base_url.as_ref();
112 let base_url = if base_url.ends_with("/") {
113 Cow::Borrowed(base_url)
114 } else {
115 Cow::Owned(format!("{base_url}/"))
116 };
117 let base_url = Url::parse(base_url.as_ref())
118 .map_err(|err| Error::new(ErrorKind::InvalidInput, err))?;
119 Ok(Self(Arc::new(InnerHttpStore {
120 base_url,
121 parser: parser::Parser::default(),
122 client: reqwest::Client::new(),
123 })))
124 }
125}
126
127impl crate::Store for HttpStore {
128 type Directory = HttpStoreDirectory;
129 type File = HttpStoreFile;
130
131 async fn get_file<P: Into<std::path::PathBuf>>(&self, path: P) -> Result<Self::File> {
133 Ok(HttpStoreFile {
134 store: self.0.clone(),
135 path: path.into(),
136 })
137 }
138
139 async fn get_dir<P: Into<PathBuf>>(&self, path: P) -> Result<Self::Directory> {
141 Ok(HttpStoreDirectory {
142 store: self.0.clone(),
143 path: path.into(),
144 })
145 }
146}
147
148pub struct HttpStoreDirectory {
150 store: Arc<InnerHttpStore>,
151 path: PathBuf,
152}
153
154impl std::fmt::Debug for HttpStoreDirectory {
155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
156 f.debug_struct(stringify!(HttpStoreDirectory))
157 .field("path", &self.path)
158 .finish_non_exhaustive()
159 }
160}
161
162impl crate::StoreDirectory for HttpStoreDirectory {
163 type Entry = HttpStoreEntry;
164 type Reader = HttpStoreDirectoryReader;
165
166 fn path(&self) -> &std::path::Path {
167 &self.path
168 }
169
170 async fn exists(&self) -> Result<bool> {
172 let url = self.store.get_url(&self.path)?;
173 match self.store.client.head(url).send().await {
174 Ok(res) => match res.status() {
175 StatusCode::NOT_FOUND => Ok(false),
176 other => error_from_status(other).map(|_| true),
177 },
178 Err(err) => Err(Error::other(err)),
179 }
180 }
181
182 async fn read(&self) -> Result<Self::Reader> {
184 let url = self.store.get_url(&self.path)?;
185 let res = self
186 .store
187 .client
188 .get(url)
189 .send()
190 .await
191 .map_err(Error::other)?;
192 error_from_status(res.status())?;
193 let html = res.text().await.map_err(Error::other)?;
194 let mut entries = self.store.parser.parse(&html).collect::<Vec<_>>();
195 entries.reverse();
196
197 Ok(HttpStoreDirectoryReader {
198 store: self.store.clone(),
199 path: self.path.clone(),
200 entries,
201 })
202 }
203
204 async fn delete(&self) -> Result<()> {
205 Err(Error::new(
206 ErrorKind::Unsupported,
207 "http store doesn't support write operations",
208 ))
209 }
210
211 async fn delete_recursive(&self) -> Result<()> {
212 Err(Error::new(
213 ErrorKind::Unsupported,
214 "http store doesn't support write operations",
215 ))
216 }
217}
218
219pub struct HttpStoreDirectoryReader {
221 store: Arc<InnerHttpStore>,
222 path: PathBuf,
223 entries: Vec<String>,
224}
225
226impl std::fmt::Debug for HttpStoreDirectoryReader {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 f.debug_struct(stringify!(HttpStoreDirectoryReader))
229 .field("path", &self.path)
230 .field("entries", &self.entries)
231 .finish_non_exhaustive()
232 }
233}
234
235impl Stream for HttpStoreDirectoryReader {
236 type Item = Result<HttpStoreEntry>;
237
238 fn poll_next(
240 mut self: Pin<&mut Self>,
241 _cx: &mut std::task::Context<'_>,
242 ) -> Poll<Option<Self::Item>> {
243 let mut this = self.as_mut();
244
245 if let Some(entry) = this.entries.pop() {
246 Poll::Ready(Some(HttpStoreEntry::new(
247 self.store.clone(),
248 self.path.clone(),
249 entry,
250 )))
251 } else {
252 Poll::Ready(None)
253 }
254 }
255}
256
257impl crate::StoreDirectoryReader<HttpStoreEntry> for HttpStoreDirectoryReader {}
258
259pub struct HttpStoreFile {
261 store: Arc<InnerHttpStore>,
262 path: PathBuf,
263}
264
265impl std::fmt::Debug for HttpStoreFile {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 f.debug_struct(stringify!(HttpStoreFile))
268 .field("path", &self.path)
269 .finish_non_exhaustive()
270 }
271}
272
273impl crate::StoreFile for HttpStoreFile {
274 type FileReader = HttpStoreFileReader;
275 type FileWriter = crate::noop::NoopStoreFileWriter;
276 type Metadata = HttpStoreFileMetadata;
277
278 fn path(&self) -> &std::path::Path {
279 &self.path
280 }
281
282 async fn exists(&self) -> Result<bool> {
284 let url = self.store.get_url(&self.path)?;
285 let res = self
286 .store
287 .client
288 .head(url)
289 .send()
290 .await
291 .map_err(Error::other)?;
292 match res.status() {
293 StatusCode::NOT_FOUND => Ok(false),
294 other => error_from_status(other).map(|_| true),
295 }
296 }
297
298 async fn metadata(&self) -> Result<Self::Metadata> {
300 let url = self.store.get_url(&self.path)?;
301 let res = self
302 .store
303 .client
304 .head(url)
305 .send()
306 .await
307 .map_err(Error::other)?;
308 error_from_status(res.status())?;
309 let size = res
310 .headers()
311 .get(CONTENT_LENGTH)
312 .and_then(|value| value.to_str().ok())
313 .and_then(|value| value.parse::<u64>().ok())
314 .unwrap_or(0);
315 let modified = res
316 .headers()
317 .get(LAST_MODIFIED)
318 .and_then(|value| value.to_str().ok())
319 .and_then(|value| OffsetDateTime::parse(value, &Rfc2822).ok())
320 .map(|dt| dt.unix_timestamp() as u64)
321 .unwrap_or(0);
322 let content_type = res
323 .headers()
324 .get(CONTENT_TYPE)
325 .and_then(|value| value.to_str().ok().map(String::from));
326 Ok(HttpStoreFileMetadata {
327 size,
328 modified,
329 content_type,
330 })
331 }
332
333 async fn read<R: std::ops::RangeBounds<u64>>(&self, range: R) -> Result<Self::FileReader> {
335 let url = self.store.get_url(&self.path)?;
336 let res = self
337 .store
338 .client
339 .get(url)
340 .header(header::RANGE, RangeHeader(range).to_string())
341 .send()
342 .await
343 .map_err(Error::other)?;
344 HttpStoreFileReader::from_response(res)
345 }
346
347 async fn write(&self, _options: crate::WriteOptions) -> Result<Self::FileWriter> {
348 Err(Error::new(
349 ErrorKind::Unsupported,
350 "http store doesn't support write operations",
351 ))
352 }
353
354 async fn delete(&self) -> Result<()> {
355 Err(Error::new(
356 ErrorKind::Unsupported,
357 "http store doesn't support write operations",
358 ))
359 }
360}
361
362#[derive(Clone, Debug)]
364pub struct HttpStoreFileMetadata {
365 size: u64,
366 modified: u64,
367 content_type: Option<String>,
368}
369
370impl super::StoreMetadata for HttpStoreFileMetadata {
371 fn size(&self) -> u64 {
373 self.size
374 }
375
376 fn created(&self) -> u64 {
378 0
379 }
380
381 fn modified(&self) -> u64 {
383 self.modified
384 }
385
386 fn content_type(&self) -> Option<&str> {
387 self.content_type.as_deref()
388 }
389}
390
391pub struct HttpStoreFileReader {
393 stream: Pin<Box<dyn Stream<Item = reqwest::Result<Bytes>> + std::marker::Send>>,
394}
395
396impl std::fmt::Debug for HttpStoreFileReader {
397 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
398 f.debug_struct(stringify!(HttpStoreFileReader))
399 .finish_non_exhaustive()
400 }
401}
402
403impl HttpStoreFileReader {
404 pub(crate) fn from_response(res: reqwest::Response) -> Result<Self> {
408 crate::http::error_from_status(res.status())?;
409 let stream = res.bytes_stream().boxed();
411 Ok(Self { stream })
412 }
413}
414
415impl tokio::io::AsyncRead for HttpStoreFileReader {
416 fn poll_read(
420 self: std::pin::Pin<&mut Self>,
421 cx: &mut std::task::Context<'_>,
422 buf: &mut tokio::io::ReadBuf<'_>,
423 ) -> std::task::Poll<std::io::Result<()>> {
424 let stream = &mut self.get_mut().stream;
425
426 match Pin::new(stream).poll_next(cx) {
427 Poll::Ready(Some(Ok(chunk))) => {
428 let len = buf.remaining();
429 let to_read = chunk.len().min(len);
430 buf.put_slice(&chunk[..to_read]);
431 Poll::Ready(Ok(()))
432 }
433 Poll::Ready(Some(Err(err))) => Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
435 Poll::Ready(None) => Poll::Ready(Ok(())),
437 Poll::Pending => Poll::Pending,
438 }
439 }
440}
441
442impl crate::StoreFileReader for HttpStoreFileReader {}
443
444pub type HttpStoreEntry = crate::Entry<HttpStoreFile, HttpStoreDirectory>;
446
447impl HttpStoreEntry {
448 fn new(store: Arc<InnerHttpStore>, parent: PathBuf, entry: String) -> Result<Self> {
453 let path = parent.join(&entry);
454 Ok(if entry.ends_with('/') {
455 Self::Directory(HttpStoreDirectory { store, path })
456 } else {
457 Self::File(HttpStoreFile { store, path })
458 })
459 }
460}
461
// Unit tests for the HTTP store. Request-issuing tests run against a local `mockito` server,
// so they exercise the real request path without external network access.
#[cfg(test)]
mod tests {
    use std::io::ErrorKind;
    use std::path::PathBuf;

    use futures::StreamExt;
    use reqwest::header::{CONTENT_LENGTH, LAST_MODIFIED};
    use tokio::io::AsyncReadExt;

    use crate::http::HttpStore;
    use crate::{Store, StoreDirectory, StoreFile, StoreMetadata};

    // `get_url` must produce the same URL whether or not the base URL ends with `/` and whether
    // or not the path starts with `/`. Spaces in the path must be percent-encoded.
    #[test_case::test_case("http://localhost", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with prefix")]
    #[test_case::test_case("http://localhost", "foo.txt", "http://localhost/foo.txt"; "root with simple path without prefix")]
    #[test_case::test_case("http://localhost/", "foo.txt", "http://localhost/foo.txt"; "root with simple path with slash on base")]
    #[test_case::test_case("http://localhost/", "/foo.txt", "http://localhost/foo.txt"; "root with simple path with slashes")]
    #[test_case::test_case("http://localhost/foo", "/bar/baz.txt", "http://localhost/foo/bar/baz.txt"; "with more children")]
    #[test_case::test_case("http://localhost/foo", "/bar/with space.txt", "http://localhost/foo/bar/with%20space.txt"; "with spaces")]
    fn building_path(base_url: &str, path: &str, expected: &str) {
        let store = HttpStore::new(base_url).unwrap();
        let path = PathBuf::from(path);
        let url = store.0.get_url(&path).unwrap();
        assert_eq!(url.as_str(), expected);
    }

    // A base URL that already ends in `/` must still produce a HEAD request to exactly
    // `/foo/not-found.txt`; `mock.assert_async` fails if that request is not seen.
    #[tokio::test]
    async fn file_should_handle_base_with_ending_slash() {
        let mut srv = mockito::Server::new_async().await;
        let mock = srv
            .mock("HEAD", "/foo/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let store = HttpStore::new(format!("{}/foo/", srv.url())).unwrap();
        let file = store.get_file("/not-found.txt").await.unwrap();
        assert!(!file.exists().await.unwrap());
        mock.assert_async().await;
    }

    // A 404 on HEAD is reported as `Ok(false)`, not as an error.
    #[tokio::test]
    async fn file_should_check_if_file_exists() {
        let mut srv = mockito::Server::new_async().await;
        let mock = srv
            .mock("HEAD", "/not-found.txt")
            .with_status(404)
            .create_async()
            .await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/not-found.txt").await.unwrap();
        assert!(!file.exists().await.unwrap());
        mock.assert_async().await;
    }

    // No mock is registered, so `filename` is expected to come from the path alone.
    #[tokio::test]
    async fn file_should_get_filename() {
        let srv = mockito::Server::new_async().await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file.txt").await.unwrap();
        let name = file.filename().unwrap();
        assert_eq!(name, "file.txt");
    }

    // The file name is returned unencoded even though the URL would percent-encode the space.
    #[tokio::test]
    async fn file_should_get_filename_with_space() {
        let srv = mockito::Server::new_async().await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/with space.txt").await.unwrap();
        let name = file.filename().unwrap();
        assert_eq!(name, "with space.txt");
    }

    // Metadata comes from HEAD headers. 1746093448 is 2025-05-01T09:57:28Z as Unix seconds,
    // and `created` is always 0 for HTTP.
    #[tokio::test]
    async fn file_meta_should_give_all() {
        let mut srv = mockito::Server::new_async().await;
        let mock = srv
            .mock("HEAD", "/test/file.txt")
            .with_status(200)
            .with_header(CONTENT_LENGTH, "1234")
            .with_header(LAST_MODIFIED, "Thu, 01 May 2025 09:57:28 GMT")
            .create_async()
            .await;
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file.txt").await.unwrap();
        let meta = file.metadata().await.unwrap();
        assert_eq!(meta.size, 1234);
        assert_eq!(meta.created(), 0);
        assert_eq!(meta.modified(), 1746093448);
        mock.assert_async().await;
    }

    // Despite its name, this reads only the first 5 bytes. The mock answers `200` with the
    // whole body (as a server ignoring `Range` would), and only the prefix is checked.
    #[tokio::test]
    async fn file_reader_should_read_entire_file() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv
            .mock("GET", "/test/file")
            .with_status(200)
            .with_header("Content-Type", "application/octet-stream")
            .with_body("Hello, world!")
            .create();
        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let reader = file.read(0..5).await.unwrap();

        let mut buf = vec![0; 5];
        let mut async_reader = tokio::io::BufReader::new(reader);
        let n = async_reader.read(&mut buf).await.unwrap();

        assert_eq!(n, 5);
        assert_eq!(&buf, b"Hello");
    }

    // The mock replies `206` but does not actually slice the body, so the full text is sent.
    // Only the first 5 bytes are read and checked.
    #[tokio::test]
    async fn file_reader_should_read_single_range() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv
            .mock("GET", "/test/file")
            .with_status(206)
            .with_header("Content-Type", "application/octet-stream")
            .with_header("Content-Range", "bytes 0-4/12")
            .with_body("Hello, world!")
            .create();

        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let reader = file.read(0..5).await.unwrap();
        let mut buf = vec![0; 5];

        let mut async_reader = tokio::io::BufReader::new(reader);
        let n = async_reader.read(&mut buf).await.unwrap();

        assert_eq!(n, 5);

        assert_eq!(&buf, b"Hello");
    }

    // A 404 on GET surfaces from `read` itself as `ErrorKind::NotFound`.
    #[tokio::test]
    async fn file_reader_should_fail_with_not_found() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv.mock("GET", "/test/file").with_status(404).create();

        let store = HttpStore::new(srv.url()).unwrap();
        let file = store.get_file("/test/file").await.unwrap();

        let result = file.read(0..5).await;
        match result {
            Ok(_) => panic!("should fail"),
            Err(err) => assert_eq!(err.kind(), ErrorKind::NotFound),
        }
    }

    // Parses a captured Apache directory index (`assets/apache.html`). The expected counts
    // (46 entries: 41 directories, 5 files) are specific to that fixture.
    #[tokio::test]
    async fn dir_should_list_entries() {
        let mut srv = mockito::Server::new_async().await;
        let _m = srv
            .mock("GET", "/NEH")
            .with_status(200)
            .with_body(include_str!("../../assets/apache.html"))
            .create();

        let store = HttpStore::new(srv.url()).unwrap();
        let dir = store.get_dir("/NEH").await.unwrap();
        let mut content = dir.read().await.unwrap();

        let mut result = Vec::new();
        while let Some(entry) = content.next().await {
            result.push(entry.unwrap());
        }
        assert_eq!(result.len(), 46);

        assert_eq!(result.iter().filter(|item| item.is_directory()).count(), 41);
        assert_eq!(result.iter().filter(|item| item.is_file()).count(), 5);
    }
}