From 4294d5c79e4d3282f79f50cb1919db57fea6ec0f Mon Sep 17 00:00:00 2001 From: Joakim Hulthe Date: Sat, 11 Nov 2023 17:57:04 +0100 Subject: [PATCH] Split MQTT socket handling from BulbManager --- Cargo.lock | 386 ++++++++++++++++++++++++----------- Cargo.toml | 2 + manager/Cargo.toml | 5 +- manager/src/lib.rs | 1 + manager/src/main.rs | 32 ++- manager/src/manager.rs | 306 ++++++++------------------- manager/src/mqtt_conf.rs | 6 +- manager/src/provider.rs | 34 +++ manager/src/provider/mock.rs | 65 ++++++ manager/src/provider/mqtt.rs | 234 +++++++++++++++++++++ 10 files changed, 726 insertions(+), 345 deletions(-) create mode 100644 manager/src/provider.rs create mode 100644 manager/src/provider/mock.rs create mode 100644 manager/src/provider/mqtt.rs diff --git a/Cargo.lock b/Cargo.lock index 47d925a..3c8baf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,19 +3,39 @@ version = 3 [[package]] -name = "aho-corasick" -version = "0.7.18" +name = "addr2line" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + +[[package]] +name = "aho-corasick" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] [[package]] -name = "anyhow" -version = "1.0.58" +name = "async-trait" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb07d2053ccdbe10e2af2995a2f116c1330396493dc1269f6a91d0ae82e19704" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] [[package]] name = "atty" @@ -23,7 +43,7 @@ version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ - "hermit-abi", + "hermit-abi 0.1.19", "libc", "winapi", ] @@ -34,6 +54,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -42,15 +77,24 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "byteorder" -version = "1.4.3" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.1.0" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" + +[[package]] +name = "cc" +version = "1.0.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0" +dependencies = [ + "libc", +] [[package]] name = "cfg-if" @@ -60,9 +104,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "3.2.8" +version = "3.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "190814073e85d238f31ff738fcb0bf6910cedeb73376c87cd69291028966fd83" +checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123" dependencies = [ "atty", "bitflags", @@ -77,15 +121,15 @@ dependencies = [ [[package]] name = "clap_derive" -version = "3.2.7" +version = "3.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759bf187376e1afa7b85b959e6a664a3e7a95203415dba952ad19139e798f902" +checksum = "ae6371b8bdc8b7d3959e9cf7b22d4435ef3e79e138688421ec654acf8c81b008" dependencies = [ "heck", "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -115,10 +159,20 @@ dependencies = [ ] [[package]] -name = "getrandom" -version = "0.2.7" +name = "eyre" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" +checksum = "4c2b6b5a29c02cdc822728b7d7b8ae1bab3e3b05d44522770ddd49722eeac7eb" +dependencies = [ + "indenter", + "once_cell", +] + +[[package]] +name = "getrandom" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" dependencies = [ "cfg-if", "libc", @@ -126,16 +180,22 @@ dependencies = [ ] [[package]] -name = "hashbrown" -version = "0.12.1" +name = "gimli" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db0d4cf898abf0081f964436dc980e96670a0f36863e4b83aaacdb65c9d7ccc3" +checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" + +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "heck" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9" +checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermit-abi" @@ -146,6 +206,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + [[package]] name = "humantime" version = "1.3.0" @@ -156,10 +222,16 @@ dependencies = [ ] [[package]] -name = "indexmap" -version = "1.9.1" +name = "indenter" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +checksum = "ce23b50ad8242c51a442f3ff322d56b02f08852c77e4c0b4d3fd684abc89c683" + +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" dependencies = [ "autocfg", "hashbrown", @@ -167,9 +239,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.2" +version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" +checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "lazy_static" @@ -179,9 +251,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.126" +version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349d5a591cd28b49e1d1037471617a32ddcda5731b99419008085f72d5a53836" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "lighter_lib" @@ -194,8 +266,9 @@ dependencies = [ name = "lighter_manager" version = "0.1.0" dependencies = [ - "anyhow", + "async-trait", "clap", + "eyre", "lighter_lib", "log", "mqtt-protocol", @@ -209,27 +282,32 @@ dependencies = [ [[package]] name = "log" -version = "0.4.17" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" -dependencies = [ - "cfg-if", -] +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] [[package]] name = "mio" -version = "0.8.4" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", - "log", "wasi", "windows-sys", ] @@ -250,31 +328,40 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.13.1" +version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.3", "libc", ] [[package]] -name = "once_cell" -version = "1.13.0" +name = "object" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" [[package]] name = "os_str_bytes" -version = "6.1.0" +version = "6.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21326818e99cfe6ce1e524c2a805c189a99b5ae555a35d19f9a284b427d86afa" +checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "pretty_env_logger" @@ -295,7 +382,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "version_check", ] @@ -312,9 +399,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.40" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd96a1e8ed2596c337f8eae5f24924ec83f5ad5ab21ea8e455d3566c69fbcaf7" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -327,18 +414,30 @@ checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" [[package]] name = "quote" -version = "1.0.20" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3bcdf212e9776fbcb2d23ab029360416bb1706b1aea2d1a5ba002727cbcab804" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] [[package]] name = "regex" -version = "1.6.0" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", @@ -347,41 +446,47 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.6.27" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" + +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" [[package]] name = "ryu" -version = "1.0.10" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3f6f92acf49d1b98f7a81226834412ada05458b7364277387724a237f062695" +checksum = "1ad4cc8da4ef723ed60bced201181d83791ad433213d8c24efffda1eec85d741" [[package]] name = "serde" -version = "1.0.138" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1578c6245786b9d168c5447eeacfb96856573ca56c9d68fdcf394be134882a47" +checksum = "bca2a08484b285dcb282d0f67b26cadc0df8b19f8c12502c13d966bf9482f001" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.138" +version = "1.0.192" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "023e9b1467aef8a10fb88f25611870ada9800ef7e22afce356bb0d2387b6f27c" +checksum = "d6c7207fbec9faa48073f3e3074cbe553af6ea512d7c21ba46e434e70ea9fbc1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] name = "serde_json" -version = "1.0.82" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82c2c1fdcd807d1098552c5b9a36e425e42e9fbd7c6a37a8425f390f781f7fa7" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "itoa", "ryu", @@ -390,12 +495,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.4.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "winapi", + "windows-sys", ] [[package]] @@ -406,9 +511,20 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "syn" -version = "1.0.98" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c50aef8a904de4c23c788f104b7dddc7d6f79c647c7c8ce4cc8f73eb0ca773dd" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23e78b90f2fcf45d3e842032ce32e3f2d1545ba6636271dcbf24fa306d87be7a" dependencies = [ "proc-macro2", "quote", @@ -417,88 +533,107 @@ dependencies = [ [[package]] name = "termcolor" -version = "1.1.3" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +checksum = "6093bad37da69aab9d123a8091e4be0aa4a03e4d601ec641c327398315f62b64" dependencies = [ "winapi-util", ] [[package]] name = "textwrap" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1141d4d61095b28419e22cb0bbf02755f5e54e0526f97f1e3d1d160e60885fb" +checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.31" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd829fe32373d27f76265620b5309d0340cb8550f523c1dda251d6298069069a" +checksum = "f9a7210f5c9a7156bb50aa36aed4c95afb51df0df00713949448cf9e97d382d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.31" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0396bc89e626244658bef819e22d0cc459e795a5ebe878e6ec336d1674a8d79a" +checksum = "266b2e40bc00e5a6c09c3584011e08b06f123c00362c92b975ba9843aaaa14b8" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] name = "tokio" -version = "1.19.2" +version = "1.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" +checksum = "d0c014766411e834f7af5b8f4cf46257aab4036ca95e9d2c144a10f59ad6f5b9" dependencies = [ + "backtrace", "bytes", "libc", - "memchr", "mio", "num_cpus", - "once_cell", "pin-project-lite", "socket2", "tokio-macros", - "winapi", + "tracing", + "windows-sys", ] [[package]] name = "tokio-macros" -version = "1.8.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.39", ] [[package]] name = "toml" -version = "0.5.9" +version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82e1a7758622a465f8cee077614c73484dac5b836c02ff6a40d5d1010324d7" +checksum = "f4f7f0dd8d50a853a531c426359045b1998f04219d88799810762cd4ad314234" dependencies = [ "serde", ] [[package]] -name = "unicode-ident" -version = "1.0.1" +name = "tracing" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" [[package]] name = "uuid" -version = "1.1.2" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd6469f4314d5f1ffec476e05f17cc9a78bc7a27a6a857842170bdf8d6f98d2f" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ "getrandom", ] @@ -533,9 +668,9 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" dependencies = [ "winapi", ] @@ -548,43 +683,66 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-sys" -version = "0.36.1" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", "windows_aarch64_msvc", "windows_i686_gnu", "windows_i686_msvc", "windows_x86_64_gnu", + "windows_x86_64_gnullvm", "windows_x86_64_msvc", ] [[package]] -name = "windows_aarch64_msvc" -version = "0.36.1" +name = "windows_aarch64_gnullvm" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_i686_gnu" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_msvc" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_x86_64_gnu" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_msvc" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" diff --git a/Cargo.toml b/Cargo.toml index badb438..1f71491 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,4 @@ [workspace] members = ["manager", "lib", "cli"] + +resolver = "2" diff --git a/manager/Cargo.toml b/manager/Cargo.toml index 6e1ce0f..c0ebe48 100644 --- a/manager/Cargo.toml +++ b/manager/Cargo.toml @@ -4,16 +4,17 @@ version = "0.1.0" edition = "2021" [dependencies] -anyhow = "1" serde = { version = "1", features = ["derive"] } clap = { version = "3.2", features = ["derive"] } log = "0.4" mqtt-protocol = { version = "0.11", features = ["tokio"] } pretty_env_logger = "0.4.0" serde_json = "1" -tokio = { version = "1.19.2", features = ["net", "rt-multi-thread", "macros", "sync", "io-std", "io-util", "time"] } +tokio = { version = "1.19.2", features = ["net", "rt-multi-thread", "macros", "sync", "io-std", "io-util", "time", "tracing"] } toml = "0.5" uuid = { version = "1.1", features = ["v4"] } +async-trait = "0.1.74" +eyre = "0.6.8" [dependencies.lighter_lib] path = "../lib" diff --git a/manager/src/lib.rs b/manager/src/lib.rs index 1f83fd4..df2bfff 100644 --- a/manager/src/lib.rs +++ b/manager/src/lib.rs @@ -3,3 +3,4 @@ extern crate log; pub mod manager; pub mod mqtt_conf; +pub mod provider; diff --git a/manager/src/main.rs b/manager/src/main.rs index 1a59ce5..d0bb1b4 100644 --- a/manager/src/main.rs +++ b/manager/src/main.rs @@ -5,6 +5,9 @@ use clap::Parser; use lighter_lib::BulbColor; use lighter_manager::manager::{BulbCommand, BulbManager, BulbSelector, BulbsConfig}; use lighter_manager::mqtt_conf::MqttConfig; +use lighter_manager::provider::mock::BulbsMock; +use lighter_manager::provider::mqtt::BulbsMqtt; +use lighter_manager::provider::BulbProvider; use log::LevelFilter; use serde::Deserialize; use std::error::Error; @@ -15,7 +18,7 @@ use std::path::PathBuf; use std::str::FromStr; use tokio::io::{stdin, AsyncBufReadExt, BufReader}; -async fn ask(for_what: &str) -> anyhow::Result +async fn ask(for_what: &str) -> eyre::Result where T: FromStr, ::Err: Display + Error + Send + Sync + 'static, @@ -40,6 +43,9 @@ struct Opt { #[clap(short, long)] config: PathBuf, + + #[clap(short, long)] + mock: bool, } #[derive(Deserialize)] @@ -51,7 +57,7 @@ struct Config { } #[tokio::main] -async fn main() -> anyhow::Result<()> { +async fn main() -> eyre::Result<()> { let opt = Opt::parse(); let log_level = match opt.verbose { @@ -69,7 +75,12 @@ async fn main() -> anyhow::Result<()> { let config: String = fs::read_to_string(&opt.config)?; let config: Config = toml::from_str(&config)?; - let (commands, state) = BulbManager::launch(config.bulbs, config.mqtt).await?; + let provider: Box = if opt.mock { + Box::new(BulbsMock::new(config.bulbs.clone())) + } else { + Box::new(BulbsMqtt::new(config.bulbs.clone(), config.mqtt.clone())) + }; + let manager = BulbManager::launch(config.bulbs, provider).await?; loop { let command: String = ask("command").await?; @@ -77,13 +88,13 @@ async fn main() -> anyhow::Result<()> { let command = match command.as_str() { "power" => { let power: bool = ask("on").await?; - commands.send(BulbCommand::SetPower(BulbSelector::All, power)) + manager.send_command(BulbCommand::SetPower(BulbSelector::All, power)) } "kelvin" => { let t: f32 = ask("temperature").await?; let b: f32 = ask("brightness").await?; - commands.send(BulbCommand::SetColor( + manager.send_command(BulbCommand::SetColor( BulbSelector::All, BulbColor::kelvin(t, b), )) @@ -93,7 +104,7 @@ async fn main() -> anyhow::Result<()> { let s: f32 = ask("saturation").await?; let b: f32 = ask("brightness").await?; - commands.send(BulbCommand::SetColor( + manager.send_command(BulbCommand::SetColor( BulbSelector::All, BulbColor::hsb(h, s, b), )) @@ -104,11 +115,14 @@ async fn main() -> anyhow::Result<()> { } }; - let notify = state.notify_on_change(); - command.await?; + let notify = manager.notify_on_change(); + info!("1"); + command.await; + info!("2"); notify.await; + info!("3"); - let bulbs = state.bulbs().await; + let bulbs = manager.bulbs().await; info!("bulbs: {bulbs:?}"); } } diff --git a/manager/src/manager.rs b/manager/src/manager.rs index 37e0179..419a6f3 100644 --- a/manager/src/manager.rs +++ b/manager/src/manager.rs @@ -1,22 +1,14 @@ -use crate::mqtt_conf::MqttConfig; +use crate::provider::{BulbProvider, BulbUpdate}; use lighter_lib::{BulbColor, BulbId, BulbMode}; -use mqtt::{ - packet::{PublishPacket, QoSWithPacketIdentifier, SubscribePacket, VariablePacket}, - Encodable, QualityOfService, TopicFilter, TopicName, -}; use serde::Deserialize; use std::collections::BTreeMap; -use std::str::from_utf8; use std::sync::Arc; use tokio::{ - io::AsyncWriteExt, - net::TcpStream, sync::{futures::Notified, mpsc, Notify, RwLock, RwLockReadGuard}, task, - time::{sleep, Duration}, }; -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum BulbSelector { All, Id(BulbId), @@ -32,54 +24,48 @@ pub struct BulbConfig { pub id: BulbId, } -pub struct BulbManager { - config: BulbsConfig, - mqtt: MqttConfig, - command_rx: mpsc::Receiver, - socket: TcpStream, - state: Arc, -} - -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum BulbCommand { SetPower(BulbSelector, bool), SetColor(BulbSelector, BulbColor), } -pub struct BulbsState { +#[derive(Clone)] +pub struct BulbManager { + state: Arc, +} + +struct BulbsState { /// Notify on any change to the bulbs notify: Notify, + /// Send commands to the background task + command: mpsc::Sender, + /// State of all bulbs bulbs: RwLock>, } -enum Loop { - Break, - Continue, -} - -#[derive(Deserialize)] -struct BulbResult { - #[serde(rename(deserialize = "POWER"))] - power: Option, - - #[serde(rename(deserialize = "Color"))] - color: Option, +struct ManagerState

