Cache thumbnails to disk
This commit is contained in:
190
src/cachemap.rs
Normal file
190
src/cachemap.rs
Normal file
@@ -0,0 +1,190 @@
|
||||
use std::{
|
||||
collections::{HashMap, hash_map::Entry},
|
||||
fmt::Debug,
|
||||
future::ready,
|
||||
hash::Hash,
|
||||
marker::PhantomData,
|
||||
path::PathBuf,
|
||||
pin::Pin,
|
||||
str::FromStr,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
use either::Either;
|
||||
use futures::{FutureExt, future::WeakShared};
|
||||
use tokio::{fs, task::JoinHandle};
|
||||
|
||||
pub trait Fetcher<V>: Send + Sync + 'static {
|
||||
type Key: ToString + FromStr;
|
||||
|
||||
fn fetch(
|
||||
&self,
|
||||
key: &Self::Key,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<V>> + Send + Sync>>;
|
||||
}
|
||||
|
||||
pub struct AsyncFnFetcher<K, V, F> {
|
||||
f: F,
|
||||
_phantom: PhantomData<(K, V)>,
|
||||
}
|
||||
|
||||
impl<K, V, F> AsyncFnFetcher<K, V, F>
|
||||
where
|
||||
K: ToString + FromStr + Send + 'static,
|
||||
F: Fn(&K) -> JoinHandle<anyhow::Result<V>> + Send + 'static,
|
||||
V: Send + 'static,
|
||||
{
|
||||
pub fn new(f: F) -> Self {
|
||||
Self {
|
||||
f,
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V, F> Fetcher<V> for AsyncFnFetcher<K, V, F>
|
||||
where
|
||||
K: ToString + FromStr + Send + Sync + 'static,
|
||||
F: Fn(&K) -> JoinHandle<anyhow::Result<V>> + Send + Sync + 'static,
|
||||
V: Send + Sync + 'static,
|
||||
{
|
||||
type Key = K;
|
||||
|
||||
fn fetch(
|
||||
&self,
|
||||
key: &Self::Key,
|
||||
) -> Pin<Box<dyn Future<Output = anyhow::Result<V>> + Send + Sync>> {
|
||||
let handle = (self.f)(key);
|
||||
Box::pin(async move { handle.await.context("Fetch task panicked")? })
|
||||
}
|
||||
}
|
||||
|
||||
type FetchJob<V> = Pin<Box<dyn Future<Output = Option<V>> + Send + Sync>>;
|
||||
|
||||
pub struct CacheMap<K, V> {
|
||||
fetcher: Arc<dyn Fetcher<V, Key = K>>,
|
||||
cache_dir: PathBuf,
|
||||
serialize: fn(&K, &V) -> Vec<u8>,
|
||||
deserialize: fn(&K, &[u8]) -> anyhow::Result<V>,
|
||||
|
||||
/// Cache of ongoing [`FetchJob`]s.
|
||||
///
|
||||
/// If [`CacheMap::get`] is called while a fetch job is ongoing, [`WeakShared::upgrade`]
|
||||
/// will succeed, and the [`FetchJob`] can be cloned.
|
||||
fetch_jobs: HashMap<K, WeakShared<FetchJob<V>>>,
|
||||
}
|
||||
|
||||
impl<K, V> CacheMap<K, V>
|
||||
where
|
||||
K: Debug + FromStr + ToString + Eq + Hash + Clone + Send + Sync + 'static,
|
||||
V: Clone + Send + Sync + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
fetcher: Arc<dyn Fetcher<V, Key = K> + 'static>,
|
||||
cache_name: &str,
|
||||
serialize: fn(&K, &V) -> Vec<u8>,
|
||||
deserialize: fn(&K, &[u8]) -> anyhow::Result<V>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let crate_name = env!("CARGO_CRATE_NAME");
|
||||
let data_dir = xdg::BaseDirectories::new()
|
||||
.create_cache_directory(format!("{crate_name}/thumbnails"))
|
||||
.context(anyhow!(
|
||||
"Failed to create XDG data folder for {cache_name:?}"
|
||||
))?;
|
||||
|
||||
Ok(Self {
|
||||
fetcher,
|
||||
cache_dir: data_dir,
|
||||
serialize,
|
||||
deserialize,
|
||||
fetch_jobs: HashMap::new(),
|
||||
})
|
||||
}
|
||||
|
||||
fn fetch_from_cache(
|
||||
&self,
|
||||
key: &K,
|
||||
) -> impl Future<Output = anyhow::Result<V>> + Send + Sync + use<K, V> {
|
||||
let key = key.clone();
|
||||
let key_str = key.to_string();
|
||||
let path = self.cache_dir.join(key_str);
|
||||
let deserialize = self.deserialize;
|
||||
Box::pin(async move {
|
||||
let bytes = fs::read(&path)
|
||||
.await
|
||||
.context(anyhow!("Failed to read {path:?}"))?;
|
||||
deserialize(&key, &bytes).context(anyhow!("Failed to deserialize value at {path:?}"))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get(&mut self, key: K) -> impl Future<Output = Option<V>> + use<K, V> {
|
||||
// FIXME: creating this future here because lifetimes.
|
||||
let fetch_from_cache = self.fetch_from_cache(&key);
|
||||
|
||||
let entry = match self.fetch_jobs.entry(key.clone()) {
|
||||
Entry::Vacant(entry) => entry,
|
||||
Entry::Occupied(entry) => {
|
||||
if let Some(fetching) = entry.get().upgrade() {
|
||||
return match fetching.clone().now_or_never() {
|
||||
// Value fetched
|
||||
Some(Some(value)) => {
|
||||
entry.remove();
|
||||
Either::Left(ready(Some(value)))
|
||||
}
|
||||
|
||||
// Failed to fetch
|
||||
Some(None) => {
|
||||
entry.remove();
|
||||
Either::Left(ready(None))
|
||||
}
|
||||
|
||||
// Still pending
|
||||
None => Either::Right(fetching.clone()),
|
||||
};
|
||||
}
|
||||
|
||||
entry.remove();
|
||||
let Entry::Vacant(entry) = self.fetch_jobs.entry(key.clone()) else {
|
||||
unreachable!()
|
||||
};
|
||||
entry
|
||||
}
|
||||
};
|
||||
|
||||
let fetcher = self.fetcher.clone();
|
||||
|
||||
let serialize = self.serialize;
|
||||
let file_path = self.cache_dir.join(key.to_string());
|
||||
let fetching = Box::pin(async move {
|
||||
if let Ok(value) = fetch_from_cache.await.inspect_err(|e| {
|
||||
tracing::debug!("Failed to fetch {key:?} from cache: {e}");
|
||||
}) {
|
||||
return Some(value);
|
||||
}
|
||||
|
||||
let value = match fetcher.fetch(&key).await {
|
||||
Ok(value) => value,
|
||||
Err(e) => {
|
||||
tracing::warn!("Couldn't fetch {key:?}: {e}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
let data = serialize(&key, &value);
|
||||
if let Err(e) = fs::write(file_path, data).await {
|
||||
tracing::error!("Failed to cahce value for {key:?}: {e}");
|
||||
}
|
||||
|
||||
Some(value)
|
||||
});
|
||||
let fetching = fetching as Pin<Box<dyn Future<Output = Option<V>> + Send + Sync>>;
|
||||
let fetching = fetching.shared();
|
||||
|
||||
if let Some(fetching) = fetching.downgrade() {
|
||||
entry.insert(fetching);
|
||||
}
|
||||
|
||||
Either::Right(fetching)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user