any_storage/
local.rs

1use std::io::{Error, ErrorKind, Result, SeekFrom};
2use std::ops::{Bound, RangeBounds};
3use std::os::unix::fs::MetadataExt;
4use std::path::{Path, PathBuf};
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Poll;
8use std::time::SystemTime;
9
10use futures::Stream;
11use tokio::io::AsyncSeekExt;
12
13use crate::{Entry, Store, StoreDirectory, StoreFile, StoreFileReader, WriteMode};
14
15/// Configuration for [`LocalStore`].
16#[derive(Clone, Debug)]
17#[cfg_attr(feature = "serde", derive(serde::Deserialize))]
18pub struct LocalStoreConfig {
19    /// Root directory for that store
20    pub path: PathBuf,
21}
22
23impl LocalStoreConfig {
24    pub fn build(&self) -> Result<LocalStore> {
25        Ok(LocalStore::new(self.path.clone()))
26    }
27}
28
29/// Internal representation of the local store with a root path.
30#[derive(Debug)]
31struct InnerLocalStore {
32    root: PathBuf,
33}
34
35impl InnerLocalStore {
36    fn real_path(&self, path: &Path) -> Result<PathBuf> {
37        crate::util::merge_path(&self.root, path, false)
38    }
39}
40
41/// Wrapper for the local store, enabling shared ownership.
42#[derive(Debug, Clone)]
43pub struct LocalStore(Arc<InnerLocalStore>);
44
45impl LocalStore {
46    /// Constructor of the localstore
47    pub fn new<P: Into<PathBuf>>(path: P) -> Self {
48        Self::from(path.into())
49    }
50}
51
52impl From<PathBuf> for LocalStore {
53    /// Converts a `PathBuf` into a `LocalStore`.
54    ///
55    /// Takes the root path of the local store and wraps it in an `Arc`.
56    fn from(value: PathBuf) -> Self {
57        Self(Arc::new(InnerLocalStore { root: value }))
58    }
59}
60
61impl Store for LocalStore {
62    type Directory = LocalStoreDirectory;
63    type File = LocalStoreFile;
64
65    /// Retrieves a directory at the specified path in the local store.
66    ///
67    /// Merges the root path with the given path to obtain the full directory
68    /// path.
69    async fn get_dir<P: Into<PathBuf>>(&self, path: P) -> Result<Self::Directory> {
70        let path = path.into();
71        crate::util::clean_path(&path).map(|path| LocalStoreDirectory {
72            store: self.0.clone(),
73            path,
74        })
75    }
76
77    /// Retrieves a file at the specified path in the local store.
78    ///
79    /// Merges the root path with the given path to obtain the full file path.
80    async fn get_file<P: Into<PathBuf>>(&self, path: P) -> Result<Self::File> {
81        let path = path.into();
82        crate::util::clean_path(&path).map(|path| LocalStoreFile {
83            store: self.0.clone(),
84            path,
85        })
86    }
87}
88
89/// Type alias for entries in the local store, which can be files or
90/// directories.
91pub type LocalStoreEntry = Entry<LocalStoreFile, LocalStoreDirectory>;
92
93impl LocalStoreEntry {
94    /// Creates a new `LocalStoreEntry` from a `tokio::fs::DirEntry`.
95    ///
96    /// The entry is classified as either a file or directory based on its path.
97    fn new(store: Arc<InnerLocalStore>, entry: tokio::fs::DirEntry) -> Result<Self> {
98        let path = entry.path();
99        let path = crate::util::remove_path_prefix(&store.root, &path)?;
100        if path.is_dir() {
101            Ok(Self::Directory(LocalStoreDirectory { store, path }))
102        } else if path.is_file() {
103            Ok(Self::File(LocalStoreFile { store, path }))
104        } else {
105            Err(Error::new(
106                ErrorKind::Unsupported,
107                "expected a file or a directory",
108            ))
109        }
110    }
111}
112
113/// Representation of a directory in the local store.
114#[derive(Debug)]
115pub struct LocalStoreDirectory {
116    store: Arc<InnerLocalStore>,
117    path: PathBuf,
118}
119
120impl StoreDirectory for LocalStoreDirectory {
121    type Entry = LocalStoreEntry;
122    type Reader = LocalStoreDirectoryReader;
123
124    fn path(&self) -> &std::path::Path {
125        &self.path
126    }
127
128    /// Checks if the directory exists.
129    ///
130    /// Returns a future that resolves to `true` if the directory exists,
131    /// otherwise `false`.
132    async fn exists(&self) -> Result<bool> {
133        let path = self.store.real_path(&self.path)?;
134        tokio::fs::try_exists(path).await
135    }
136
137    /// Reads the contents of the directory.
138    ///
139    /// Returns a future that resolves to a reader for iterating over the
140    /// directory's entries.
141    async fn read(&self) -> Result<Self::Reader> {
142        let path = self.store.real_path(&self.path)?;
143        tokio::fs::read_dir(path)
144            .await
145            .map(|value| LocalStoreDirectoryReader {
146                store: self.store.clone(),
147                inner: Box::pin(value),
148            })
149    }
150
151    fn delete(&self) -> impl Future<Output = Result<()>> {
152        tokio::fs::remove_dir(&self.path)
153    }
154
155    fn delete_recursive(&self) -> impl Future<Output = Result<()>> {
156        tokio::fs::remove_dir_all(&self.path)
157    }
158}
159
160/// Reader for streaming entries from a local store directory.
161#[derive(Debug)]
162pub struct LocalStoreDirectoryReader {
163    store: Arc<InnerLocalStore>,
164    inner: Pin<Box<tokio::fs::ReadDir>>,
165}
166
167impl Stream for LocalStoreDirectoryReader {
168    type Item = Result<LocalStoreEntry>;
169
170    /// Polls for the next directory entry.
171    ///
172    /// This function is used to asynchronously retrieve the next entry in the
173    /// directory.
174    fn poll_next(
175        self: Pin<&mut Self>,
176        cx: &mut std::task::Context<'_>,
177    ) -> Poll<Option<Self::Item>> {
178        let store = self.store.clone();
179        let mut inner = self.get_mut().inner.as_mut();
180
181        match inner.poll_next_entry(cx) {
182            Poll::Ready(Ok(Some(entry))) => Poll::Ready(Some(LocalStoreEntry::new(store, entry))),
183            Poll::Ready(Ok(None)) => Poll::Ready(None),
184            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
185            Poll::Pending => Poll::Pending,
186        }
187    }
188}
189
190impl crate::StoreDirectoryReader<LocalStoreEntry> for LocalStoreDirectoryReader {}
191
192/// Representation of a file in the local store.
193#[derive(Debug)]
194pub struct LocalStoreFile {
195    store: Arc<InnerLocalStore>,
196    path: PathBuf,
197}
198
199impl StoreFile for LocalStoreFile {
200    type FileReader = LocalStoreFileReader;
201    type FileWriter = LocalStoreFileWriter;
202    type Metadata = LocalStoreFileMetadata;
203
204    fn path(&self) -> &std::path::Path {
205        &self.path
206    }
207
208    /// Checks if the file exists.
209    ///
210    /// Returns a future that resolves to `true` if the file exists, otherwise
211    /// `false`.
212    async fn exists(&self) -> Result<bool> {
213        let path = self.store.real_path(&self.path)?;
214        tokio::fs::try_exists(&path).await
215    }
216
217    /// Retrieves the metadata of the file.
218    ///
219    /// Returns a future that resolves to the file's metadata, such as size and
220    /// timestamps.
221    async fn metadata(&self) -> Result<Self::Metadata> {
222        let path = self.store.real_path(&self.path)?;
223        let meta = tokio::fs::metadata(&path).await?;
224        let size = meta.size();
225        let created = meta
226            .created()
227            .ok()
228            .and_then(|v| v.duration_since(SystemTime::UNIX_EPOCH).ok())
229            .map(|d| d.as_secs())
230            .unwrap_or(0);
231        let modified = meta
232            .modified()
233            .ok()
234            .and_then(|v| v.duration_since(SystemTime::UNIX_EPOCH).ok())
235            .map(|d| d.as_secs())
236            .unwrap_or(0);
237        let content_type = mime_guess::from_path(&self.path).first_raw();
238        Ok(LocalStoreFileMetadata {
239            size,
240            created,
241            modified,
242            content_type,
243        })
244    }
245
246    /// Reads a portion of the file's content, specified by a byte range.
247    ///
248    /// Returns a future that resolves to a reader that can read the specified
249    /// range of the file.
250    async fn read<R: RangeBounds<u64>>(&self, range: R) -> Result<Self::FileReader> {
251        use tokio::io::AsyncSeekExt;
252
253        let start = match range.start_bound() {
254            Bound::Included(&n) => n,
255            Bound::Excluded(&n) => n + 1,
256            Bound::Unbounded => 0,
257        };
258
259        let end = match range.end_bound() {
260            Bound::Included(&n) => Some(n + 1),
261            Bound::Excluded(&n) => Some(n),
262            Bound::Unbounded => None, // no limit
263        };
264
265        let path = self.store.real_path(&self.path)?;
266        let mut file = tokio::fs::OpenOptions::new().read(true).open(&path).await?;
267        file.seek(std::io::SeekFrom::Start(start)).await?;
268        Ok(LocalStoreFileReader {
269            file,
270            start,
271            end,
272            position: start,
273        })
274    }
275
276    async fn write(&self, options: crate::WriteOptions) -> Result<Self::FileWriter> {
277        let path = self.store.real_path(&self.path)?;
278        let mut file = tokio::fs::OpenOptions::new()
279            .append(matches!(options.mode, WriteMode::Append))
280            .truncate(matches!(options.mode, WriteMode::Truncate { .. }))
281            .write(true)
282            .create(true)
283            .open(&path)
284            .await?;
285        match options.mode {
286            WriteMode::Truncate { offset } if offset > 0 => {
287                file.seek(SeekFrom::Start(offset)).await?;
288            }
289            _ => {}
290        };
291        Ok(LocalStoreFileWriter(file))
292    }
293
294    async fn delete(&self) -> Result<()> {
295        let path = self.store.real_path(&self.path)?;
296        tokio::fs::remove_file(&path).await
297    }
298}
299
300/// Metadata associated with a file in the local store (size, created, modified
301/// timestamps).
302#[derive(Clone, Debug)]
303pub struct LocalStoreFileMetadata {
304    size: u64,
305    created: u64,
306    modified: u64,
307    content_type: Option<&'static str>,
308}
309
310impl super::StoreMetadata for LocalStoreFileMetadata {
311    /// Returns the size of the file in bytes.
312    fn size(&self) -> u64 {
313        self.size
314    }
315
316    /// Returns the creation timestamp of the file (epoch time).
317    fn created(&self) -> u64 {
318        self.created
319    }
320
321    /// Returns the last modification timestamp of the file (epoch time).
322    fn modified(&self) -> u64 {
323        self.modified
324    }
325
326    fn content_type(&self) -> Option<&str> {
327        self.content_type
328    }
329}
330
331/// Reader for asynchronously reading the contents of a file in the local store.
332#[derive(Debug)]
333pub struct LocalStoreFileReader {
334    file: tokio::fs::File,
335    #[allow(unused)]
336    start: u64,
337    end: Option<u64>,
338    position: u64,
339}
340
341impl tokio::io::AsyncRead for LocalStoreFileReader {
342    /// Polls for reading data from the file.
343    ///
344    /// This function reads data into the provided buffer, handling partial
345    /// reads within the given range.
346    fn poll_read(
347        mut self: std::pin::Pin<&mut Self>,
348        cx: &mut std::task::Context<'_>,
349        buf: &mut tokio::io::ReadBuf<'_>,
350    ) -> Poll<std::io::Result<()>> {
351        let remaining = match self.end {
352            Some(end) => end.saturating_sub(self.position) as usize,
353            None => buf.remaining(),
354        };
355
356        if remaining == 0 {
357            return std::task::Poll::Ready(Ok(()));
358        }
359
360        // Limit the read buffer to the remaining range
361        let read_len = std::cmp::min(remaining, buf.remaining()) as usize;
362        let mut temp_buf = vec![0u8; read_len];
363        let mut temp_read_buf = tokio::io::ReadBuf::new(&mut temp_buf);
364
365        let this = self.as_mut().get_mut();
366        let pinned_file = Pin::new(&mut this.file);
367
368        match pinned_file.poll_read(cx, &mut temp_read_buf) {
369            Poll::Ready(Ok(())) => {
370                let bytes_read = temp_read_buf.filled().len();
371                buf.put_slice(temp_read_buf.filled());
372                this.position += bytes_read as u64;
373                Poll::Ready(Ok(()))
374            }
375            other => other,
376        }
377    }
378}
379
380impl StoreFileReader for LocalStoreFileReader {}
381
382#[derive(Debug)]
383pub struct LocalStoreFileWriter(tokio::fs::File);
384
385impl tokio::io::AsyncWrite for LocalStoreFileWriter {
386    fn poll_write(
387        mut self: Pin<&mut Self>,
388        cx: &mut std::task::Context<'_>,
389        buf: &[u8],
390    ) -> Poll<Result<usize>> {
391        // Pinning the inner file (unwrap since it is wrapped in Pin)
392        let file = &mut self.as_mut().0;
393
394        // Use tokio::io::AsyncWriteExt::write to write to the file
395        Pin::new(file).poll_write(cx, buf)
396    }
397
398    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Result<()>> {
399        let file = &mut self.as_mut().0;
400        Pin::new(file).poll_flush(cx)
401    }
402
403    fn poll_shutdown(
404        mut self: Pin<&mut Self>,
405        cx: &mut std::task::Context<'_>,
406    ) -> Poll<Result<()>> {
407        let file = &mut self.as_mut().0;
408        Pin::new(file).poll_shutdown(cx)
409    }
410}
411
412impl crate::StoreFileWriter for LocalStoreFileWriter {}
413
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use tokio::io::AsyncReadExt;

    use super::*;
    use crate::Store;

    /// Builds a store rooted at the crate directory.
    ///
    /// `CARGO_MANIFEST_DIR` is used instead of `PWD` so the tests find
    /// `src/lib.rs` regardless of the directory cargo was invoked from.
    fn crate_store() -> LocalStore {
        LocalStore::from(PathBuf::from(env!("CARGO_MANIFEST_DIR")))
    }

    /// Paths escaping the root are rejected, inner `..` segments are fine.
    #[tokio::test]
    async fn should_not_go_in_parent_folder() {
        let store = crate_store();

        let _ = store.get_file("anywhere/../hello.txt").await.unwrap();

        let err = store.get_file("../hello.txt").await.unwrap_err();
        assert_eq!(err.to_string(), "No such file or directory");
    }

    /// Existing files are found through equivalent spellings of their path.
    #[tokio::test]
    async fn should_find_existing_files() {
        let store = crate_store();

        let lib = store.get_file("/src/lib.rs").await.unwrap();
        assert!(lib.exists().await.unwrap());

        let lib = store.get_file("src/lib.rs").await.unwrap();
        assert!(lib.exists().await.unwrap());

        let lib = store.get_file("nothing/../src/lib.rs").await.unwrap();
        assert!(lib.exists().await.unwrap());

        let missing = store.get_file("nothing.rs").await.unwrap();
        assert!(!missing.exists().await.unwrap());
    }

    /// A ranged read returns exactly the requested bytes.
    #[tokio::test]
    async fn should_read_lib_file() {
        let store = crate_store();

        let lib = store.get_file("/src/lib.rs").await.unwrap();
        let mut reader = lib.read(0..10).await.unwrap();
        let mut buffer = vec![];
        reader.read_to_end(&mut buffer).await.unwrap();

        let content = include_bytes!("./lib.rs");
        assert_eq!(buffer, content[0..10]);
    }

    /// Metadata exposes the size, timestamps and guessed content type.
    #[tokio::test]
    async fn should_read_lib_metadata() {
        let store = crate_store();

        let lib = store.get_file("/src/lib.rs").await.unwrap();
        let meta = lib.metadata().await.unwrap();

        assert!(meta.size > 0);
        assert!(meta.created > 0);
        assert!(meta.modified > 0);
        assert_eq!(meta.content_type.unwrap(), "text/x-rust");
    }
}