diff --git a/Cargo.lock b/Cargo.lock index 9719584..381971d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,7 +46,7 @@ dependencies = [ "actix-utils", "ahash", "base64", - "bitflags 2.4.0", + "bitflags 2.4.1", "brotli", "bytes", "bytestring", @@ -118,7 +118,7 @@ dependencies = [ "futures-core", "futures-util", "mio", - "socket2 0.5.4", + "socket2 0.5.5", "tokio", "tracing", ] @@ -179,7 +179,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2 0.5.4", + "socket2 0.5.5", "time", "url", ] @@ -219,21 +219,22 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" [[package]] name = "ahash" -version = "0.8.3" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" +checksum = "cd7d5a2cecb58716e47d67d5703a249964b14c7be1ec3cad3affc295b2d1c35d" dependencies = [ "cfg-if 1.0.0", "getrandom", "once_cell", "version_check", + "zerocopy", ] [[package]] name = "aho-corasick" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea5d730647d4fadd988536d06fecce94b7b4f2a7efdae548f1cf4b63205518ab" +checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" dependencies = [ "memchr", ] @@ -285,9 +286,9 @@ checksum = "a26fa4d7e3f2eebadf743988fc8aec9fa9a9e82611acafd77c1462ed6262440a" [[package]] name = "async-trait" -version = "0.1.73" +version = "0.1.74" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" dependencies = [ "proc-macro2", "quote", @@ -328,9 +329,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.4" +version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ba43ea6f343b788c8764558649e08df62f86c6ef251fdaeb1ffd010a9ae50a2" +checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" [[package]] name = "bincode" @@ -349,9 +350,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" [[package]] name = "block-buffer" @@ -496,9 +497,9 @@ checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" [[package]] name = "cpufeatures" -version = "0.2.9" +version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17b76ff3a4162b0b27f354a0c87015ddad39d35f9c0c36607a3bdd175dde1f1" +checksum = "3fbc60abd742b35f2492f808e1abbb83d45f72db402e14c55057edc9c7b1e9e4" dependencies = [ "libc", ] @@ -512,6 +513,17 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "cron" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ff76b51e4c068c52bfd2866e1567bee7c567ae8f24ada09fd4307019e25eab7" +dependencies = [ + "chrono", + "nom", + "once_cell", +] + [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -634,9 +646,12 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +dependencies = [ + "powerfmt", +] [[package]] name = "derive_more" @@ -657,7 +672,7 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2268a214a6f118fce1838edba3d1561cf0e78d8de785475957a580a7f8c69d33" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "byteorder", "diesel_derives", "itoa", @@ -775,11 +790,10 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.4" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" +checksum = "ac3e13f66a2f95e32a39eaa81f6b95d42878ca0e1db0c7543723dfe12557e860" dependencies = [ - "errno-dragonfly", "libc", "windows-sys", ] @@ -802,9 +816,9 @@ checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e" dependencies = [ "crc32fast", "miniz_oxide", @@ -1001,9 +1015,9 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +checksum = "f93e7192158dbcda357bdec5fb5788eebf8bbac027f3f33e719d29135ae84156" [[package]] name = "headers" @@ -1110,7 +1124,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.9", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -1132,16 +1146,16 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613" +checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" dependencies = [ "android_system_properties", "core-foundation-sys", "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows", + "windows-core", ] [[package]] @@ -1214,7 +1228,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" dependencies = [ "equivalent", - "hashbrown 0.14.1", + "hashbrown 0.14.2", ] [[package]] @@ -1236,9 +1250,9 @@ dependencies = [ [[package]] name = "ipnet" -version = "2.8.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6" +checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "itoa" @@ -1248,9 +1262,9 @@ checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" [[package]] name = "jobserver" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "936cfd212a0155903bcbc060e316fb6cc7cbf2e1907329391ebadc1fe0ce77c2" +checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" dependencies = [ "libc", ] @@ -1313,9 +1327,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.8" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3852614a3bd9ca9804678ba6be5e3b8ce76dfc902cae004e3e0c44051b6e88db" +checksum = "da2479e8c062e40bf0066ffa0bc823de0a9368974af99c9f6df941d2c231e03f" [[package]] name = "local-channel" @@ -1336,9 +1350,9 @@ checksum = "e34f76eb3611940e0e7d53a9aaa4e6a3151f69541a282fd0dad5571420c53ff1" [[package]] name = "lock_api" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16" +checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" dependencies = [ "autocfg", "scopeguard", @@ -1422,6 +1436,12 @@ dependencies = [ "unicase", ] +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.7.1" @@ -1433,9 +1453,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.8" +version = "0.8.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "log", @@ -1492,6 +1512,16 @@ dependencies = [ "void", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "ntapi" version = "0.3.7" @@ -1513,9 +1543,9 @@ dependencies = [ [[package]] name = "num-traits" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f30b0abd723be7e2ffca1272140fac1a2f084c77ec3e123c192b66af1ee9e6c2" +checksum = "39e3200413f237f41ab11ad6d161bc7239c84dcb631773ccd7de3dfe4b5c267c" dependencies = [ "autocfg", ] @@ -1551,7 +1581,7 @@ version = "0.10.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bac25ee399abb46215765b1cb35bc0212377e58a061560d8b29b024fd0430e7c" dependencies = [ - "bitflags 2.4.0", + "bitflags 2.4.1", "cfg-if 1.0.0", "foreign-types", "libc", @@ -1607,13 +1637,13 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.8" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" +checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "smallvec", "windows-targets", ] @@ -1674,6 +1704,12 @@ version = "3.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1721,9 +1757,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068" [[package]] name = "proc-macro2" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] @@ -1796,16 +1832,25 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" -version = "1.9.6" +version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebee201405406dbf528b8b672104ae6d6d63e6d118cb10e4d51abbc7b58044ff" +checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.3.9", - "regex-syntax 0.7.5", + "regex-automata 0.4.3", + "regex-syntax 0.8.2", ] [[package]] @@ -1819,13 +1864,13 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.9" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b23e92ee4318893fa3fe3e6fb365258efbfe6ac6ab30f090cdcbb7aa37efa9" +checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.7.5", + "regex-syntax 0.8.2", ] [[package]] @@ -1836,9 +1881,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.7.5" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" +checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "reqwest" @@ -1996,12 +2041,12 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.17" +version = "0.38.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f25469e9ae0f3d0047ca8b93fc56843f38e6774f0914a107ff8b41be8be8e0b7" +checksum = "67ce50cb2e16c2903e30d1cbccfd8387a74b9d4c938b6a4c5ec6cc7556f7a8a0" dependencies = [ - "bitflags 2.4.0", - "errno 0.3.4", + "bitflags 2.4.1", + "errno 0.3.5", "libc", "linux-raw-sys", "windows-sys", @@ -2109,24 +2154,24 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0" +checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" [[package]] name = "serde" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.188" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", @@ -2160,9 +2205,9 @@ dependencies = [ [[package]] name = "serde_spanned" -version = "0.6.3" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96426c9936fd7a0124915f9185ea1d20aa9445cc9821142f0a73bc9207a2e186" +checksum = "12022b835073e5b11e90a14f86838ceb1c8fb0325b72416845c487ac0fa95e80" dependencies = [ "serde", ] @@ -2252,9 +2297,9 @@ checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" [[package]] name = "socket2" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64a4a911eed85daf18834cfaa86a79b7d266ff93ff5ba14005426219480ed662" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" dependencies = [ "libc", "winapi", @@ -2262,9 +2307,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", "windows-sys", @@ -2420,7 +2465,7 @@ checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" dependencies = [ "cfg-if 1.0.0", "fastrand", - "redox_syscall", + "redox_syscall 0.3.5", "rustix", "windows-sys", ] @@ -2466,12 +2511,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "426f806f4089c493dcac0d24c29c01e2c38baf8e30f1b716ee37e83d200b18fe" +checksum = "c4a34ab300f2dee6e562c10a046fc05e358b29f9bf92277f30c3c8d82275f6f5" dependencies = [ "deranged", "itoa", + "powerfmt", "serde", "time-core", "time-macros", @@ -2509,9 +2555,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ "backtrace", "bytes", @@ -2521,7 +2567,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.5", "tokio-macros", "windows-sys", ] @@ -2608,9 +2654,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.6.3" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cda73e2f1397b1262d6dfdcef8aafae14d1de7748d66822d3bfeeb6d03e5e4b" +checksum = "3550f4e9685620ac18a50ed434eb3aec30db8ba93b0287467bca5826ea25baf1" dependencies = [ "serde", ] @@ -2636,11 +2682,10 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.37" +version = "0.1.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ - "cfg-if 1.0.0", "log", "pin-project-lite", "tracing-attributes", @@ -2660,9 +2705,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", @@ -2671,9 +2716,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", "valuable", @@ -2681,12 +2726,12 @@ dependencies = [ [[package]] name = "tracing-log" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +checksum = "f751112709b4e791d8ce53e32c4ed2d353565a795ce84da2285393f41557bdf2" dependencies = [ - "lazy_static", "log", + "once_cell", "tracing-core", ] @@ -2756,8 +2801,10 @@ name = "u_lib" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "bincode", "chrono", + "cron", "daemonize", "deadpool-diesel", "diesel", @@ -2909,9 +2956,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" dependencies = [ "getrandom", "serde", @@ -3112,10 +3159,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] -name = "windows" -version = "0.48.0" +name = "windows-core" +version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" dependencies = [ "windows-targets", ] @@ -3188,9 +3235,9 @@ checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" [[package]] name = "winnow" -version = "0.5.16" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "037711d82167854aff2018dfd193aa0fef5370f456732f0d5a0c59b0f1b4b907" +checksum = "a3b801d0e0a6726477cc207f60162da452f3a95adb368399bef20a946e06f65c" dependencies = [ "memchr", ] @@ -3205,6 +3252,26 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "zerocopy" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c19fae0c8a9efc6a8281f2e623db8af1db9e57852e04cde3e754dd2dc29340f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc56589e9ddd1f1c28d4b4b5c773ce232910a6bb67a70133d61c9e347585efe9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "zstd" version = "0.12.4" @@ -3226,11 +3293,10 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.8+zstd.1.5.5" +version = "2.0.9+zstd.1.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5556e6ee25d32df2586c098bbfa278803692a20d0ab9565e049480d52707ec8c" +checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" dependencies = [ "cc", - "libc", "pkg-config", ] diff --git a/bin/u_agent/src/lib.rs b/bin/u_agent/src/lib.rs index faa3d3f..7c69c2e 100644 --- a/bin/u_agent/src/lib.rs +++ b/bin/u_agent/src/lib.rs @@ -1,29 +1,29 @@ #[macro_use] extern crate log; -use std::process::exit; use tokio::runtime::Builder; use tokio::time::{sleep, Duration}; +use u_lib::models::PreparedJob; +use u_lib::scheduler::SCHEDULER; +use u_lib::u_runner::URunner; use u_lib::{ api::HttpClient, cache::JobCache, config::{get_self_id, EndpointsEnv, AGENT_ITERATION_INTERVAL}, error::ErrChan, - executor, - jobs::AnonymousJobBatch, logging::init_logger, messaging::Reportable, models::AssignedJobById, }; -async fn process_request(jobs: Vec, client: &HttpClient) { - if !jobs.is_empty() { - for jr in &jobs { - if !JobCache::contains(jr.job_id) { - info!("Fetching job: {}", &jr.job_id); +async fn process_request(assigned_jobs: Vec, client: &HttpClient) { + if !assigned_jobs.is_empty() { + for asgn_job in assigned_jobs { + if !JobCache::contains(asgn_job.job_id) { + info!("Fetching job: {}", &asgn_job.job_id); let mut fetched_job = loop { //todo: use payload cache - match client.get_full_job(jr.job_id).await { + match client.get_full_job(asgn_job.job_id).await { Ok(result) => break result, Err(err) => { debug!("{:?} \nretrying...", err); @@ -33,78 +33,72 @@ async fn process_request(jobs: Vec, client: &HttpClient) { }; if let Some(payload) = &mut fetched_job.payload { if let Err(e) = payload.maybe_split_payload() { - ErrChan::send(e, "pld").await; + ErrChan::send(e, "pay").await; + continue; } } JobCache::insert(fetched_job); } - } - info!( - "Scheduling jobs: {}", - jobs.iter() - .map(|j| j.job_id.to_string()) - .collect::>() - .join(", ") - ); - let meta_with_ids = jobs - .into_iter() - .map(|job| { - let meta = JobCache::get(job.job_id).unwrap().clone(); - (meta, job) - }) - .collect::>(); - - AnonymousJobBatch::from_meta_with_id(meta_with_ids) - .spawn() - .await; - } -} + let job = match JobCache::get(asgn_job.job_id).as_deref() { + Some(job) => job.clone(), + None => continue, + }; + + info!("Scheduling job {}", job.meta.id.to_string()); -async fn error_reporting(client: HttpClient) -> ! { - loop { - match ErrChan::recv().await { - Some(err) => { - 'retry: for _ in 0..3 { - match client.report([Reportable::Error(err.clone())]).await { - Ok(_) => break 'retry, - Err(e) => { - debug!("Reporting error: {:?}", e); - sleep(Duration::from_secs(10)).await; + let schedule = match job.meta.schedule.clone() { + Some(sched) => { + if sched.is_empty() { + None + } else { + match sched.as_str().try_into() { + Ok(s) => Some(s), + Err(err) => { + ErrChan::send(err, "sch").await; + continue; + } } } } - } - None => sleep(Duration::from_secs(3)).await, + None => None, + }; + SCHEDULER + .add_job(schedule, PreparedJob { job, ids: asgn_job }) + .await; } } } -async fn agent_loop(client: HttpClient) -> ! { +async fn error_reporting(client: HttpClient) { + while let Some(err) = ErrChan::recv().await { + let _ = client.report([Reportable::Error(err.clone())]).await; + } +} + +async fn agent_loop(client: HttpClient) { let self_id = get_self_id(); - loop { - match client.get_personal_jobs(self_id).await { - Ok(jobs) => { - process_request(jobs, &client).await; - } - Err(err) => ErrChan::send(err, "processing").await, + + match client.get_personal_jobs(self_id).await { + Ok(jobs) => { + process_request(jobs, &client).await; } + Err(err) => ErrChan::send(err, "pro").await, + } - let result: Vec = executor::pop_completed() - .await - .into_iter() - .map(|result| match result { - Ok(r) => Reportable::Assigned(r), - Err(e) => Reportable::Error(e), - }) - .collect(); - - if !result.is_empty() { - if let Err(err) = client.report(result).await { - ErrChan::send(err, "report").await; - } + let result: Vec = URunner::pop_completed() + .await + .into_iter() + .map(|result| match result { + Ok(r) => Reportable::Assigned(r), + Err(e) => Reportable::Error(e), + }) + .collect(); + + if !result.is_empty() { + if let Err(err) = client.report(result).await { + ErrChan::send(err, "rep").await; } - sleep(AGENT_ITERATION_INTERVAL).await; } } @@ -134,15 +128,33 @@ pub fn run_forever() -> ! { .build() .unwrap() .block_on(async { - match HttpClient::new(&env.u_server, None).await { - Ok(client) => { - tokio::spawn(error_reporting(client.clone())); - agent_loop(client).await - } - Err(e) => { - error!("client init failed: {}", e); - exit(7) // todo: wtf? + let client = loop { + match HttpClient::new(&env.u_server, None).await { + Ok(client) => break client, + Err(e) => { + error!("client init failed: {}", e); + sleep(Duration::from_secs(5)).await; + continue; + } } + }; + { + let client = client.clone(); + SCHEDULER + .add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || { + let client = client.clone(); + error_reporting(client.clone()) + }) + .await; } + + SCHEDULER + .add_job(Some("*/3 * * * * * *".try_into().unwrap()), move || { + let client = client.clone(); + agent_loop(client) + }) + .await; + + SCHEDULER.start_blocking().await }) } diff --git a/integration-tests/tests/fixtures/agent.rs b/integration-tests/tests/fixtures/agent.rs index db67502..07d912c 100644 --- a/integration-tests/tests/fixtures/agent.rs +++ b/integration-tests/tests/fixtures/agent.rs @@ -25,7 +25,7 @@ pub fn registered_agent(client: &HttpClient) -> RegisteredAgent { assert_eq!(job.meta.alias, Some("agent_hello".to_string())); - let mut agent_data = AssignedJob::from((&job.meta, resp)); + let mut agent_data = AssignedJob::from(&PreparedJob { job, ids: resp }); agent_data.set_result(&agent); client diff --git a/lib/u_lib/Cargo.toml b/lib/u_lib/Cargo.toml index 8cf77d7..e091b62 100644 --- a/lib/u_lib/Cargo.toml +++ b/lib/u_lib/Cargo.toml @@ -33,6 +33,8 @@ uuid = { workspace = true, features = ["serde", "v4"] } parking_lot = "0.12.1" bincode = "1.3.3" sha3 = "0.10.7" +cron = "0.12.0" +async-trait = "0.1.74" [target.'cfg(unix)'.dependencies] daemonize = "0.5" diff --git a/lib/u_lib/src/api.rs b/lib/u_lib/src/api.rs index e0e9771..48ac150 100644 --- a/lib/u_lib/src/api.rs +++ b/lib/u_lib/src/api.rs @@ -233,11 +233,8 @@ impl HttpClient { &self, assigned: impl IntoIterator, ) -> Result { - self.req_with_payload( - format!("assign_jobs"), - &assigned.into_iter().collect::>(), - ) - .await + self.req_with_payload("assign_jobs", &assigned.into_iter().collect::>()) + .await } /// get jobs for any agent by job_id, agent_id or result_id diff --git a/lib/u_lib/src/jobs.rs b/lib/u_lib/src/jobs.rs index 899aa67..9e4f42d 100644 --- a/lib/u_lib/src/jobs.rs +++ b/lib/u_lib/src/jobs.rs @@ -1,26 +1,26 @@ use crate::{ combined_result::CombinedResult, - executor::{ExecResult, Waiter}, - models::{Agent, AssignedJob, AssignedJobById, Job, JobType, RawJob}, + models::{Agent, AssignedJob, AssignedJobById, Job, JobType, PreparedJob, RawJob}, proc_output::ProcOutput, + u_runner::{ExecResult, URunner}, }; use std::collections::HashMap; use std::process::exit; use tokio::process::Command; pub struct AnonymousJobBatch { - waiter: Waiter, + runner: URunner, is_running: bool, } impl AnonymousJobBatch { - pub fn from_meta_with_id(jobs: impl IntoIterator) -> Self { - let mut waiter = Waiter::new(); - for (job, ids) in jobs { - waiter.push(run_assigned_job(job, ids)); + pub fn from_prepared_jobs(jobs: impl IntoIterator) -> Self { + let mut runner = URunner::new(); + for job in jobs { + runner.push(run_assigned_job(job)); } Self { - waiter, + runner, is_running: false, } } @@ -28,36 +28,33 @@ impl AnonymousJobBatch { pub fn from_meta(jobs: impl IntoIterator) -> Self { let jobs_ids: Vec<_> = jobs .into_iter() - .map(|job| { - let job_id = job.meta.id; - ( - job, - AssignedJobById { - job_id, - ..Default::default() - }, - ) + .map(|job| PreparedJob { + ids: AssignedJobById { + job_id: job.meta.id, + ..Default::default() + }, + job, }) .collect(); - AnonymousJobBatch::from_meta_with_id(jobs_ids) + AnonymousJobBatch::from_prepared_jobs(jobs_ids) } /// Spawn jobs pub async fn spawn(mut self) -> Self { debug!("spawning jobs"); - self.waiter = self.waiter.spawn().await; + self.runner = self.runner.spawn().await; self.is_running = true; self } /// Spawn jobs and wait for result pub async fn wait(self) -> Vec { - let waiter = if !self.is_running { - self.spawn().await.waiter + let runner = if !self.is_running { + self.spawn().await.runner } else { - self.waiter + self.runner }; - waiter.wait().await + runner.wait().await } /// Spawn one job and wait for result @@ -132,9 +129,9 @@ impl NamedJobBatch { } } -pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult { - let Job { meta, payload } = job; - let mut result = AssignedJob::from((&meta, ids)); +pub async fn run_assigned_job(prepared_job: PreparedJob) -> ExecResult { + let mut result = AssignedJob::from(&prepared_job); + let Job { meta, payload } = prepared_job.job; match meta.exec_type { JobType::Shell => { let (argv, _prepared_payload) = { @@ -150,7 +147,11 @@ pub async fn run_assigned_job(job: Job, ids: AssignedJobById) -> ExecResult { let mut split_cmd = shlex::split(&argv).unwrap().into_iter(); let cmd = split_cmd.nth(0).unwrap(); let args = split_cmd.collect::>(); - let cmd_result = Command::new(cmd).args(args).output().await; + let cmd_result = Command::new(cmd) + .kill_on_drop(true) + .args(args) + .output() + .await; let (data, retcode) = match cmd_result { Ok(output) => ( ProcOutput::from_output(&output).into_vec(), diff --git a/lib/u_lib/src/lib.rs b/lib/u_lib/src/lib.rs index 52ada1b..92b8382 100644 --- a/lib/u_lib/src/lib.rs +++ b/lib/u_lib/src/lib.rs @@ -7,7 +7,6 @@ pub mod conv; #[cfg(feature = "server")] pub mod db; pub mod error; -pub mod executor; pub mod jobs; pub mod logging; pub mod messaging; @@ -15,7 +14,9 @@ pub mod misc; pub mod models; pub mod platform; pub mod proc_output; +pub mod scheduler; pub mod types; +pub mod u_runner; pub mod ufs; #[cfg(unix)] pub mod unix; @@ -37,3 +38,6 @@ extern crate tracing; #[cfg(test)] #[macro_use] extern crate rstest; + +#[macro_use] +extern crate async_trait; diff --git a/lib/u_lib/src/models/agent.rs b/lib/u_lib/src/models/agent.rs index a05e74d..efe8afa 100644 --- a/lib/u_lib/src/models/agent.rs +++ b/lib/u_lib/src/models/agent.rs @@ -13,8 +13,8 @@ mod server { use self::server::*; use crate::{ - config::get_self_id, conv::systime_to_string, executor::ExecResult, jobs::NamedJobBatch, - platform, types::Id, + config::get_self_id, conv::systime_to_string, jobs::NamedJobBatch, platform, types::Id, + u_runner::ExecResult, }; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Display)] diff --git a/lib/u_lib/src/models/jobs/assigned.rs b/lib/u_lib/src/models/jobs/assigned.rs index a7caf20..dfbb254 100644 --- a/lib/u_lib/src/models/jobs/assigned.rs +++ b/lib/u_lib/src/models/jobs/assigned.rs @@ -1,9 +1,10 @@ -use super::{JobMeta, JobState, JobType}; +use super::{JobState, JobType}; #[cfg(feature = "server")] use crate::models::schema::*; use crate::{ config::get_self_id, conv::{bytes_to_string_truncated, systime_to_string}, + models::Job, types::Id, }; #[cfg(feature = "server")] @@ -11,6 +12,7 @@ use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; use std::{borrow::Cow, fmt::Debug, time::SystemTime}; +/// A job assigned to some agents, stores execution state and result #[derive(Serialize, Deserialize, Clone, PartialEq)] #[cfg_attr( feature = "server", @@ -53,6 +55,13 @@ impl Debug for AssignedJob { } } +#[derive(Debug, Clone)] +pub struct PreparedJob { + pub job: Job, + pub ids: AssignedJobById, +} + +/// Lightweight linking struct between agent job and assigned job #[derive(Serialize, Deserialize, Clone, Copy, Debug)] pub struct AssignedJobById { pub agent_id: Id, @@ -61,20 +70,20 @@ pub struct AssignedJobById { pub job_id: Id, } -impl From<(&JobMeta, AssignedJobById)> for AssignedJob { - fn from((job, ids): (&JobMeta, AssignedJobById)) -> Self { +impl From<&PreparedJob> for AssignedJob { + fn from(prep_job: &PreparedJob) -> Self { let AssignedJobById { agent_id, id, job_id, - } = ids; + } = prep_job.ids; AssignedJob { id, agent_id, job_id, - alias: job.alias.clone(), - exec_type: job.exec_type, + alias: prep_job.job.meta.alias.clone(), + exec_type: prep_job.job.meta.exec_type, ..Default::default() } } diff --git a/lib/u_lib/src/models/jobs/meta.rs b/lib/u_lib/src/models/jobs/meta.rs index 10a8026..0426661 100644 --- a/lib/u_lib/src/models/jobs/meta.rs +++ b/lib/u_lib/src/models/jobs/meta.rs @@ -7,6 +7,7 @@ use crate::models::Payload; use crate::platform; use crate::types::Id; use crate::{UError, UResult}; +use cron::Schedule; #[cfg(feature = "server")] use diesel::{Identifiable, Insertable, Queryable}; use serde::{Deserialize, Serialize}; @@ -119,16 +120,24 @@ impl JobMeta { ))); } + if let Some(schedule) = &self.schedule { + Schedule::try_from(schedule.as_str()) + .map_err(|e| mk_err(format!("bad schedule: {e}")))?; + } + Ok(self) } } +/// An abstract valid-constructed job that can be assigned to any agent. +/// Contains job meta and payload meta #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct Job { pub meta: JobMeta, pub payload: Option, } +/// Job that has only been deserialized without checks #[derive(Serialize, Deserialize, Clone, Default)] pub struct RawJob<'p> { #[serde(default)] diff --git a/lib/u_lib/src/scheduler/entry.rs b/lib/u_lib/src/scheduler/entry.rs new file mode 100644 index 0000000..2d3c422 --- /dev/null +++ b/lib/u_lib/src/scheduler/entry.rs @@ -0,0 +1,66 @@ +use crate::{models::PreparedJob, types::Id}; +use chrono::{DateTime, Utc}; +use core::fmt; +use std::{future::Future, pin::Pin, sync::Arc}; + +pub type JobFuture = Box + Send>; + +pub trait SchedulerJob { + fn call(&self) -> Pin; +} + +impl SchedulerJob for F +where + F: Fn() -> T, + T: Future + Send + 'static, +{ + fn call(&self) -> Pin { + Box::pin(self()) + } +} + +#[derive(Clone)] +pub struct Entry { + pub id: Id, + pub job_id: Id, + pub schedule: Option, + pub next: Option>, + pub runnable: EntryType, +} + +#[derive(Clone)] +pub enum EntryType { + Common(Arc), + URunner(PreparedJob), +} + +impl From for EntryType { + fn from(value: J) -> Self { + Self::Common(Arc::new(value)) + } +} + +impl From for EntryType { + fn from(value: PreparedJob) -> Self { + Self::URunner(value) + } +} + +impl Entry { + pub fn set_next_run_time(&mut self) { + self.next = self.get_next_run_time(); + } + + pub fn get_next_run_time(&self) -> Option> { + match &self.schedule { + Some(schedule) => schedule.upcoming(Utc).next(), + None => Some(Utc::now()), + } + } +} + +impl fmt::Debug for Entry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} {:?} {:?}", self.id, self.schedule, self.next) + } +} diff --git a/lib/u_lib/src/scheduler/mod.rs b/lib/u_lib/src/scheduler/mod.rs new file mode 100644 index 0000000..0d2cc8c --- /dev/null +++ b/lib/u_lib/src/scheduler/mod.rs @@ -0,0 +1,187 @@ +mod entry; +mod spawner; + +use std::sync::Arc; +use std::time::Duration; + +use chrono::Utc; +use cron::Schedule; +use entry::Entry; +use once_cell::sync::Lazy; +use tokio::sync::Mutex; +use uuid::Uuid; + +use crate::types::Id; + +use self::entry::EntryType; +use self::spawner::{CommonSpawner, Spawner, URunnerSpawner}; + +pub static SCHEDULER: Lazy = Lazy::new(|| AsyncScheduler::new()); + +#[derive(Clone)] +pub struct AsyncScheduler { + entries: Arc>>, +} + +impl AsyncScheduler { + pub fn new() -> AsyncScheduler { + AsyncScheduler { + entries: Arc::new(Mutex::new(Vec::new())), + } + } + + pub async fn start_blocking(&self) -> ! { + for entry in self.entries.lock().await.iter_mut() { + entry.set_next_run_time(); + } + + loop { + let mut entries = self.entries.lock().await; + entries.sort_by(|b, a| b.next.cmp(&a.next)); + + let wait_duration = if let Some(entry) = entries.first() { + let wait_millis = (entry.next.as_ref().unwrap().timestamp_millis() as u64) + .saturating_sub(Utc::now().timestamp_millis() as u64); + + Duration::from_millis(wait_millis) + } else { + Duration::from_secs(1) + }; + + drop(entries); + tokio::time::sleep(wait_duration).await; + + let mut entries = self.entries.lock().await; + + CommonSpawner { + entries: &mut entries, + } + .spawn() + .await; + + URunnerSpawner { + entries: &mut entries, + } + .spawn() + .await; + + entries.retain(|e| e.schedule.is_some()); + } + } + + pub async fn add_job(&self, schedule: Option, runnable: impl Into) -> Id { + let entry_id = Uuid::new_v4(); + let runnable = runnable.into(); + let job_id = match &runnable { + EntryType::URunner(j) => j.job.meta.id, + _ => Id::new_v4(), + }; + let mut entry = Entry { + id: entry_id, + job_id, + schedule, + next: None, + runnable, + }; + + entry.set_next_run_time(); + self.entries.lock().await.push(entry); + + entry_id + } + + pub async fn del_job(&self, entry_id: Id) { + self.entries.lock().await.retain(|e| e.id != entry_id); + } + + pub async fn start(&self) { + let cloned = self.clone(); + tokio::spawn(async move { + cloned.start_blocking().await; + }); + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + use tokio::time::sleep; + + use super::*; + + #[tokio::test(flavor = "multi_thread")] + async fn scheduling() { + let v = Arc::new(Mutex::new(0)); + let scheduler = AsyncScheduler::new(); + + { + let v = v.clone(); + scheduler + .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { + let v = v.clone(); + async move { + *v.lock().await += 1; + } + }) + .await; + } + + scheduler + .add_job( + Some("*/1 * * * * * *".try_into().unwrap()), + move || async move { + println!("hello1"); + }, + ) + .await; + + scheduler + .add_job( + Some("*/1 * * * * * *".try_into().unwrap()), + move || async move { + println!("hello2"); + }, + ) + .await; + + scheduler.start().await; + + sleep(Duration::from_secs_f32(0.1)).await; // wait for scheduler + sleep(Duration::from_secs(1)).await; + + assert_eq!(*v.lock().await, 1); + + sleep(Duration::from_secs(1)).await; + + assert_eq!(*v.lock().await, 2); + } + + /// Run task every second. + /// Task will be cancelled if not completed until next run + /// (and this task lasts 2 secs), + /// so the counter value should still be 0 + #[tokio::test(flavor = "multi_thread")] + async fn cancellation() { + let counter = Arc::new(Mutex::new(0)); + let scheduler = AsyncScheduler::new(); + + { + let counter = counter.clone(); + scheduler + .add_job(Some("*/1 * * * * * *".try_into().unwrap()), move || { + let counter = counter.clone(); + async move { + sleep(Duration::from_secs(2)).await; + *counter.lock().await += 1; + } + }) + .await; + } + + scheduler.start().await; + + sleep(Duration::from_secs(3)).await; + + assert_eq!(*counter.lock().await, 0); + } +} diff --git a/lib/u_lib/src/scheduler/spawner.rs b/lib/u_lib/src/scheduler/spawner.rs new file mode 100644 index 0000000..597efef --- /dev/null +++ b/lib/u_lib/src/scheduler/spawner.rs @@ -0,0 +1,85 @@ +use std::time::Duration; + +use crate::jobs::AnonymousJobBatch; + +use super::entry::{Entry, EntryType}; +use chrono::Utc; +use tokio::time::timeout; + +#[async_trait] +pub trait Spawner { + async fn spawn(self); +} + +/// Spawn jobs in scheduler thru `tokio::spawn` +pub struct CommonSpawner<'e> { + pub entries: &'e mut [Entry], +} + +#[async_trait] +impl Spawner for CommonSpawner<'_> { + async fn spawn(self) { + for entry in self + .entries + .iter_mut() + .filter(|e| matches!(e.runnable, EntryType::Common(_))) + { + if let EntryType::Common(runnable) = &entry.runnable { + match entry.next.as_ref() { + Some(next) => { + let cancel_timeout = + next.timestamp_millis() - Utc::now().timestamp_millis(); + let cancel_timeout = Duration::from_millis(cancel_timeout as u64); + + tokio::spawn(timeout(cancel_timeout, runnable.call())); + + entry.set_next_run_time(); + } + None => { + error!("not implemented yet"); + todo!(); + } + } + } + if entry.next.as_ref().unwrap().gt(&Utc::now()) { + break; + } + } + } +} + +/// Spawn jobs in scheduler thru `URunner` +pub struct URunnerSpawner<'e> { + pub entries: &'e mut [Entry], +} + +#[async_trait] +impl Spawner for URunnerSpawner<'_> { + async fn spawn(self) { + let mut job_batch = vec![]; + for entry in self + .entries + .iter_mut() + .filter(|e| matches!(e.runnable, EntryType::URunner(_))) + { + if let EntryType::URunner(runnable) = &entry.runnable { + match entry.next.as_ref() { + Some(_) => { + job_batch.push(runnable.clone()); + entry.set_next_run_time(); + } + None => { + error!("not implemented yet"); + todo!(); + } + } + } + if entry.next.as_ref().unwrap().gt(&Utc::now()) { + break; + } + } + AnonymousJobBatch::from_prepared_jobs(job_batch) + .spawn() + .await; + } +} diff --git a/lib/u_lib/src/executor.rs b/lib/u_lib/src/u_runner.rs similarity index 72% rename from lib/u_lib/src/executor.rs rename to lib/u_lib/src/u_runner.rs index 929011b..712c1f8 100644 --- a/lib/u_lib/src/executor.rs +++ b/lib/u_lib/src/u_runner.rs @@ -39,12 +39,15 @@ fn get_sender() -> Sender { FUT_CHANNEL.0.clone() } -pub struct Waiter { +/// Job runner. Has multiple ways of work. +/// - run 1 or more jobs and wait until they're all done +/// - run 1 or more jobs in background and collect results of completed jobs later +pub struct URunner { tasks: Vec>, fids: Vec, } -impl Waiter { +impl URunner { pub fn new() -> Self { Self { tasks: vec![], @@ -86,58 +89,58 @@ impl Waiter { pub async fn wait(self) -> Vec { let mut result = vec![]; for fid in self.fids { - if let Some(task) = pop_task(fid).await { + if let Some(task) = Self::pop_task(fid).await { result.push(task.wait_result().await); } } result } -} -async fn init_receiver() { - while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { - let mut lock = FUT_RESULTS.lock().await; - if let Some(j) = lock.get_mut(&fid) { - j.completed = true; + pub async fn pop_task_if_completed(fid: Uuid) -> Option { + let &JoinInfo { + handle: _, + collectable, + completed, + } = match FUT_RESULTS.lock().await.get(&fid) { + Some(t) => t, + None => return None, + }; + if collectable && completed { + let task = Self::pop_task(fid).await.unwrap(); + Some(task.wait_result().await) + } else { + None } } -} -async fn pop_task(fid: Uuid) -> Option { - FUT_RESULTS.lock().await.remove(&fid) -} + pub async fn pop_completed() -> Vec { + let mut completed: Vec = vec![]; + let fids = FUT_RESULTS + .lock() + .await + .keys() + .copied() + .collect::>(); + for fid in fids { + if let Some(r) = Self::pop_task_if_completed(fid).await { + completed.push(r) + } + } + completed + } -pub async fn pop_task_if_completed(fid: Uuid) -> Option { - let &JoinInfo { - handle: _, - collectable, - completed, - } = match FUT_RESULTS.lock().await.get(&fid) { - Some(t) => t, - None => return None, - }; - if collectable && completed { - let task = pop_task(fid).await.unwrap(); - Some(task.wait_result().await) - } else { - None + async fn pop_task(fid: Uuid) -> Option { + FUT_RESULTS.lock().await.remove(&fid) } } -pub async fn pop_completed() -> Vec { - let mut completed: Vec = vec![]; - let fids = FUT_RESULTS - .lock() - .await - .keys() - .copied() - .collect::>(); - for fid in fids { - if let Some(r) = pop_task_if_completed(fid).await { - completed.push(r) +async fn init_receiver() { + while let Some(fid) = FUT_CHANNEL.1.lock().await.recv().await { + let mut lock = FUT_RESULTS.lock().await; + if let Some(j) = lock.get_mut(&fid) { + j.completed = true; } } - completed } #[cfg(test)]