Files
immich-app/src/cachemap.rs

191 lines
5.7 KiB
Rust

use std::{
collections::{HashMap, hash_map::Entry},
fmt::Debug,
future::ready,
hash::Hash,
marker::PhantomData,
path::PathBuf,
pin::Pin,
str::FromStr,
sync::Arc,
};
use anyhow::{Context, anyhow};
use either::Either;
use futures::{FutureExt, future::WeakShared};
use tokio::{fs, task::JoinHandle};
/// A source of values of type `V`, looked up by key.
///
/// The `Send + Sync + 'static` supertraits let an implementation be stored as a
/// shared trait object; [`CacheMap`] holds one as `Arc<dyn Fetcher<V, Key = K>>`.
pub trait Fetcher<V>: Send + Sync + 'static {
/// Key identifying a value; must convert to and from a string.
type Key: ToString + FromStr;
/// Starts fetching the value for `key`.
///
/// Returns a boxed future that resolves to the value, or to an error if the
/// fetch failed.
fn fetch(
&self,
key: &Self::Key,
) -> Pin<Box<dyn Future<Output = anyhow::Result<V>> + Send + Sync>>;
}
/// [`Fetcher`] adapter around a function that returns a Tokio [`JoinHandle`]
/// for each requested key.
pub struct AsyncFnFetcher<K, V, F> {
/// Function called once per `fetch` to obtain the handle of the fetching task.
f: F,
/// Ties the otherwise unused `K` and `V` type parameters to the struct.
_phantom: PhantomData<(K, V)>,
}
impl<K, V, F> AsyncFnFetcher<K, V, F>
where
    K: ToString + FromStr + Send + 'static,
    F: Fn(&K) -> JoinHandle<anyhow::Result<V>> + Send + 'static,
    V: Send + 'static,
{
    /// Wraps `f` so it can be used as a fetcher.
    ///
    /// `f` receives a key and returns the [`JoinHandle`] of the task that
    /// produces the value for that key.
    pub fn new(f: F) -> Self {
        // `K` and `V` only appear in `F`'s signature, so they are anchored
        // with a zero-sized marker.
        let _phantom = PhantomData;
        Self { f, _phantom }
    }
}
impl<K, V, F> Fetcher<V> for AsyncFnFetcher<K, V, F>
where
    K: ToString + FromStr + Send + Sync + 'static,
    F: Fn(&K) -> JoinHandle<anyhow::Result<V>> + Send + Sync + 'static,
    V: Send + Sync + 'static,
{
    type Key = K;

    /// Calls the wrapped function for `key` and awaits the task it returns.
    ///
    /// The wrapped function is invoked immediately, before the returned future
    /// is first polled. The future resolves to the task's own result; a failure
    /// to join the task is reported with the context `Fetch task panicked`.
    fn fetch(
        &self,
        key: &Self::Key,
    ) -> Pin<Box<dyn Future<Output = anyhow::Result<V>> + Send + Sync>> {
        let task = (self.f)(key);
        Box::pin(async move {
            // Outer layer: did the task join? Inner layer: did the fetch succeed?
            task.await
                .context("Fetch task panicked")
                .and_then(|fetched| fetched)
        })
    }
}
/// Boxed future for one lookup performed by [`CacheMap`]; it resolves to `None`
/// when no value could be obtained.
type FetchJob<V> = Pin<Box<dyn Future<Output = Option<V>> + Send + Sync>>;
/// Disk-backed cache that obtains missing values through a [`Fetcher`] and lets
/// concurrent requests for the same key share a single in-flight job.
pub struct CacheMap<K, V> {
/// Source consulted when a value cannot be read from `cache_dir`.
fetcher: Arc<dyn Fetcher<V, Key = K>>,
/// Directory holding one file per key, named after the key's `ToString` output.
cache_dir: PathBuf,
/// Encodes a value into the bytes written to its cache file.
serialize: fn(&K, &V) -> Vec<u8>,
/// Decodes the bytes read from a cache file back into a value.
deserialize: fn(&K, &[u8]) -> anyhow::Result<V>,
/// Cache of ongoing [`FetchJob`]s.
///
/// If [`CacheMap::get`] is called while a fetch job is ongoing, [`WeakShared::upgrade`]
/// will succeed, and the [`FetchJob`] can be cloned.
fetch_jobs: HashMap<K, WeakShared<FetchJob<V>>>,
}
impl<K, V> CacheMap<K, V>
where
K: Debug + FromStr + ToString + Eq + Hash + Clone + Send + Sync + 'static,
V: Clone + Send + Sync + 'static,
{
pub fn new(
fetcher: Arc<dyn Fetcher<V, Key = K> + 'static>,
cache_name: &str,
serialize: fn(&K, &V) -> Vec<u8>,
deserialize: fn(&K, &[u8]) -> anyhow::Result<V>,
) -> anyhow::Result<Self> {
let crate_name = env!("CARGO_CRATE_NAME");
let data_dir = xdg::BaseDirectories::new()
.create_cache_directory(format!("{crate_name}/thumbnails"))
.context(anyhow!(
"Failed to create XDG data folder for {cache_name:?}"
))?;
Ok(Self {
fetcher,
cache_dir: data_dir,
serialize,
deserialize,
fetch_jobs: HashMap::new(),
})
}
fn fetch_from_cache(
&self,
key: &K,
) -> impl Future<Output = anyhow::Result<V>> + Send + Sync + use<K, V> {
let key = key.clone();
let key_str = key.to_string();
let path = self.cache_dir.join(key_str);
let deserialize = self.deserialize;
Box::pin(async move {
let bytes = fs::read(&path)
.await
.context(anyhow!("Failed to read {path:?}"))?;
deserialize(&key, &bytes).context(anyhow!("Failed to deserialize value at {path:?}"))
})
}
pub fn get(&mut self, key: K) -> impl Future<Output = Option<V>> + use<K, V> {
// FIXME: creating this future here because lifetimes.
let fetch_from_cache = self.fetch_from_cache(&key);
let entry = match self.fetch_jobs.entry(key.clone()) {
Entry::Vacant(entry) => entry,
Entry::Occupied(entry) => {
if let Some(fetching) = entry.get().upgrade() {
return match fetching.clone().now_or_never() {
// Value fetched
Some(Some(value)) => {
entry.remove();
Either::Left(ready(Some(value)))
}
// Failed to fetch
Some(None) => {
entry.remove();
Either::Left(ready(None))
}
// Still pending
None => Either::Right(fetching.clone()),
};
}
entry.remove();
let Entry::Vacant(entry) = self.fetch_jobs.entry(key.clone()) else {
unreachable!()
};
entry
}
};
let fetcher = self.fetcher.clone();
let serialize = self.serialize;
let file_path = self.cache_dir.join(key.to_string());
let fetching = Box::pin(async move {
if let Ok(value) = fetch_from_cache.await.inspect_err(|e| {
tracing::debug!("Failed to fetch {key:?} from cache: {e}");
}) {
return Some(value);
}
let value = match fetcher.fetch(&key).await {
Ok(value) => value,
Err(e) => {
tracing::warn!("Couldn't fetch {key:?}: {e}");
return None;
}
};
let data = serialize(&key, &value);
if let Err(e) = fs::write(file_path, data).await {
tracing::error!("Failed to cahce value for {key:?}: {e}");
}
Some(value)
});
let fetching = fetching as Pin<Box<dyn Future<Output = Option<V>> + Send + Sync>>;
let fetching = fetching.shared();
if let Some(fetching) = fetching.downgrade() {
entry.insert(fetching);
}
Either::Right(fetching)
}
}