TSM - Tick Tock on Beanstalkd Message Queues

Tudor Mărghidanu - Software Architect

În general,timpul e o dimensiune restrictivă, cu atât mai mult în industria IT, unde orice produs, în orice stagiu, se raportează direct la această unitate de măsură. Mai mult decât atât, dezvoltatorii de software au clasificat timpul în categorii, iar resursele alocate unui proiect se concentrează în esență pe eficientizarea timpului pentru dezvoltarea produsului. În acest articol mă voi referi doar la timpul de execuție a unei aplicații într-o sesiune dată.

Sunt sigur că mulți dintre voi cunoașteți termenul de cozi de mesaje (message queue), mai ales dacă ați avut ocazia de a lucra cu aplicații care își desfășoară operațiile într-un mod asincron. Aceste cozi de mesaje oferă câteva avantaje majore cum ar fi:

Există mai multe servicii prin care se pot implementa cozi de mesaje, dar acest articol se referă doar la Beanstalkd.

Beanstalkd

Beanstalkd este un serviciu cu o interfață generică folosit pentru a reduce latența între procesele unei aplicații care necesită un timp mare de execuție. Datorită interfeței generice, serviciul reprezintă un punct major de scalabilitate în cadrul aplicației dezvoltate. Beanstalkd nu introduce nicio limitare de implementare (limbaj sau serializare) deoarece se bazează pe PUSH sockets pentru comunicare și are un protocol simplu prin care face acest lucru. Pentru a înțelege mai bine restul articolului este important să avem câțiva termeni de bază. Iată o descriere a vocabularului Beanstalkd:

Problemă

Tind să cred că cel mai ușor se învață din exemple; de aceea, m-am gândit la următoarea problemă:

"Să se construiască o aplicație web unde utilizatorii pot să încarce fișiere video în diverse formate, astfel încât ulterior aceste video-uri să fie disponibile într-un player video pe una dintre paginile aplicației."

Ce e important la acest enunț e faptul că e suficient de vag încât să lase loc pentru scalabilitatea problemei, care, în mod evident, atrage scalabilitatea soluției. Dar frumusețea ei este dată, în acest caz, de simplitatea Beanstalkd. Deoarece, odată implementat clientul, execuția lui se poate scala atât vertical (mai multe procese de sistem pe aceeași mașină), cât și orizontal (mai multe procese de sistem pe mai multe mașini) folosind aceeași regulă inițială.

Figura ilustrează cum ar trebui să circule datele dintr-o parte în alta a aplicației și felul în care ar trebui să interacționeze aplicația web cu consumatorii folosind un layer de shared storage.

Să presupunem următoarea situație: clienții încarcă pe o pagină definită un set de fișiere video care intră într-un proces predefinit. Acest proces îndeplinește două funcții: prima funcție se ocupă de stocarea fișierului într-un layer de persistență predefinit (sistem de fișiere distribuit sau bază de date), iar a doua pregătește și scrie un mesaj în Beanstalkd, care conține informații ce trimit la referința din layer-ul de persistență. Din acest punct operația devine una asincronă și distribuită. Dacă raportul dintre numărul de consumatori și frecvența datelor de intrare a fost determinată corect, fișierele încărcate ar trebui să fie procesate într-un timp scurt.

package MyApp::Globals;
# ... More static properties ...
use JSON::XS;
use Beanstalk::Client;
class_has "message_queue" => (
	is => "ro",
	isa => "Beanstalk::Client",
	default => sub {
		return Beanstalk::Client->new(
{
# NOTE: This usually should come from a configuration
file...
	server => "localhost",
# Making sure we serialize/deserialize via JSON.
encoder => sub { encode_json( shift() ); },
decoder => sub { decode_json( shift() ); },
		}
	);
}
);
package MyApp::Web::Controllers::Videos;
# ...
sub upload {
	my $self = shift();
	# Retrieving the uploaded video.
	my $video = $self->req()->upload( "video" );
# Additional content and headers validation ...
# Storing the video in the persistance layer ...
my $object = MyApp::Globals->context()
	->dfs()->raw_videos(
		{
			filename => $video->filename(),
			headers => $video->headers()->to_hash(),
			data => $video->slurp(),
			# ... additional user data
		}
	);
# Making sure we use the right tube for sending the #
data.
MyApp::Globals->message_queue()
	->use( "raw_videos" );
# Storing the data in the queue...
MyApp::Globals->message_queue()
	->put(
{
priority => 10000,
data => $object->pack(),
	# Serialization occurs automatically ...
	}
);
}

Consumatorii lucrează în bucle continue cerând mesaje de la Beanstalkd pe măsură ce procesează datele, marcând status-ul acțiunii în urma procesării, astfel încât să putem identifica atât numărul de rulări corecte, cât și erorile care au apărut pe parcurs. Mai mult decât atât, în cazul erorilor putem relua mesajele marcate ca fiind eronate odată ce problema a fost corectată ulterior.

Un alt aspect important este că paralelizarea consumatorilor se face prin procese de sistem, ceea ce înseamnă un management ușor. În același timp, se scoate din calcul locking-ul unor resurse la nivel de aplicație și implicit memory leaks.

# Getting messages only from these tubes ...
MyApp::Globals->message_queue()
->watch_only( "raw_videos" );

while( 1 ) {
# Retrieving a job from the message queue and
# marking it as reserved...
my $job = MyApp::Globals->message_queue()
->reserve();

eval {
my $data = $job->args();
# Automatic data deserialization ...
# Doing the magic on the data here ...
};

# In case of an error we signal the error in
# back-end and budy the job.
if( my $error = $@ ) {
	$logger->log_error( $error );
	$job->bury();
} else {
	$job->delete();
	# If everything is ok we simply delete the job
	# from the tube!
	}
}

Concluzii

Ca o notă personală, întotdeauna mi-au plăcut soluțiile simple și elegante care au un set redus de reguli și o terminologie simplă. Beanstalkd este un astfel de exemplu. În altă ordine de idei, este important să realizăm că adoptarea acestui serviciu reprezintă o muncă de integrare, într-o oarecare măsură, și roata nu ar trebui re-inventată în niciun punct de dezvoltare a aplicației.

Un alt aspect important este faptul că, folosind un sistem distribuit în această manieră, el permite atât compresarea/dilatarea timpului, cât și fragmentarea execuției într-un mod evident, iar ceea ce ar dura câteva săptămâni, rulând în mod secvențial, se poate reduce la câteva zile sau chiar ore în funcție de durata procesului de bază.

Pros

Cons