From ea2b8ace4eee5e941d018abd5e6c2fa3fc6c9179 Mon Sep 17 00:00:00 2001 From: Joakim Hulthe Date: Sun, 17 May 2026 11:51:06 +0200 Subject: [PATCH] Cache thumbnails to disk --- Cargo.lock | 11 +++ Cargo.toml | 4 + src/api.rs | 122 +++++++++++++++++++++---------- src/cachemap.rs | 190 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + 5 files changed, 291 insertions(+), 37 deletions(-) create mode 100644 src/cachemap.rs diff --git a/Cargo.lock b/Cargo.lock index a85cd9d..04ad762 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2425,6 +2425,7 @@ dependencies = [ "ravif", "rayon", "rgb", + "serde", "tiff", "zune-core", "zune-jpeg", @@ -2460,6 +2461,9 @@ dependencies = [ "base64", "blurhash", "clap", + "either", + "futures", + "image", "immich-sdk", "kameo", "serde", @@ -2470,6 +2474,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "xdg", ] [[package]] @@ -6798,6 +6803,12 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bec9e4a500ca8864c5b47b8b482a73d62e4237670e5b5f1d6b9e3cae50f28f2b" +[[package]] +name = "xdg" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fb433233f2df9344722454bc7e96465c9d03bff9d77c248f9e7523fe79585b5" + [[package]] name = "xkbcommon" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 5c662d4..415af0d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,9 @@ anyhow = "1.0.102" base64 = "0.22.1" blurhash = "0.2.3" clap = { version = "4.6.0", features = ["derive", "env"] } +either = "1.15.0" +futures = "0.3.32" +image = { version = "0.25.10", default-features = false, features = ["serde", "webp"] } immich-sdk.path = "../immich-sdk/" kameo = "0.19.2" serde = { version = "1.0.228", features = ["derive"] } @@ -16,6 +19,7 @@ thumbhash = "0.1.0" tokio = { version = "1.51.0", features = ["full"] } tracing = "0.1.44" tracing-subscriber = { version = "0.3.23", features = ["env-filter"] } +xdg = "3.0.0" [dependencies.slint] version = "1.16.1" diff --git a/src/api.rs b/src/api.rs index 51bd38a..6c92ffb 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,29 +1,108 @@ use std::{ collections::HashMap, + io::Cursor, iter::repeat, sync::{Arc, Mutex}, }; use anyhow::{Context as _, anyhow}; +use image::{ + DynamicImage, EncodableLayout, + codecs::webp::{WebPDecoder, WebPEncoder}, +}; use immich_sdk::{AssetId, AssetVisibility}; use slint::{Rgba8Pixel, SharedPixelBuffer}; -use crate::thumbhash::thumbhashes_to_pixels; +use crate::{ + cachemap::{AsyncFnFetcher, CacheMap, Fetcher}, + thumbhash::thumbhashes_to_pixels, +}; pub type TimeBucketKey = String; pub struct Api { client: immich_sdk::Client, buckets: Mutex>>, - thumbnails: Mutex>>, + thumbnails: Mutex>>, +} + +fn serialize_thumbnail(_id: &AssetId, thumbnail: &Arc) -> Vec { + let image: &SharedPixelBuffer = &thumbnail.thumbnail; + + let mut webp = vec![]; + WebPEncoder::new_lossless(&mut webp) + .encode( + image.as_bytes(), + image.width(), + image.height(), + image::ExtendedColorType::Rgba8, + ) + .expect("width and height matches image.as_bytes().len()"); + + webp +} +fn deserialize_thumbnail(&id: &AssetId, bytes: &[u8]) -> anyhow::Result> { + let image = WebPDecoder::new(Cursor::new(bytes)) + .and_then(DynamicImage::from_decoder) + .context("Failed to decode image")? + .to_rgba8(); + + let image = SharedPixelBuffer::::clone_from_slice( + image.as_bytes(), + image.width(), + image.height(), + ); + + Ok(Arc::new(AssetThumbnail { + id, + thumbnail: image, + })) } impl Api { pub fn new(client: immich_sdk::Client) -> Arc { + let client_ = client.clone(); + let thumbnail_fetcher = AsyncFnFetcher::new(move |asset_id: &AssetId| { + let client = client_.clone(); + let asset_id = asset_id.clone(); + tokio::spawn(async move { + let response = client + .assets() + .thumbnail(asset_id) + .size(immich_sdk::AssetMediaSize::Thumbnail) + .execute() + .await + .context(anyhow!("Failed to get asset thumbnail for {asset_id}"))?; + + let thumbnail = response + .decode() + .context(anyhow!("Failed to decode asset thumbnail for {asset_id}"))? + .into_rgba8(); + + let pixel_buffer = SharedPixelBuffer::::clone_from_slice( + &thumbnail, + thumbnail.width(), + thumbnail.height(), + ); + + Ok(Arc::new(AssetThumbnail { + id: asset_id, + thumbnail: pixel_buffer, + })) + }) + }); + let cache_map = CacheMap::new( + Arc::new(thumbnail_fetcher) as Arc, Key = AssetId>>, + "thumbnails", + serialize_thumbnail, + deserialize_thumbnail, + ) + .unwrap(); + Arc::new(Self { client, buckets: Default::default(), - thumbnails: Default::default(), + thumbnails: Mutex::new(cache_map), }) } } @@ -136,40 +215,9 @@ impl Api { &self, asset_id: AssetId, ) -> anyhow::Result> { - if let Some(thumbnail) = self.thumbnails.lock().unwrap().get(&asset_id).cloned() { - return Ok(thumbnail); - } - - let response = self - .client - .assets() - .thumbnail(asset_id) - .size(immich_sdk::AssetMediaSize::Thumbnail) - .execute() + let fetch = self.thumbnails.lock().unwrap().get(asset_id); + fetch .await - .context(anyhow!("Failed to get asset thumbnail for {asset_id}"))?; - - let thumbnail = response - .decode() - .context(anyhow!("Failed to decode asset thumbnail for {asset_id}"))? - .into_rgba8(); - - let pixel_buffer = SharedPixelBuffer::::clone_from_slice( - &thumbnail, - thumbnail.width(), - thumbnail.height(), - ); - - let thumbnail = Arc::new(AssetThumbnail { - id: asset_id, - thumbnail: pixel_buffer, - }); - - self.thumbnails - .lock() - .unwrap() - .insert(asset_id, Arc::clone(&thumbnail)); - - Ok(thumbnail) + .context(anyhow!("Failed to fetch thumbnail for {asset_id:?}")) } } diff --git a/src/cachemap.rs b/src/cachemap.rs new file mode 100644 index 0000000..dffa99f --- /dev/null +++ b/src/cachemap.rs @@ -0,0 +1,190 @@ +use std::{ + collections::{HashMap, hash_map::Entry}, + fmt::Debug, + future::ready, + hash::Hash, + marker::PhantomData, + path::PathBuf, + pin::Pin, + str::FromStr, + sync::Arc, +}; + +use anyhow::{Context, anyhow}; +use either::Either; +use futures::{FutureExt, future::WeakShared}; +use tokio::{fs, task::JoinHandle}; + +pub trait Fetcher: Send + Sync + 'static { + type Key: ToString + FromStr; + + fn fetch( + &self, + key: &Self::Key, + ) -> Pin> + Send + Sync>>; +} + +pub struct AsyncFnFetcher { + f: F, + _phantom: PhantomData<(K, V)>, +} + +impl AsyncFnFetcher +where + K: ToString + FromStr + Send + 'static, + F: Fn(&K) -> JoinHandle> + Send + 'static, + V: Send + 'static, +{ + pub fn new(f: F) -> Self { + Self { + f, + _phantom: PhantomData, + } + } +} + +impl Fetcher for AsyncFnFetcher +where + K: ToString + FromStr + Send + Sync + 'static, + F: Fn(&K) -> JoinHandle> + Send + Sync + 'static, + V: Send + Sync + 'static, +{ + type Key = K; + + fn fetch( + &self, + key: &Self::Key, + ) -> Pin> + Send + Sync>> { + let handle = (self.f)(key); + Box::pin(async move { handle.await.context("Fetch task panicked")? }) + } +} + +type FetchJob = Pin> + Send + Sync>>; + +pub struct CacheMap { + fetcher: Arc>, + cache_dir: PathBuf, + serialize: fn(&K, &V) -> Vec, + deserialize: fn(&K, &[u8]) -> anyhow::Result, + + /// Cache of ongoing [`FetchJob`]s. + /// + /// If [`CacheMap::get`] is called while a fetch job is ongoing, [`WeakShared::upgrade`] + /// will succeed, and the [`FetchJob`] can be cloned. + fetch_jobs: HashMap>>, +} + +impl CacheMap +where + K: Debug + FromStr + ToString + Eq + Hash + Clone + Send + Sync + 'static, + V: Clone + Send + Sync + 'static, +{ + pub fn new( + fetcher: Arc + 'static>, + cache_name: &str, + serialize: fn(&K, &V) -> Vec, + deserialize: fn(&K, &[u8]) -> anyhow::Result, + ) -> anyhow::Result { + let crate_name = env!("CARGO_CRATE_NAME"); + let data_dir = xdg::BaseDirectories::new() + .create_cache_directory(format!("{crate_name}/thumbnails")) + .context(anyhow!( + "Failed to create XDG data folder for {cache_name:?}" + ))?; + + Ok(Self { + fetcher, + cache_dir: data_dir, + serialize, + deserialize, + fetch_jobs: HashMap::new(), + }) + } + + fn fetch_from_cache( + &self, + key: &K, + ) -> impl Future> + Send + Sync + use { + let key = key.clone(); + let key_str = key.to_string(); + let path = self.cache_dir.join(key_str); + let deserialize = self.deserialize; + Box::pin(async move { + let bytes = fs::read(&path) + .await + .context(anyhow!("Failed to read {path:?}"))?; + deserialize(&key, &bytes).context(anyhow!("Failed to deserialize value at {path:?}")) + }) + } + + pub fn get(&mut self, key: K) -> impl Future> + use { + // FIXME: creating this future here because lifetimes. + let fetch_from_cache = self.fetch_from_cache(&key); + + let entry = match self.fetch_jobs.entry(key.clone()) { + Entry::Vacant(entry) => entry, + Entry::Occupied(entry) => { + if let Some(fetching) = entry.get().upgrade() { + return match fetching.clone().now_or_never() { + // Value fetched + Some(Some(value)) => { + entry.remove(); + Either::Left(ready(Some(value))) + } + + // Failed to fetch + Some(None) => { + entry.remove(); + Either::Left(ready(None)) + } + + // Still pending + None => Either::Right(fetching.clone()), + }; + } + + entry.remove(); + let Entry::Vacant(entry) = self.fetch_jobs.entry(key.clone()) else { + unreachable!() + }; + entry + } + }; + + let fetcher = self.fetcher.clone(); + + let serialize = self.serialize; + let file_path = self.cache_dir.join(key.to_string()); + let fetching = Box::pin(async move { + if let Ok(value) = fetch_from_cache.await.inspect_err(|e| { + tracing::debug!("Failed to fetch {key:?} from cache: {e}"); + }) { + return Some(value); + } + + let value = match fetcher.fetch(&key).await { + Ok(value) => value, + Err(e) => { + tracing::warn!("Couldn't fetch {key:?}: {e}"); + return None; + } + }; + + let data = serialize(&key, &value); + if let Err(e) = fs::write(file_path, data).await { + tracing::error!("Failed to cahce value for {key:?}: {e}"); + } + + Some(value) + }); + let fetching = fetching as Pin> + Send + Sync>>; + let fetching = fetching.shared(); + + if let Some(fetching) = fetching.downgrade() { + entry.insert(fetching); + } + + Either::Right(fetching) + } +} diff --git a/src/main.rs b/src/main.rs index e85a173..617bf1d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use crate::{ }; mod api; +pub mod cachemap; mod thumbhash; mod ui {