ABONAMENTE VIDEO REDACȚIA
RO
EN
Numărul 148 Numărul 147 Numărul 146 Numărul 145 Numărul 144 Numărul 143 Numărul 142 Numărul 141 Numărul 140 Numărul 139 Numărul 138 Numărul 137 Numărul 136 Numărul 135 Numărul 134 Numărul 133 Numărul 132 Numărul 131 Numărul 130 Numărul 129 Numărul 128 Numărul 127 Numărul 126 Numărul 125 Numărul 124 Numărul 123 Numărul 122 Numărul 121 Numărul 120 Numărul 119 Numărul 118 Numărul 117 Numărul 116 Numărul 115 Numărul 114 Numărul 113 Numărul 112 Numărul 111 Numărul 110 Numărul 109 Numărul 108 Numărul 107 Numărul 106 Numărul 105 Numărul 104 Numărul 103 Numărul 102 Numărul 101 Numărul 100 Numărul 99 Numărul 98 Numărul 97 Numărul 96 Numărul 95 Numărul 94 Numărul 93 Numărul 92 Numărul 91 Numărul 90 Numărul 89 Numărul 88 Numărul 87 Numărul 86 Numărul 85 Numărul 84 Numărul 83 Numărul 82 Numărul 81 Numărul 80 Numărul 79 Numărul 78 Numărul 77 Numărul 76 Numărul 75 Numărul 74 Numărul 73 Numărul 72 Numărul 71 Numărul 70 Numărul 69 Numărul 68 Numărul 67 Numărul 66 Numărul 65 Numărul 64 Numărul 63 Numărul 62 Numărul 61 Numărul 60 Numărul 59 Numărul 58 Numărul 57 Numărul 56 Numărul 55 Numărul 54 Numărul 53 Numărul 52 Numărul 51 Numărul 50 Numărul 49 Numărul 48 Numărul 47 Numărul 46 Numărul 45 Numărul 44 Numărul 43 Numărul 42 Numărul 41 Numărul 40 Numărul 39 Numărul 38 Numărul 37 Numărul 36 Numărul 35 Numărul 34 Numărul 33 Numărul 32 Numărul 31 Numărul 30 Numărul 29 Numărul 28 Numărul 27 Numărul 26 Numărul 25 Numărul 24 Numărul 23 Numărul 22 Numărul 21 Numărul 20 Numărul 19 Numărul 18 Numărul 17 Numărul 16 Numărul 15 Numărul 14 Numărul 13 Numărul 12 Numărul 11 Numărul 10 Numărul 9 Numărul 8 Numărul 7 Numărul 6 Numărul 5 Numărul 4 Numărul 3 Numărul 2 Numărul 1
×
▼ LISTĂ EDIȚII ▼
Numărul 139
Abonament PDF

Rust în loc de cafea

Romulus Pașca
Software Developer & Trainer @ Haqr Studio



PROGRAMARE

Ceasul arată 3:37 AM. Muntele doarme învelit în ceață. Grivei latră plictisit la luna înghețată. Gândesc repede. Am două opțiuni: meciul Bill-Chiefs care abia a început sau un exercițiu de imaginație. Grivei se uită întrebător la mine. Nu pot să-l dezamăgesc.

Regulile sunt simple: am două ore să scriu un model minimal de actori în Rust fără async/await. Vreau să-mi dovedesc că este fiabil. Voi folosi ca sursă de inspirație articolul lui Alice Ryhl, cu toate că ea folosește tokio.

Un actor grăbit

Să pornim:

cargo init --bin achane

FYI, numele nu are nimic cu chain.

Prefer crossbeam la std::sync:mpsc:

cargo add crossbeam -F crossbeam-channel

Creăm fișierul sursă:

touch src/ echo.rs

Definim mesajele la care actorul va răspunde:

pub enum EchoMsg {
    Echo(String, Sender),
    Quit,
}

Vom folosi un canal crossbeam pentru a comunica cu actorul. Actorul se va bloca în metoda recv așteptând mesaje.

use crossbeam::channel::{bounded, Receiver, Sender};
pub struct Echo {
    rcv: Receiver,
}

impl Echo {
  pub fn run(&self) {
    loop {
      //blocking call
      let msg = self.rcv.
      recv().unwrap();
       match msg {
        EchoMsg::Echo(txt, snd) 
    => snd.send(txt).unwrap(),
         EchoMsg::Quit => break,
       }
     }
  }
}

