Ceasul arată 3:37 AM. Muntele doarme învelit în ceață. Grivei latră plictisit la luna înghețată. Gândesc repede. Am două opțiuni: meciul Bill-Chiefs care abia a început sau un exercițiu de imaginație. Grivei se uită întrebător la mine. Nu pot să-l dezamăgesc.
Regulile sunt simple: am două ore să scriu un model minimal de actori în Rust fără async/await
. Vreau să-mi dovedesc că este fiabil. Voi folosi ca sursă de inspirație articolul lui Alice Ryhl, cu toate că ea folosește tokio.
Să pornim:
cargo init --bin achane
FYI, numele nu are nimic cu chain
.
Prefer crossbeam la std::sync:mpsc:
cargo add crossbeam -F crossbeam-channel
Creăm fișierul sursă:
touch src/ echo.rs
Definim mesajele la care actorul va răspunde:
echo
- cei doi parametri sunt mesajul și destinația la care vom trimite răspunsul
quit
- care va semnala actorului să se opreascăpub enum EchoMsg {
Echo(String, Sender),
Quit,
}
Vom folosi un canal crossbeam
pentru a comunica cu actorul. Actorul se va bloca în metoda recv
așteptând mesaje.
use crossbeam::channel::{bounded, Receiver, Sender};
pub struct Echo {
rcv: Receiver,
}
impl Echo {
pub fn run(&self) {
loop {
//blocking call
let msg = self.rcv.
recv().unwrap();
match msg {
EchoMsg::Echo(txt, snd)
=> snd.send(txt).unwrap(),
EchoMsg::Quit => break,
}
}
}
}
Obiectul Handle
este cel prin care creăm și transmitem mesaje actorului. Handlerul creează și apoi pornește actorul într-un thread
nou, dintr-un scope
predefinit. Pentru a putea interacționa cu actorul din mai multe threaduri
handlerul trebuie să poată fi clonat.
Codul este destul de simplu, doar parametrii de lifetimes
pentru scope
ne dau un pic de furcă.
#[derive(Clone)]
pub struct EchoHandle {
snd: Sender,
}
impl EchoHandle {
pub fn new<'sc, 'env>(cap: usize,
s: &'sc Scope<'sc, 'env>) -> Self
{
let (snd, rcv) = bounded::(cap);
let echo = Echo { rcv };
s.spawn(move || {
echo.run();
});
EchoHandle { snd }
}
pub fn echo(&self, txt: String,
rpl: Sender) {
let msg = EchoMsg::Echo(txt, rpl);
self.snd.send(msg).unwrap();
}
pub fn quit(&self) {
self.snd.send(EchoMsg::Quit)
.unwrap();
}
}
Să vedem dacă merge:
pub fn one2one() {
thread::scope(|s| {
let echo = Echo::new(16);
let echo_handle = echo.handle();
s.spawn(move || {
echo.run();
});
let ech = echo_handle.clone();
s.spawn(move || {
let (snd, rcv) = bounded::(100);
for id in 0..100 {
ech.echo(format!("Hello {}", id), snd.clone());
}
for _ in 0..100 {
println!("Got {}", rcv.recv().unwrap());
}
ech.quit();
});
});
}
Specificăm executabilul în Cargo.toml
și apoi rulăm.
[[bin]]
name = "echo"
path = "src/echo.rs"
cargo run --release --bin echo
Grivei își manifestă aprecierea cu un căscat, într-o jumătate de oră am reușit, să avem un actor funcțional.
Din geam, motanul Spot care speră la niște lapte înainte de micul dejun mă întreabă din priviri: cât de performant este actorul tău?
Întrebarea lui Spot nu mă îngrijorează. Nu sunt treaz la 4:17 AM ca să rulez un singur actor. Voi scrie un lanț lung de actori, care își pasează mesaje de la unul la altul.
Să ne grăbim: touch src/pchain.rs
. Codul este similar cu cel anterior. Fiecare actor rulează în threadul
său, și opțional are un alt actor la care să paseze mesajele primite. Să vedem:
pub enum PChainMsg {
Pass(String),
Quit,
}
pub struct PLink {
msg_cnt: usize,
rcv: Receiver,
nxt: Option,
}
impl PLink {
pub fn run(&mut self) {
loop {
let msg = self.rcv.recv().unwrap();
match msg {
PChainMsg::Pass(txt) => {
self.msg_cnt += 1;
if let Some(link) = &self.nxt {
link.send(txt);
}
}
PChainMsg::Quit => {
if let Some(link) = &self.nxt {
link.quit();
}
assert_eq!(MSG_COUNT,
self.msg_cnt);
return;
}
}
}
}
}
#[derive(Clone)]
pub struct PLinkHandle {
snd: Sender,
}
impl PLinkHandle {
pub fn new<'sc, 'env>(nxt: Option,
cap: usize, s: &'sc Scope<'sc, 'env>) -> Self {
let (snd, rcv) = bounded::(cap);
let mut plink = PLink {
msg_cnt: 0,
rcv,
nxt,
};
s.spawn(move || {
plink.run();
});
PLinkHandle { snd }
}
pub fn send(&self, txt: String) {
let msg = PChainMsg::Pass(txt);
self.snd.send(msg).unwrap();
}
pub fn quit(&self) {
let msg = PChainMsg::Quit;
self.snd.send(msg).unwrap();
}
}
Grivei se uită la mine cu nerăbdare: să creăm 10000 de actori care pasează 10000 de mesaje!. Nu ar fi mai bine să testăm cu mai puțini? Este prea de vreme să argumentez cu el.
const CHAIN_COUNT: usize = 10_000;
const CAPACITY: usize = 16;
pub(crate) const MSG_COUNT: usize = 10_000;
pub fn main() {
{
thread::scope(|s| {
let mut crt = PLinkHandle::new(None, CAPACITY, s);
for _i in 1..CHAIN_COUNT {
let plink = PLinkHandle::new(Some(
crt.clone()), CAPACITY, s);
crt = plink;
}
let pusher = crt;
s.spawn(move || {
for i in 0..MSG_COUNT {
pusher.send(format!("Message {}", i))
}
pusher.quit();
});
});
}
}
Să-i dăm drumul:
cargo build --release
Momentul adevărului:
time target/release/pchane
Memoria PC-ului stă sub 5GB(are 32 deci nu-i grijă), cele 8 procesoare sunt la 100%, load average-ul
este pe la 120, și 20 de secunde mai târziu programul nostru se termină.
Spot este revoltat: 20 de secunde pentru 10000 de actori este mult prea mult.
Încerc să-i explic ca pentru 5:17 AM e destul de bine, dar nu-l pot convinge. Intervine Grivei acid: "capacitatea canalului tău este prea mică". Poate are dreptate. Sper să aibă.
Schimb:
const CAPACITY: usize = 1024;
Compilez. Rulez. Așa da. Programul se termina în mai puțin de 2 secunde și jumătate. În sfârșit, ceva cu ce să mă mândresc.
Vreau să mă opresc, să merg să-mi fac cafeaua, dar Spot e nemilos: "Cum merge cu 20000 de actori?". Hai Spot, numai nu crezi că...
Din păcate are dreptate, un șir de erori se revarsă în consolă, PC-ul meu nu vrea să creeze atâtea threaduri
.
Grivei zâmbește. Mai există o soluție: să rulăm actorii într-un thread pool
și chemăm operația non-blocking
try_recv
. Încerc să-i explic că nu am un thread pool
la îndemâna. Replica lui e seacă: rayon
.
Tipăresc nervos:
cargo add rayon
cp src/pchane.rs src/tchane.rs
Refactorizez numele la structuri și încep să fac schimbările necesare. Funcția run
a actorului nu va mai conține o buclă, iar la creare nu pornim actorul ci îl înregistrăm într-o listă de actori, care va fi apoi procesată în mod repetat de către thread pool
.
Thread poolul
este aproape transparent, doar chemăm funcția par_iter_mut
. Dacă am înlocui-o cu iter_mut
am avea un sistem de actori care ar rula într-un singur thread
. Să vedem:
impl TLink {
pub fn run(&mut self) -> bool {
let rcv_msg = self.rcv.try_recv();
match rcv_msg {
Ok(msg) => match msg {
TChainMsg::Pass(txt) => {
self.msg_cnt += 1;
if let Some(link) = &self.nxt {
link.send(txt);
}
return true;
}
TChainMsg::Quit => {
if let Some(link) = &self.nxt {
link.quit();
}
assert_eq!(MSG_COUNT, self.msg_cnt);
return false;
}
},
Err(err) => match err {
crossbeam::channel::TryRecvError::Empty => {
return true;
}
crossbeam::channel::TryRecvError::Disconnected
=> panic!("Chain broken"),
},
};
}
}
impl TLinkHandle {
pub fn new<'sc, 'env>(nxt: Option,
cap: usize, actors: &mut Vec) -> Self {
let (snd, rcv) = bounded::(cap);
let tlink = TLink {
msg_cnt: 0,
rcv,
nxt,
};
actors.push(tlink);
TLinkHandle { snd }
}
//same code for quit and send methods
}
Codul final.
pub fn main() {
thread::scope(|s| {
let mut chain: Vec =
Vec::with_capacity(CHAIN_COUNT);
let mut crt = TLinkHandle::new(
None, CAPACITY, &mut chain);
for _ in 1..CHAIN_COUNT {
crt = TLinkHandle::new(Some(crt), CAPACITY,
&mut chain);
}
let pusher = crt;
s.spawn(move || {
for i in 0..MSG_COUNT {
pusher.send(format!("Message {}", i))
}
pusher.quit();
});
s.spawn(move || {
let (s, r) = unbounded::();
while !chain.is_empty() {
//we poll the actors here
chain.par_iter_mut().enumerate()
.for_each(|(i, e)| {
if !e.run() {
s.send(i).unwrap();
}
});
//remove dead actors
while !r.is_empty() {
match r.try_recv() {
Ok(idx) => {
chain.swap_remove(idx);
}
Err(err) => match err {
crossbeam::channel::TryRecvError::
Empty => break,
crossbeam::channel::TryRecvError::
Disconnected => panic!("Bad"),
},
}
}
}
println!("Done")
});
});
}
Arată frumos, facem economie la threaduri
, nu blocăm, ar trebui să fie soluția perfectă. Să rulăm cu 10000 de actori:
time target/release/tchane
Ooops! Nu e rău, dar nici grozav: 4 secunde, aproape dublu față de modelul cu un thread
pe actor. Întrebarea se vede pe fața lui Spot: "merge cu 20000 de actori?". Rulez nervos.
time target/release/tchane
Done
____________________________________________________
Executed in 15.12 secs fish external
usr time 109.51 secs 0.00 micros 109.51 secs
sys time 3.49 secs 475.00 micros 3.49 secs
Da! Funcționează. Grivei încearcă să-mi explice ceva despre thread pool
. Spot crede că durează prea mult. De data asta nu mă las intimidat.
E 5:37AM, mi-am făcut exercițiul de imaginație pe dimineața asta. Merg la duș, am o zi lungă - copiii din clasa a 9-a au mari probleme cu vectorii.
La cină, încerc să explic că un clasic thread per request
este simplu, elegant, ușor de înțeles și mai mult decât suficient pentru 99.99% din aplicații. Grivei replică tacticos că soluția lui bazată pe un rayon thread pool
se scalează excelent - a reușit să ruleze cu 60000 de actori în mai puțin de 100 de secunde și cu 100000 de actori în 245 de secunde.
time target/release/tchane
___________________________________________________ Executed in 244.41 secs fish external
usr time 29.76 mins 319.00 micros 29.76 mins
sys time 0.28 mins 120.00 micros 0.28 mins
Sunt gata să merg să văd reluarea la Chiefs-Bills
, când Spot miaună pe sub mustăți: vm.max_map_count
. "Te rog frumos, Spot, nu acum !" Dar e de neclintit.
Caut repede pe net: max_map_count
contains the maximum number of memory map areas a process may have
. Și atunci:
cat /proc/sys/vm/max_map_count
65530
echo vm.max_map_count=250000 >>/etc/sysctl.d/99-sysctl.conf
sudo sysctl --system
Verificăm:
cat /proc/sys/vm/max_map_count
250000
Rulăm varianta cu un thread pe actor pentru 20000, 30000, ..., 60000 de threaduri. Nu avem erori și se scalează aproape liniar. Pentru 60000 de treaduri avem:
time target/release/pchane
________________________________________________________
Executed in 17.91 secs fish external
usr time 62.23 secs 301.00 micros 62.23 secs
sys time 59.55 secs 0.00 micros 59.55 secs
Rezultatul e impresionant. Totuși, Spot se uită întrebător la mine. Trebuie să-i confirm: cu 70000 de actori jucăria noastră se strică, pentru 100000 de actori care rulează fiecare în threadul
lui, avem nevoie de un calculator nou.
git add .
git commit -m 'Initial commit'
git push
Codul sursă îl găsiți pe Codeberg.
de Ovidiu Mățan
de Mircea Talu