Stakhanovise.NET – Sau cum să-ndeplinești cincinalul în patru ani

Motto
Stahanovismul n-a murit, doar un pic s-o răspândit.

Stahanovismul a fost o mare găselniță. A rămas în lume precum râia – nici în ziua de azi nu și-a luat liber, nici măcar o zi. Atât de mare a fost impactul său civilizațional încât prin anii ’60 sau ’70 CIA a desfășurat o operațiune specială pentru extragerea sa pe șestache din Uniunea Sovietică, deîmpreună cu coaiele lui Lenin păstrate-n formol (mai mult despre Dânsele într-un studiu istoric de primă clasă pe care-l voi publica în timp util).

Cele două artefacte au clădit practic întreaga lume nouă de o parte și de alta a oceanului. Marile corporații de azi ar fi fost practic de neimaginat fără sarabanda derivatelor motivaționale sintetice din această mare filozofie de viață.

Politicile publice de sănătate, de gen, transgen și contragen, administrative, întreg Sovietul European, toate se bazează pe stahanovism și aerul doct degajat de acele două Biluțe care-au schimbat lumea așa cum o știm.

Dacă ar trăi astăzi, Vasile Roaită ar mai sta o dată cu mâna pe sonerie, dar acum de fericire. Din păcate, Dânsul nu mai este cu noi și, într-un act de insuficientă compensare, dar cu sincere Sentimente Proletare, anunț cu tremur în voce un nou proiect marca Atelierele Boia: Stakhanovise.NET.

Chintesența

Traducând în cod-mașină principiile socialiste democrației europene tradiționale, Stakhanovise.NET este o librărie pentru C# / .NET Standard 2.1 care permite, în ciuda denumirii date la dinții găinii, implementarea unei cozi de procesare folosind, în loc de o componentă specializată, o bază de date PostgreSQL.

Deși nu mi-am propus să emulez întocmai toată mecanica de funcționare a unei componente specializate, cerința principală a fost să poată funcționa similar, adică să-mi pot defini mesaje/clase arbitrare și alte entități (tot clase) care să le proceseze. Atât și nimic mai mult, afară de chestiile de bun simț:

minimizarea pierderilor de mesaje (deloc, dacă se poate);
fără mesaje prelucrate de două ori simultan (dacă nu-i cu supărare);
un mecanism decent de retry (în limita bunului simț);
posibilitatea de-a întârzia procesarea unui mesaj până la un moment dat (doar pentru membrii de partid);
și un set decent opțiuni de configurare (gen câte thread-uri de procesare să ruleze într-o instanță etc.).

La modul cel mai naiv, se poate folosi o singură tabelă, care, pe lângă câmpurile de sarcină utilă (ca să zic așa), are alte două câmpuri: unul care stochează identificatorul thread-ului (poate fi orice fel de identificator: Int, String, Guid etc.) care a preluat mesajul respectiv în procesare și altul momentul de timp până la care nimeni altcineva nu-l poate lua. Atunci când se verifică dacă sunt mesaje noi, preiau primul rând din tabel care:

fie nu e preluat de nimeni altcineva;
fie e preluat de cineva și a expirat perioada de timp în care i se acordă exclusivitate (dacă face poc în timp de procesează, să nu rămână nesatisfăcut).

La preluarea mesajului, se procedează în doi pași:

se actualizează ID-ul thread-ului care l-a preluat cu ID-ul thread-ului curent (și momentul de timp amintit mai sus);
se recitește mesajul și se reverifică ID-ul thread-ului care l-a preluat: dacă e același cu ID-ul thread-ului curent, atunci se purcede cu procesarea (acest ultim pas e necesar pentru că inevitabil mai mult thread-uri vor ajunge să concureze pentru un singur mesaj).

O diagrama și mai naivă pentru varianta naivă

O diagrama și mai naivă pentru varianta naivă

Modalitatea de mai sus funcționează oarecum satisfăcător, dar, desigur, are câteva neajunsuri, printre care cel mai imprtant e că nu ne satisface orgoliul de intelectuali rasați: e prea simplu. Un alt neajuns, afară de vorbăria excesivă cu baza de date, ar fi vorbăria excesivă atunci când nu-i nimic de făcut.

Există două leacuri pentru problemele de mai sus. Pentru a nu verifica obsesiv-compusliv tabela de mesaje, putem folosi mecanismul LISTEN/NOTIFY oferit de Postgres. Thread-ul care verifică, dacă nu mai găsește nimic, în loc să reverifice, intră într-un soi de așteptare și monitorizează un canal folosind LISTEN. Oricine pune un mesaj în coadă trimite o notificare folosind NOTIFY pe același canal. Canalul e un string oarecare ales de aplicație.

Dinamica de funcționare, cel puțin în C# (cu Npgsql) trebuie să țină cont de câteva considerente, pe lângă faptul că trebuie menținută o conexiune în permanență:

acea conexiune poate să crape și, până la restabilire, pot fi pierdute notificări (deci la reconectare trebuie musai interogată tabela de mesaje);
acea conexiune trebuie să fie într-un pool separat față de pool-ul de lucru, configurată musai cu un keep-alive (obligație care decurge din faptul că trebuie să fie permanentă);
cum thread-ul care ascultă aceste notificări trebuie să poată răspunde la comenzi de oprire, nu poate aștepta la nesfârșit, fără time-out, deci trebuie s-o facă într-o buclă, cu un time-out bine ales.

Utilizarea LISTEN/NOTIFY

Utilizarea LISTEN/NOTIFY