Obiectul Handle este cel prin care creăm și transmitem mesaje actorului. Handlerul creează și apoi pornește actorul într-un thread nou, dintr-un scope predefinit. Pentru a putea interacționa cu actorul din mai multe threaduri handlerul trebuie să poată fi clonat.

Codul este destul de simplu, doar parametrii de lifetimes pentru scope ne dau un pic de furcă.

#[derive(Clone)]
pub struct EchoHandle {
    snd: Sender,
}

impl EchoHandle {
 pub fn new<'sc, 'env>(cap: usize,
 s: &'sc Scope<'sc, 'env>) -> Self  
 {
   let (snd, rcv) = bounded::(cap);

   let echo = Echo { rcv };
     s.spawn(move || {
        echo.run();
     });
    EchoHandle { snd }
    }

 pub fn echo(&self, txt: String, 
 rpl: Sender) {
 let msg = EchoMsg::Echo(txt, rpl);
       self.snd.send(msg).unwrap();
    }
pub fn quit(&self) {
  self.snd.send(EchoMsg::Quit)
   .unwrap();
  }
}

Să vedem dacă merge:

pub fn one2one() {
    thread::scope(|s| {
    let echo = Echo::new(16);
    let echo_handle = echo.handle();
    s.spawn(move || {
       echo.run();
    });

    let ech = echo_handle.clone();
    s.spawn(move || {
    let (snd, rcv) = bounded::(100);
    for id in 0..100 {
      ech.echo(format!("Hello {}", id), snd.clone());
    }
      for _ in 0..100 {
        println!("Got {}", rcv.recv().unwrap());
      }
       ech.quit();
     });
  });
}

Specificăm executabilul în Cargo.toml și apoi rulăm.

[[bin]]
  name = "echo"
  path = "src/echo.rs"