{ + #[allow(dead_code)] + config: BulbsConfig, + provider: P, + command_rx: mpsc::Receiver, + state: Arc, } impl BulbManager { pub async fn launch( bulbs: BulbsConfig, - mqtt: MqttConfig, - ) -> anyhow::Result<(mpsc::Sender, Arc)> { - info!("launching"); - let socket = mqtt.connect().await?; + provider: impl BulbProvider + Send + 'static, + ) -> eyre::Result { + info!("launching BulbManager"); let (command_tx, command_rx) = mpsc::channel(100); - let bulbs_state = BulbsState { + let bulbs_state = Arc::new(BulbsState { notify: Notify::new(), + command: command_tx, bulbs: RwLock::new( bulbs .bulbs @@ -88,198 +74,84 @@ impl BulbManager { .map(|id| (id, Default::default())) .collect(), ), - }; - let bulbs_state = Arc::new(bulbs_state); + }); - let mut manager = BulbManager { + let state = ManagerState { config: bulbs, - mqtt, + provider, command_rx, - socket, state: Arc::clone(&bulbs_state), }; - manager.subscribe().await?; - task::spawn(manager.run()); + let manager = BulbManager { state: bulbs_state }; - Ok((command_tx, bulbs_state)) + task::spawn(run(state)); + + Ok(manager) } - async fn subscribe(&mut self) -> anyhow::Result<()> { - let packet = SubscribePacket::new( - 1, - vec![(TopicFilter::new("+/lampa/#")?, QualityOfService::Level0)], - ); - let mut buf = vec![]; - packet.encode(&mut buf)?; - self.socket.write_all(&buf).await?; - - Ok(()) - } - - async fn run(mut self) -> anyhow::Result<()> { - loop { - match self.run_loop().await { - Ok(Loop::Continue) => {} - Ok(Loop::Break) => break, - Err(e) => { - const ERROR_TIMEOUT: u64 = 10; - error!("{e}"); - - loop { - info!("waiting for {ERROR_TIMEOUT} seconds before trying again"); - - sleep(Duration::from_secs(ERROR_TIMEOUT)).await; - - match self.mqtt.connect().await { - Ok(new_socket) => { - self.socket = new_socket; - self.subscribe().await?; - break; - } - Err(e) => { - error!("failed to re-establish connections: {e}"); - } - } - } - } - } - } - - info!("exiting"); - - Ok(()) - } - - async fn run_loop(&mut self) -> anyhow::Result { - let receive_packet = VariablePacket::parse(&mut self.socket); - let receive_command = self.command_rx.recv(); - - struct Publish<'a, P: ToString> { - pub topic_prefix: &'static str, - pub topic_suffix: &'static str, - pub payload: P, - pub socket: &'a mut TcpStream, - } - - impl Publish<'_, P> { - async fn send(&mut self, id: &BulbId) -> anyhow::Result<()> { - let topic_name = TopicName::new(format!( - "{}/{}/{}", - self.topic_prefix, id, self.topic_suffix - ))?; - let qos = QoSWithPacketIdentifier::Level0; - let packet = PublishPacket::new(topic_name, qos, self.payload.to_string()); - let mut buf = vec![]; - packet.encode(&mut buf)?; - self.socket.write_all(&buf).await?; - - anyhow::Ok(()) - } - } - - tokio::select!( - packet = receive_packet => { - debug!("packet received: {packet:?}"); - match packet? { - VariablePacket::PublishPacket(publish) => { - let topic_name = publish.topic_name(); - let topic_segments: Vec<&str> = topic_name.split('/').collect(); - match &topic_segments[..] { - [prefix, id@.., suffix] => { - let id = BulbId(id.join("/")); - let mut bulbs = self.state.bulbs.write().await; - let bulb = match bulbs.get_mut(&id) { - None => { - warn!("unknown bulb: {id}"); - return Ok(Loop::Continue); - } - Some(bulb) => bulb, - }; - - let payload = from_utf8(publish.payload())?; - - match (*prefix, *suffix) { - ("cmnd", _) => {} - ("stat", "POWER") => { - bulb.power = payload == "ON"; - } - ("stat", "RESULT") => { - let result: BulbResult = serde_json::from_str(payload)?; - if let Some(power) = result.power { - bulb.power = power == "ON"; - } - if let Some(color) = result.color { - bulb.color = color.parse()?; - } - } - ("tele", "STATE") => {}, - _ => { - warn!("unrecognized topic: {topic_name}"); - return Ok(Loop::Continue); - } - } - } - _ => { - warn!("unrecognized topic: {topic_name}"); - return Ok(Loop::Continue); - } - } - } - packet => warn!("unhandled packet: {packet:?}"), - } - self.state.notify.notify_waiters(); - } - command = receive_command => { - info!("command received: {command:?}"); - - async fn send(config: &BulbsConfig, selector: BulbSelector, publish: &mut Publish<'_, P>) -> anyhow::Result<()>{ - match selector { - BulbSelector::All => { - for bulb in &config.bulbs { - publish.send(&bulb.id).await?; - } - } - BulbSelector::Id(id) =>publish.send(&id).await?, - } - Ok(()) - } - - match command { - Some(BulbCommand::SetPower(selector, power)) => { - let payload = if power { "ON" } else { "OFF" }; - let mut publish = Publish { - topic_prefix: "cmnd", - topic_suffix: "POWER", - payload, - socket: &mut self.socket, - }; - send(&self.config, selector, &mut publish).await?; - } - Some(BulbCommand::SetColor(selector, color)) => { - let mut publish = Publish { - topic_prefix: "cmnd", - topic_suffix: "COLOR", - payload: color.color_string(), - socket: &mut self.socket, - }; - send(&self.config, selector, &mut publish).await?; - } - None => return Ok(Loop::Break), - } - } - ); - - Ok(Loop::Continue) - } -} - -impl BulbsState { pub fn notify_on_change(&self) -> Notified { - self.notify.notified() + self.state.notify.notified() + } + + pub async fn send_command(&self, command: BulbCommand) { + info!("sending command {command:?}"); + if let Err(e) = self.state.command.send(command).await { + error!("error sending bulb command: {e:#}"); + } + info!("sent command"); } pub async fn bulbs(&self) -> RwLockReadGuard<'_, BTreeMap> { - self.bulbs.read().await + self.state.bulbs.read().await + } +} + +async fn run

(state: ManagerState

) +where + P: BulbProvider + Send, +{ + debug!("manager task running"); + if let Err(e) = run_inner(state).await { + error!("bulb manage exited with error: {e:#}"); + } + info!("manager task exited"); +} +async fn run_inner

(mut state: ManagerState

) -> eyre::Result<()> +where + P: BulbProvider + Send, +{ + loop { + tokio::select!( + command = state.command_rx.recv() => { + let Some(command) = command else { + info!("handle closed, shutting down"); + return Ok(()); + }; + info!("command received: {command:?}"); + state.provider.send_command(command.clone()).await?; + } + update = state.provider.listen() => { + let (id, update) = update?; + + info!("update received: {id:?} {update:?}"); + + let mut bulbs = state.state.bulbs.write().await; + let Some(bulb) = bulbs.get_mut(&id) else { + continue; + }; + + match update { + BulbUpdate::Power(power) => { + bulb.power = power; + } + BulbUpdate::Color(color) => { + bulb.color = color; + } + } + + state.state.notify.notify_waiters(); + } + ) } } diff --git a/manager/src/mqtt_conf.rs b/manager/src/mqtt_conf.rs index cfb34db..e354646 100644 --- a/manager/src/mqtt_conf.rs +++ b/manager/src/mqtt_conf.rs @@ -17,7 +17,7 @@ pub struct MqttConfig { } impl MqttConfig { - pub async fn connect(&self) -> anyhow::Result { + pub async fn connect(&self) -> eyre::Result { let mut socket = TcpStream::connect((self.address.as_str(), self.port.unwrap_or(1883))).await?; @@ -35,9 +35,9 @@ impl MqttConfig { match VariablePacket::parse(&mut socket).await? { VariablePacket::ConnackPacket(ack) => match ack.connect_return_code() { ConnectReturnCode::ConnectionAccepted => Ok(socket), - return_code => anyhow::bail!("connection refused: {return_code:?}"), + return_code => eyre::bail!("connection refused: {return_code:?}"), }, - response => anyhow::bail!("mqtt connect, unexpected response: {response:?}"), + response => eyre::bail!("mqtt connect, unexpected response: {response:?}"), } } } diff --git a/manager/src/provider.rs b/manager/src/provider.rs new file mode 100644 index 0000000..8f2294d --- /dev/null +++ b/manager/src/provider.rs @@ -0,0 +1,34 @@ +use std::ops::DerefMut; + +use crate::manager::BulbCommand; +use async_trait::async_trait; +use lighter_lib::{BulbColor, BulbId}; + +pub mod mock; +pub mod mqtt; + +#[derive(Debug, Clone)] +pub enum BulbUpdate { + Power(bool), + Color(BulbColor), +} + +// An interface that allows communication with bulbs. +#[async_trait] +pub trait BulbProvider { + // Send a command to some bulbs to update their state + async fn send_command(&mut self, cmd: BulbCommand) -> eyre::Result<()>; + + // Wait for any bulb to send an update + async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)>; +} + +#[async_trait] +impl BulbProvider for Box

{ + async fn send_command(&mut self, cmd: BulbCommand) -> eyre::Result<()> { + self.deref_mut().send_command(cmd).await + } + async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)> { + self.deref_mut().listen().await + } +} diff --git a/manager/src/provider/mock.rs b/manager/src/provider/mock.rs new file mode 100644 index 0000000..6031726 --- /dev/null +++ b/manager/src/provider/mock.rs @@ -0,0 +1,65 @@ +use std::{collections::VecDeque, future::pending}; + +use async_trait::async_trait; +use lighter_lib::BulbId; + +use crate::{ + manager::{BulbCommand, BulbSelector, BulbsConfig}, + provider::{BulbProvider, BulbUpdate}, +}; + +pub struct BulbsMock { + config: BulbsConfig, + queue: VecDeque<(BulbId, BulbUpdate)>, +} + +impl BulbsMock { + pub fn new(config: BulbsConfig) -> Self { + BulbsMock { + config, + queue: VecDeque::new(), + } + } +} + +#[async_trait] +impl BulbProvider for BulbsMock { + async fn send_command(&mut self, command: BulbCommand) -> eyre::Result<()> { + info!("mock: sending command {command:?}"); + + let selector = match &command { + BulbCommand::SetPower(selector, _) => selector, + BulbCommand::SetColor(selector, _) => selector, + }; + + let bulbs = match selector { + BulbSelector::All => self.config.bulbs.iter().map(|b| b.id.clone()).collect(), + + BulbSelector::Id(id) => vec![id.clone()], + }; + + let update = match command { + BulbCommand::SetPower(_, power) => BulbUpdate::Power(power), + BulbCommand::SetColor(_, color) => BulbUpdate::Color(color), + }; + + for bulb in bulbs { + info!("mock: updating bulb {bulb} {update:?}"); + self.queue.push_back((bulb, update.clone())); + } + + Ok(()) + } + + async fn listen(&mut self) -> eyre::Result<(BulbId, super::BulbUpdate)> { + info!("mock: listening for updates"); + let Some(update) = self.queue.pop_front() else { + info!("mock: no updates in queue"); + pending().await + }; + + info!("mock: returning update {update:?}"); + + Ok(update) + } +} diff --git a/manager/src/provider/mqtt.rs b/manager/src/provider/mqtt.rs new file mode 100644 index 0000000..681bc6e --- /dev/null +++ b/manager/src/provider/mqtt.rs @@ -0,0 +1,234 @@ +use std::{collections::HashSet, str, time::Duration}; + +use async_trait::async_trait; +use lighter_lib::BulbId; +use mqtt::{ + packet::{PublishPacket, QoSWithPacketIdentifier, SubscribePacket, VariablePacket}, + Encodable, QualityOfService, TopicFilter, TopicName, +}; +use serde::Deserialize; +use tokio::{ + io::AsyncWriteExt, + net::TcpStream, + time::{sleep_until, Instant}, +}; + +use crate::{ + manager::{BulbCommand, BulbSelector, BulbsConfig}, + mqtt_conf::MqttConfig, + provider::{BulbProvider, BulbUpdate}, +}; + +const RECONNECT_DELAYS: &[u64] = &[0, 1, 2, 5, 10, 10, 10, 20]; + +pub struct BulbsMqtt { + known_bulbs: HashSet, + socket: SocketState, +} + +struct SocketState { + mqtt_config: MqttConfig, + last_connection_attempt: Instant, + failed_connect_attempts: usize, + socket: Option, +} + +impl BulbsMqtt { + pub fn new(bulbs_config: BulbsConfig, mqtt_config: MqttConfig) -> BulbsMqtt { + BulbsMqtt { + known_bulbs: bulbs_config.bulbs.into_iter().map(|b| b.id).collect(), + socket: SocketState { + mqtt_config, + last_connection_attempt: Instant::now(), + failed_connect_attempts: 0, + socket: None, + }, + } + } +} + +impl SocketState { + async fn get_connection(&mut self) -> eyre::Result<&mut TcpStream> { + let socket = &mut self.socket; + + if let Some(socket) = socket { + return Ok(socket); + } + + let attempt = self.failed_connect_attempts; + let wait_for = Duration::from_secs(RECONNECT_DELAYS[attempt]); + sleep_until(self.last_connection_attempt + wait_for).await; + + self.failed_connect_attempts += 1; + self.last_connection_attempt = Instant::now(); + + info!("connecting to MQTT (attempt {attempt})"); + let mut new_socket = self.mqtt_config.connect().await?; + subscribe(&mut new_socket).await?; + info!("connected to MQTT"); + + self.failed_connect_attempts = 0; + Ok(socket.insert(new_socket)) + } +} + +#[async_trait] +impl BulbProvider for BulbsMqtt { + async fn send_command(&mut self, command: BulbCommand) -> eyre::Result<()> { + debug!("mqtt sending command {command:?}"); + let socket = self.socket.get_connection().await?; + + async fn send( + all_bulbs: &HashSet, + selector: BulbSelector, + publish: &mut Publish<'_, P>, + ) -> eyre::Result<()> { + match selector { + BulbSelector::Id(id) => publish.send(&id).await?, + BulbSelector::All => { + for id in all_bulbs { + publish.send(id).await?; + } + } + } + Ok(()) + } + + match command { + BulbCommand::SetPower(selector, power) => { + let payload = if power { "ON" } else { "OFF" }; + let mut publish = Publish { + topic_prefix: "cmnd", + topic_suffix: "POWER", + payload, + socket, + }; + send(&self.known_bulbs, selector, &mut publish).await?; + } + BulbCommand::SetColor(selector, color) => { + let mut publish = Publish { + topic_prefix: "cmnd", + topic_suffix: "COLOR", + payload: color.color_string(), + socket, + }; + send(&self.known_bulbs, selector, &mut publish).await?; + } + } + Ok(()) + } + + async fn listen(&mut self) -> eyre::Result<(BulbId, BulbUpdate)> { + debug!("mqtt listening for updates"); + let socket = self.socket.get_connection().await?; + + loop { + let packet = VariablePacket::parse(socket).await?; + + let VariablePacket::PublishPacket(publish) = &packet else { + continue; + }; + + let topic_name = publish.topic_name(); + let topic_segments: Vec<&str> = topic_name.split('/').collect(); + match &topic_segments[..] { + [prefix, id @ .., suffix] => { + let id = BulbId(id.join("/")); + + if !self.known_bulbs.contains(&id) { + warn!("ignoring publish from unknown bulb {id}"); + continue; + } + + let payload = str::from_utf8(publish.payload())?; + + let update = match (*prefix, *suffix) { + ("stat", "POWER") => BulbUpdate::Power(payload == "ON"), + ("stat", "RESULT") => { + let result: BulbResult = serde_json::from_str(payload)?; + + // TODO: color and power can be updated at the same time? + if let Some(color) = result.color { + BulbUpdate::Color(color.parse()?) + } else if let Some(power) = result.power { + BulbUpdate::Power(power == "ON") + } else { + continue; + } + } + // TODO: handle STATE message + //("tele", "STATE") => todo!(), + + // ignore known useless messages + ("cmnd", _) => continue, + ("tele", "LWT") => continue, + + _ => { + warn!("unrecognized topic: {topic_name} payload={payload:?}"); + continue; + } + }; + + return Ok((id, update)); + } + _ => { + warn!("unrecognized topic: {topic_name}"); + continue; + } + } + } + } +} + +struct Publish<'a, P: ToString> { + pub topic_prefix: &'static str, + pub topic_suffix: &'static str, + pub payload: P, + pub socket: &'a mut TcpStream, +} + +impl Publish<'_, P> { + async fn send(&mut self, id: &BulbId) -> eyre::Result<()> { + let topic_name = TopicName::new(format!( + "{}/{}/{}", + self.topic_prefix, id, self.topic_suffix + ))?; + let qos = QoSWithPacketIdentifier::Level0; + let payload = self.payload.to_string(); + debug!("publishing {topic_name:?} {payload}"); + + let packet = PublishPacket::new(topic_name, qos, payload); + let mut buf = vec![]; + packet.encode(&mut buf)?; + self.socket.write_all(&buf).await?; + + debug!("published"); + + Ok(()) + } +} + +/// Send the MQTT subscribe packet to subscribe to bulb updates +async fn subscribe(socket: &mut TcpStream) -> eyre::Result<()> { + debug!("subscribing to mqtt bulb updates"); + let packet = SubscribePacket::new( + 1, + vec![(TopicFilter::new("+/lampa/#")?, QualityOfService::Level0)], + ); + let mut buf = vec![]; + packet.encode(&mut buf)?; + socket.write_all(&buf).await?; + + debug!("subscribed to mqtt bulb updates"); + + Ok(()) +} + +#[derive(Deserialize)] +struct BulbResult { + #[serde(rename(deserialize = "POWER"))] + power: Option, + + #[serde(rename(deserialize = "Color"))] + color: Option, +}