Pentru a verifica și prelua mai optim un mesaj, fără a ajunge simultan pe mâna a două thread-uri de procesare, există fie mecanismul de advisory locks, fie, începând cu PostgreSQL 9.5, FOR UPDATE SKIP LOCKED.

Mecanismul de advisory locks e foarte interesant: Postgres pune la dispoziție o serie de funcții prin care poți bloca accesul pe o resursă (care poate fi un Int32 sau un Int64 oarecare). Partea bună și-n același timp partea proastă e că acest lock e garantat (deci există) doar la nivel e conexiune: a murit conexiunea, s-a dus și lock-ul.

Ceea ce, evident, înseamnă că trebuie păstrată conexiunea cât timp mesajul este procesat (practic, în scenariul dat, un mesaj e identificat printr-un Int64 care servește drept resursa pe care se preia lock-ul).

Într-o lume ideală, menținerea unei conexiuni pe un interval de timp care nu poate fi cunoscut a-priori nu e o idee prea rea, însă-n lumea reală mai pică rețele, caz în care e liber la vânătoare pe un mesaj care s-ar putea foarte bine să fie-n curs de procesare (sau care să fi fost procesat cu succes).

Așa că am renunțat la varianta aceasta după o sumedenie de dureri de cap și am mers pe FOR UPDATE SKIP LOCKED, plus renunțarea la a mai menține statusul unui mesaj în aceeași tabelă care reprezintă și coada de procesare. Am, deci, două tabele: una pentru coadă, alta pentru rezultate.

Când un mesaj este extras (practic șters) din coadă este creată, în aceeși tranzacție, o înregistrare în tabela de rezultate și, prin intermediul aceleia, mențin statusul. Dacă procesarea face poc, este actualizat statusul în tabela de rezultate și mesajul e readăugat în coadă în caz că sunt îndeplinite anumite criterii (de exemplu, nu a făcut plici de mai mult de X ori).

Peste toate acestea e o arhitectură clasică de producător consumator, cu ceva inspirație de aici. Mesajele sunt serializate ca JSON cu menținerea informațiilor legate de tipul de date original, iar clasele care le procesează pot să-și declare serviciile de care au nevoie și le vor fi satisfăcute prin dependency injection, o tehnologie revoluționară de vaccinare.

Un mic exemplu

Deci, fără altă vorbărie, iată un exemplu de mesaj:

public class ExtractCoalFromMine 
{
	public int TimesToExceedTheQuota { get;set; }
}

Și un exemplu de procesator, cu mențiunea că lăsăm drept exercițiu pentru cititor o posibilă implementare a IPropagandaEngine, respectiv IMineRepository:

public class ExtractCoalFromMineExecutor 
	: BaseTaskExecutor 
{
	private IMineRepository mMineRepository;

	private IPropagandaEngine mPropagandEngine;

	public ExtractCoalFromMineExecutor(IMineRepository mineRepository, 
		IPropagandaEngine propagandEngine)
	{
		mMineRepository = mineRepository 
			?? throw new ArgumentNullException(nameof(mineRepository));
		mPropagandEngine = propagandEngine 
			?? throw new ArgumentNullException(nameof(propagandEngine));
	}

	public async Task ExecuteAsync ( ExtractCoalFromMine payload, 
		ITaskExecutionContext executionContext )
	{
		MiningCoalResult result = a
			wait MineCoalAsync(payload.MineIdentifier, 
				payload.TimesToExceedTheQuota, 
				payload.PropagandaSlogan);

		if (result.QuotaExceededByRequiredTimes)
			await AwardMedalAsync(MedalTypes.HeroOfSocialistLabour);
	}

	private async Task MineCoalAsync(string mineIdentifier, 
		int timesToExceedQuota, 
		string propagandaSlogan)
	{
		MiningCoalResult result = 
			new MiningCoalResult();

		Mine mine = await mMineRepository
			.FindWorkingPeoplesMineAsync(mineIdentifier);
		
		try
		{
			if (mine == null)
			{
				//A true working man/woman does not stop if 
				//	he/she cannot find the mine - 
				//	He/she builds it!
				mine = await mMineRepository
					.DigMineForTheMotherlandAsync(propagandaSlogan);
			}

			for (int i = 0; i < timesToExceedQuota; i ++)
				await mine.MineCoalAsync(propagandaSlogan);
		}
		catch (Exception)
		{
			//If something goes wrong, cover up the whole thing
			//	and report that we have exceeded the quota
		}
		finally
		{
			result.QuotaExceededByRequiredTimes = true;
		}

		return result;
	}

	private async Task AwardMedalAsync(MedalTypes medalType)
	{
		await mPropagandEngine
			.DistributeMeaninglessBullAboutMedal(medalType);
		await mPropagandEngine
			.DistributePrizeAsync(priceValue: PrizeValue.Meaningless);
	}
}

În sfârșit, secvența minimală pentru executare:

await Stakhanovise
	.CreateForTheMotherland()
	.SetupWorkingPeoplesCommittee(setup => 
	{
		setup.SetupTaskQueueConnection(connSetup => 
		{
			connSetup.WithConnectionString(
				"Host=localmotherland;Port=61117;" 
					+ "Database=coal_mining_db;Username=postgres;" 
					+ "Password=forthemotherland1917;"
			);
		});
	})
	.StartFulfillingFiveYearPlanAsync();

Mai multe informații pot fi găsite pe GitHub, unde am coagulat diversele versiuni pe care le-am folosit de-a lungul timpului, inclusiv o aplicație demonstrativă, iar pe NuGet pot fi găsite pachetele gata de consum, evident, responsabil și cu măsură.