  cargo run --release --bin echo

Grivei își manifestă aprecierea cu un căscat, într-o jumătate de oră am reușit, să avem un actor funcțional.

Din geam, motanul Spot care speră la niște lapte înainte de micul dejun mă întreabă din priviri: cât de performant este actorul tău?

Întrebarea lui Spot nu mă îngrijorează. Nu sunt treaz la 4:17 AM ca să rulez un singur actor. Voi scrie un lanț lung de actori, care își pasează mesaje de la unul la altul.

Telefonul fără fir

Să ne grăbim: touch src/pchain.rs. Codul este similar cu cel anterior. Fiecare actor rulează în threadul său, și opțional are un alt actor la care să paseze mesajele primite. Să vedem:

pub enum PChainMsg {
    Pass(String),
    Quit,
}

pub struct PLink {
    msg_cnt: usize,
    rcv: Receiver,
    nxt: Option,
}

impl PLink {
    pub fn run(&mut self) {
        loop {
            let msg = self.rcv.recv().unwrap();
            match msg {
                PChainMsg::Pass(txt) => {
                    self.msg_cnt += 1;
                    if let Some(link) = &self.nxt {
                        link.send(txt);
                    }
                }
                PChainMsg::Quit => {
                    if let Some(link) = &self.nxt {
                        link.quit();
                    }
                    assert_eq!(MSG_COUNT, 
                     self.msg_cnt);
                    return;
                }
            }
        }
    }
}

#[derive(Clone)]
pub struct PLinkHandle {
    snd: Sender,
}

impl PLinkHandle {
 pub fn new<'sc, 'env>(nxt: Option, 
  cap: usize, s: &'sc Scope<'sc, 'env>) -> Self {
      let (snd, rcv) = bounded::(cap);
        let mut plink = PLink {
            msg_cnt: 0,
            rcv,
            nxt,
        };
        s.spawn(move || {
            plink.run();
        });
        PLinkHandle { snd }
    }
    pub fn send(&self, txt: String) {
        let msg = PChainMsg::Pass(txt);
        self.snd.send(msg).unwrap();
    }
    pub fn quit(&self) {
        let msg = PChainMsg::Quit;
        self.snd.send(msg).unwrap();
    }
}

Grivei se uită la mine cu nerăbdare: să creăm 10000 de actori care pasează 10000 de mesaje!. Nu ar fi mai bine să testăm cu mai puțini? Este prea de vreme să argumentez cu el.

const CHAIN_COUNT: usize = 10_000;
const CAPACITY: usize = 16;
pub(crate) const MSG_COUNT: usize = 10_000;

pub fn main() {
{
   thread::scope(|s| {
   let mut crt = PLinkHandle::new(None, CAPACITY, s);
      for _i in 1..CHAIN_COUNT {
        let plink = PLinkHandle::new(Some(
          crt.clone()), CAPACITY, s);
            crt = plink;
          }
         let pusher = crt;
         s.spawn(move || {
           for i in 0..MSG_COUNT {
              pusher.send(format!("Message {}", i))
            }
            pusher.quit();
         });
      });
    }
}

Să-i dăm drumul:

cargo build --release

Momentul adevărului:

time target/release/pchane

Memoria PC-ului stă sub 5GB(are 32 deci nu-i grijă), cele 8 procesoare sunt la 100%, load average-ul este pe la 120, și 20 de secunde mai târziu programul nostru se termină.

Spot este revoltat: 20 de secunde pentru 10000 de actori este mult prea mult.

Încerc să-i explic ca pentru 5:17 AM e destul de bine, dar nu-l pot convinge. Intervine Grivei acid: "capacitatea canalului tău este prea mică". Poate are dreptate. Sper să aibă.

Schimb:

const CAPACITY: usize = 1024;

Compilez. Rulez. Așa da. Programul se termina în mai puțin de 2 secunde și jumătate. În sfârșit, ceva cu ce să mă mândresc.

Vreau să mă opresc, să merg să-mi fac cafeaua, dar Spot e nemilos: "Cum merge cu 20000 de actori?". Hai Spot, numai nu crezi că...

Din păcate are dreptate, un șir de erori se revarsă în consolă, PC-ul meu nu vrea să creeze atâtea threaduri.

Grivei zâmbește. Mai există o soluție: să rulăm actorii într-un thread pool și chemăm operația non-blocking try_recv. Încerc să-i explic că nu am un thread pool la îndemâna. Replica lui e seacă: rayon.

Raze de speranță

Tipăresc nervos:

cargo add rayon  
cp src/pchane.rs src/tchane.rs

Refactorizez numele la structuri și încep să fac schimbările necesare. Funcția run a actorului nu va mai conține o buclă, iar la creare nu pornim actorul ci îl înregistrăm într-o listă de actori, care va fi apoi procesată în mod repetat de către thread pool.

Thread poolul este aproape transparent, doar chemăm funcția par_iter_mut. Dacă am înlocui-o cu iter_mut am avea un sistem de actori care ar rula într-un singur thread. Să vedem:

impl TLink {
    pub fn run(&mut self) -> bool {
        let rcv_msg = self.rcv.try_recv();
        match rcv_msg {
            Ok(msg) => match msg {
                TChainMsg::Pass(txt) => {
                    self.msg_cnt += 1;
                    if let Some(link) = &self.nxt {
                        link.send(txt);
                    }
                    return true;
                }
                TChainMsg::Quit => {
                  if let Some(link) = &self.nxt {
                      link.quit();
                 }
                 assert_eq!(MSG_COUNT, self.msg_cnt);
                    return false;
                }
         },
         Err(err) => match err {
         crossbeam::channel::TryRecvError::Empty => {
               return true;
       }
       crossbeam::channel::TryRecvError::Disconnected 
        => panic!("Chain broken"),
        },
     };
    }
}
impl TLinkHandle {
    pub fn new<'sc, 'env>(nxt: Option, 
     cap: usize, actors: &mut Vec) -> Self {
        let (snd, rcv) = bounded::(cap);
        let tlink = TLink {
            msg_cnt: 0,
            rcv,
            nxt,
        };
        actors.push(tlink);
        TLinkHandle { snd }
    }
    //same code for quit and send methods
}

Codul final.

pub fn main() {
 thread::scope(|s| {
   let mut chain: Vec = 
   Vec::with_capacity(CHAIN_COUNT);

   let mut crt = TLinkHandle::new(
   None, CAPACITY, &mut chain);

   for _ in 1..CHAIN_COUNT {
     crt = TLinkHandle::new(Some(crt), CAPACITY, 
     &mut chain);
   }
     let pusher = crt;
     s.spawn(move || {
       for i in 0..MSG_COUNT {
         pusher.send(format!("Message {}", i))
       }
        pusher.quit();
     });
     s.spawn(move || {
     let (s, r) = unbounded::();
     while !chain.is_empty() {
      //we poll the actors here
      chain.par_iter_mut().enumerate()
       .for_each(|(i, e)| {
         if !e.run() {
         s.send(i).unwrap();
             }
           });
         //remove dead actors
         while !r.is_empty() {
           match r.try_recv() {
             Ok(idx) => {
               chain.swap_remove(idx);
             }
             Err(err) => match err {
             crossbeam::channel::TryRecvError::
             Empty => break,

             crossbeam::channel::TryRecvError::
             Disconnected => panic!("Bad"),
             },
            }
          }
         }
         println!("Done")
       });
    });
}

Arată frumos, facem economie la threaduri, nu blocăm, ar trebui să fie soluția perfectă. Să rulăm cu 10000 de actori:

time target/release/tchane

Ooops! Nu e rău, dar nici grozav: 4 secunde, aproape dublu față de modelul cu un thread pe actor. Întrebarea se vede pe fața lui Spot: "merge cu 20000 de actori?". Rulez nervos.

time target/release/tchane
Done
____________________________________________________
Executed in   15.12 secs    fish           external
   usr time  109.51 secs    0.00 micros  109.51 secs
   sys time    3.49 secs  475.00 micros    3.49 secs

Da! Funcționează. Grivei încearcă să-mi explice ceva despre thread pool. Spot crede că durează prea mult. De data asta nu mă las intimidat.

E 5:37AM, mi-am făcut exercițiul de imaginație pe dimineața asta. Merg la duș, am o zi lungă - copiii din clasa a 9-a au mari probleme cu vectorii.

Epilog

La cină, încerc să explic că un clasic thread per request este simplu, elegant, ușor de înțeles și mai mult decât suficient pentru 99.99% din aplicații. Grivei replică tacticos că soluția lui bazată pe un rayon thread pool se scalează excelent - a reușit să ruleze cu 60000 de actori în mai puțin de 100 de secunde și cu 100000 de actori în 245 de secunde.

time target/release/tchane
___________________________________________________   Executed in  244.41 secs    fish           external
   usr time   29.76 mins  319.00 micros   29.76 mins
   sys time    0.28 mins  120.00 micros    0.28 mins  

Sunt gata să merg să văd reluarea la Chiefs-Bills, când Spot miaună pe sub mustăți: vm.max_map_count. "Te rog frumos, Spot, nu acum !" Dar e de neclintit.

Caut repede pe net: max_map_count contains the maximum number of memory map areas a process may have. Și atunci:

cat /proc/sys/vm/max_map_count
65530
echo vm.max_map_count=250000 >>/etc/sysctl.d/99-sysctl.conf
sudo sysctl --system

Verificăm:

cat /proc/sys/vm/max_map_count  
250000

Rulăm varianta cu un thread pe actor pentru 20000, 30000, ..., 60000 de threaduri. Nu avem erori și se scalează aproape liniar. Pentru 60000 de treaduri avem:

time target/release/pchane
________________________________________________________
Executed in   17.91 secs    fish           external
   usr time   62.23 secs  301.00 micros   62.23 secs
   sys time   59.55 secs    0.00 micros   59.55 secs

Rezultatul e impresionant. Totuși, Spot se uită întrebător la mine. Trebuie să-i confirm: cu 70000 de actori jucăria noastră se strică, pentru 100000 de actori care rulează fiecare în threadul lui, avem nevoie de un calculator nou.

git add .  
git commit -m 'Initial commit'  
git push

Codul sursă îl găsiți pe Codeberg.

LANSAREA NUMĂRULUI 149

Marți, 26 Octombrie, ora 18:00

sediul Cognizant

Facebook Meetup StreamEvent YouTube

NUMĂRUL 147 - Automotive

Sponsori

  • Accenture
  • BT Code Crafters
  • Accesa
  • Bosch
  • Betfair
  • MHP
  • BoatyardX
  • .msg systems
  • P3 group
  • Ing Hubs
  • Cognizant Softvision
  • Colors in projects

Romulus Pașca a mai scris