Obsługa zależności czasowych

ucgosu.pl 3 lat temu

Jakiś czas temu otrzymałem na maila takie pytanie:

Czy jest jakaś elegancka metoda, aby zarządzać zdarzeniami czasowymi w systemie?
Generalnie unikamy delay’ów i odnosimy się np. do zegara systemowego. Aby uruchomić daną komendę/operację/funkcję w konkretnym momencie czasowym używamy IF-ów. Sprawa się komplikuje jeżeli chcemy powiązać czasowo różne zdarzenia w systemie.

Oto przykład: chcemy aby:
1. Zdarzenie X było realizowane co 2 sekundy,
2. Niezależne zdarzenie Y co 5 sekund
3. Zdarzenie Z 1 sekundę po zdarzeniu X.

Czy masz tu jakiś pomysł jak taki cel można zrealizować prościej, z wykorzystaniem jakiegoś tricku, aby kod stał się prostszy i bardziej czytelny?

W dzisiejszym wpisie omówię dwa podejścia do tego tematu – z RTOSem i bez.

Weź RTOSa!

To jest cytat, a choćby tytuł prezentacji Mateusza Salamona opisującej typową poradę w takich przypadkach. Generalnie jest to dobra porada. W końcu główną ideą RTOSa jest radzenie sobie z obsługą niezależnych tasków.

I faktycznie w RTOSie możemy rozwiązać ten problem bardzo prosto:

task_y(void *params) { while (1) { rtos_delay_ms(5000); handle_y(); } }

I mamy task wykonujący zadanie co 5 sekund. Wyzwalanie jednego tasku z drugiego z opóźnieniem również nie jest trudne:

task_x(void *params) { while (1) { rtos_delay_ms(2000); handle_x(); rtos_sem_give(sem_z); } } task_z(void *params) { while (1) { rtos_sem_take(sem_z); rtos_delay_ms(1000); handle_z(); } }

wykorzystujemy semafor wyzwalany przez task X i odbierany przez task Z. RTOS pod spodem obsługuje oddzielne stosy dla wszystkich tasku, przełączanie kontekstów, delaye, priorytety itd. Możemy też zamiast semafora użyć kolejki i przekazywać dane z tasku X do Z jednocześnie zapewniając synchronizację. We FreeRTOSie na przykład wszystkie muteksy, semafory itp. tak naprawdę są właśnie kolejkami.

Wygoda vs kontrola

Ale jak to zwykle bywa w przypadku dodawania zewnętrznych bibliotek – dodajemy do projektu bardzo dużo kodu. W ten sposób zwiększamy mocno złożoność. Jednocześnie tracimy kontrolę nad tym co się dzieje i często choćby do końca tego nie rozumiemy. W zamian oszczędzamy czas. Ale czasem może to się obrócić przeciwko nam!

Nie ma chyba lepszego przykładu zwiększenia złożoności i utraty kontroli nad tym co się dzieje wewnątrz niż RTOS. W końcu aby obsługiwać taski niezależnie implementuje scheduler, zmiany kontekstu, obsługuje oddzielne stosy dla tasków, ma wiedzę o zawartości rejestrów CPU i tak dalej.

Dostajemy gotowca i nie musimy się tym wszystkim martwić. Ale możemy w ten sposób spowodować problemy w wielu innych miejscach. Na przykład przez wyścigi, deadlocki, konfigurację stosów dla poszczególnych tasków, priorytetów.

Tak samo jest w przypadku każdej innej biblioteki. Kiedy dodajemy zewnętrzny kod do obsługi wyświetlacza, systemu plików, akcelerometru, stos TCPIP, czy cokolwiek innego – też natrafimy na różne problemy tylko, iż inne. Musimy podjąć decyzję, czy użycie zewnętrznego kodu oszczędzi nam czas, czy nie.

W tym konkretnym przypadku uważam, iż oszczędzi. o ile obsługa kilku niezależnych zadań jest głównym celem systemu – RTOS jest najlepszym rozwiązaniem. A taki projekt z trzema taskami i niezbyt skomplikowanymi zależnościami jest idealny do nauki.

Ale i tak w dalszej części artykułu spróbujemy sobie to zaimplementować bez RTOSa. Również dla celów edukacyjnych.

Rozwiązanie bez RTOSa

Może się wydawać, iż zaproponowane przeze mnie rozwiązanie jest dużo bardziej skomplikowane niż to wyżej na RTOSie. A to dlatego, iż większość złożoności jest schowane właśnie w kodzie RTOSa. Tak naprawdę moje rozwiązanie jest mocno uproszczone, a wręcz brakuje mu pewnych elementów, które RTOS ma. Tyle tylko, iż kiedy piszemy je samemu, od razu widzimy całą tą złożoność.

Próba rozwiązania tego typu zagadnień samemu pomaga nam też lepiej zrozumieć cel i sposób działania RTOSa. Możemy sobie coś takiego raz napisać, a potem już zawsze korzystać z RTOSa, albo jakiejś biblioteki do obsługi eventów.

Pełny kod mojego rozwiązania bez RTOSa znajdziesz tutaj:
https://godbolt.org/z/P746YsMd3

W praktyce ten kod zostałby podzielony na kilka plików, które zaznaczyłem odpowiednimi komentarzami.

W outpucie możesz zobaczyć, iż task X wywołuje się co 5 ticków, task Y co 10 ticków, a task Z 2 ticki po tasku X.

Omówię teraz po kolei, co tam się znajduje.

Po pierwsze mamy funkcję main, która imituje upływ czasu, inicjalizuje i obsługuje eventy:

#define MAX_TIME 1000 int main(void) { int32_t time = 0; events_init(); event_x_add(); event_y_add(); event_z_add(); for (time = 0; time < MAX_TIME; time++) { printf("time: %d\n", time); events_process(); } }

Obsługa eventów

Obsługę eventów oparłem na wskaźnikach na funkcje. Przechowuję gdzieś listę wszystkich eventów w systemie i cyklicznie wywołuję funkcję przechodzącą przez wszystkie aktywne eventy i wywołującą je kiedy trzeba.

Każdy event jest strukturą:

typedef void (*handler_t)(void *); typedef uint32_t timeout_t; struct event { handler_t handler; timeout_t timeout; bool is_active; };

Potrzebuję w niej mieć wskaźnik na funkcję, pozostały timeout i flagę wskazującą, czy event jest aktywny. Moja implementacja jest dostosowana do konkretnego zastosowania. W RTOSie struktura do obsługi tasku, czy kolejki miałaby o wiele więcej elementów.

Mam tablicę eventów:

#define MAX_EVENTS 10 struct event events[MAX_EVENTS];

Mam funkcję do wykonywania typowych akcji na pojedynczym evencie, czyli czyszczenia, resetowania:

void event_clear(struct event *e) { e->handler = NULL; e->timeout = 0; e->is_active = false; } err_code_t event_restart(struct event *e, timeout_t t) { if (e == NULL) { return -1; } e->timeout = t; return 0; }

Wszystkie akcje chcę wykonywać w dedykowanych funkcjach, żeby inne moduły nie wiedziały o wewnętrznej implementacji eventów.

Mam też funkcje działające na całej tablicy eventów:

struct event * event_find_free(void) { int32_t i = 0; struct event *ret = NULL; for (i = 0; i < MAX_EVENTS; i++) { if (!events[i].is_active) { ret = &events[i]; break; } } return ret; } err_code_t event_add(handler_t h, timeout_t t) { struct event *e = event_find_free(); if (e == NULL) { return -1; } e->handler = h; e->timeout = t; e->is_active = true; return 0; } void events_init(void) { int32_t i = 0; for (i = 0; i < MAX_EVENTS; i++) { event_clear(&events[i]); } } void events_process(void) { int32_t i; for (i = 0; i < MAX_EVENTS; i++) { if (events[i].is_active) { struct event * e = &events[i]; e->timeout--; if (e->timeout == 0) { e->handler(e); } } } }

Mamy więc funkcję czyszczącą całą tablicę, dodającą pojedynczy event i wywołującą funkcję dla wszystkich eventu.

Można powiedzieć, iż to jest nasz odpowiednik schedulera, ale bardzo ubogi. Możemy dodawać funkcje do wykonania z jakimś opóźnieniem.

Task cykliczny

W naszym zadaniu Y jest taskiem cyklicznym. Jego implementacja wygląda tak:

#define EVENT_Y_TIMEOUT 10 void event_y_callback(void *param); void add_event_y(void); void event_y_callback(void *param) { printf("event y\n"); event_restart(param, EVENT_Y_TIMEOUT); } void event_y_add(void) { event_add(event_y_callback, EVENT_Y_TIMEOUT); }

Funkcja event_y_callback zawiera obsługę naszego tasku. W tym wypadku ograniczyłem się do wyprintowania na konsolę. A na samym końcu task restartuje swój timeout. Robi to przy użyciu argumentu param i funkcji do obsługi eventu. Task Y nie musi nic wiedzieć o wewnętrznej implementacji eventów – mamy działającą abstrakcję.

Dodawanie eventu do naszego schedulera jest zrealizowane dzięki funkcji event_y_add.

Task wyzwalający akcję

W naszym przykładzie task X jest wywoływany cyklicznie, a jednocześnie wyzwala task Z. Jego implementacja musi więc trochę się różnić od tasku Y. Nie chcemy, aby task X wiedział o wszystkich taskach, które wyzwala. Zamiast tego będzie on udostępniał mechanizm subskrypcji. A dodanie konkretnej akcji będzie w gestii tasku Z. Po raz kolejny dbamy o abstrakcję i odpowiedni przepływ zależności. To task Z zależy od X, dlatego on powinien się zapisywać.

Bardzo często robimy odwrotnie i potem ciężko się połapać w kodzie. Dlatego, iż bez wnikliwej znajomości systemu nie wiemy dlaczego task X miały wykonywać funkcje dla tasku Z i po rozbudowaniu systemu jeszcze 10 innych tasków.

Mechanizm subskrypcji oparłem o wzorzec projektowy Observer. Często w C zapominamy o wzorcach projektowych, ponieważ C nie jest językiem obiektowym. Ale implementacja na wskaźnikach na funkcje sprawdza się równie dobrze. A przy okazji pomaga zapewnić odpowiednią separację modułów.

Kod do obsługi obserwera zrobiłem w sposób ogólny. Dzięki temu można użyć tych samych funkcji do obsługi różnych list subskrybentów:

typedef void (*notify_fun_t)(void *param); err_code_t subscribe(notify_fun_t nf, notify_fun_t *sublist, int32_t sub_max) { int32_t i = 0; err_code_t ret = -1; for (i = 0; i < sub_max; i++) { if (sublist[i] == NULL) { printf("subscribed on idx: %d\n", i); sublist[i] = nf; ret = 0; break; } } return ret; } void notify(notify_fun_t *sublist, int32_t sub_max, void *param) { int32_t i = 0; for (i = 0; i < sub_max; i++) { if (sublist[i] != NULL) { sublist[i](param); } } }

A co tu się adekwatnie dzieje?

Mamy wskaźnik na funkcję notify_fun_t. W funkcji subscribe dopisujemy taki wskaźnik do listy. Natomiast funkcja notify przechodzi przez listę i wywołuje wszystkie funkcje wskaźniki na funkcje, jakie znajdzie.

Możemy teraz użyć tych funkcji w obsłudze taska X:

#define EVENT_X_TIMEOUT 5 void event_x_callback(void *param); void add_event_x(void); #define SUB_X_MAX 5 notify_fun_t subscribed_x[SUB_X_MAX]; void event_x_callback(void *param) { printf("event x\n"); event_restart(param, EVENT_X_TIMEOUT); notify(subscribed_x, SUB_X_MAX, param); } void event_x_add(void) { event_add(event_x_callback, EVENT_X_TIMEOUT); } void subscribe_to_x(notify_fun_t nf) { subscribe(nf, subscribed_x, SUB_X_MAX); }

Dla taska X mamy te same elementy co w tasku Y, czyli funkcje event_x_callback i event_x_add. Mamy również obsługę subskrypcji. A więc jest specjalna tablica subscribed_x przechowująca notify_fun_t oraz funkcje do subskrypcji i notyfikacji z odpowiednimi argumentami.

W tym przypadku również wskaźnik na event przechodzi przez wszystkie funkcje jako void *param i może być wykorzystany w funkcjach obsługujących eventy, o ile zajdzie taka potrzeba.

Podsumowując – task X wykona swoje zadanie, zresetuje swój event, a następnie wykona funkcję notify dla wszystkich zapisanych subskrybentów. Nie obchodzi go, co dokładnie te funkcje robią.

Task wyzwalany

Task Z jest wyzwalany przez task X. A jeszcze, żeby nie było tak prosto na początku ma delay. Task Z korzysta z interfejsu do subskrypcji udostępnianego przez task X:

#define EVENT_Z_TIMEOUT 2 + 1 void event_z_callback(void *param); void add_event_z(void); void event_z_callback(void *param) { printf("event z\n"); event_clear(param); /* * Nie trigerujemy od nowa, zamiast tego czyscimy event. * Po kolejnym wywolaniu z eventu x utworzy sie od nowa. */ } void event_z_notify(void *param) { printf("event z notify called\n"); event_add(event_z_callback, EVENT_Z_TIMEOUT); } void event_z_add(void) { /* Event Z musi wiedziec o istnieniu eventu x, bo od niego zalezy */ subscribe_to_x(event_z_notify); }

Mamy podobnie jak poprzednio funkcje event_z_callback i event_z_add. Jednak ich implementacja się trochę różni. Nasz callback tym razem nie restartuje timeoutu, tylko usuwa się z listy. Event jest od nowa dodawany przez funkcję event_z_notify, którą podaliśmy jako argument podczas subskrypcji do tasku x.

Kiedy wywoła się task X, wywoła się też funkcja notify, task Z zostanie dodany, odczekamy timeout i wywołamy callback dla tasku Z. Następnie task Z zostanie usunięty, poczekamy na kolejne wywołanie tasku X i cykl się zacznie od początku.

W definicji timeoutu dla tasku Z widzimy jedną z trudności własnego implementowania mechanizmów RTOSopodobnych.

#define EVENT_Z_TIMEOUT 2 + 1

Dodanie offsetu 1 to brzydki hack, który zastosowałem na szybko, żeby przykład działał. Nowy event jest dodawany do tablicy jako kolejny element. Pętla obsługująca eventy obsługuje go w tej samej iteracji zmniejszając od razu timeout. W dodatku to zadanie nie jest gwarantowane, bo przy innej kolejności eventów w tablicy task Z może zostać zapisany na wcześniejszy indeks i nie być obsłużony w tej iteracji.

Aby rozwiązać ten problem lepiej musielibyśmy dodać mechanizm gwarantujący, iż task zostanie dodany dopiero po przejściu całej pętli w funkcji events_process.

Takich miejsc, gdzie możemy się wywalić implementując samodzielnie mechanizmy obsługi i synchronizacji tasków jest sporo. I dlatego właśnie korzystamy z gotowych RTOSów, które są na bieżąco rozwijane i aktualizowane. Dzięki temu możemy skupić się na pisaniu naszej aplikacji, a nie na pisaniu schedulera, który jak już wspomniałem sam w sobie może być cięższym zadaniem, niż sama aplikacja.

W ten sposób udało nam się dobrnąć do końca omawiania tej przykładowej implementacji. Jak wspominałem – nie jest to rozwiązanie produkcyjne. Raczej pokazuje sposób realizacji różnych mechanizmów, które możemy zastosować w swoim kodzie.

Podsumowanie

Pokazana przykładowa implementacja korzysta ze wskaźników na funkcje, listy eventów i wzorca observer. Wskaźniki na funkcje pomagają nam nie tylko obsłużyć eventy, ale również wprowadzić odpowiednią strukturę do naszego kodu. Poszczególne moduły wiedzą tylko o tych elementach systemu, o których muszą. Dzięki trzymaniu się tych zasad możemy pisać łatwiejszy do utrzymania kod.

Jeżeli obsługa eventów nie jest najważniejszym celem naszej aplikacji i jest potrzebna tylko w kilku miejscach – możemy zrezygnować z takiej dbałości o zachowanie abstrakcji. Wtedy funkcje będą wywoływane bezpośrednio, kod zostanie zaśmiecony zależnościami, ale dla pojedynczych funkcji może nie opłaca się implementować całych mechanizmów.

Natomiast sama implementacja jest jednak dosyć skomplikowana. Widzimy więc wyraźnie ile RTOS przed nami ukrywa złożoności. A w końcu on to robi w jeszcze bardziej skomplikowany (ale jednocześnie bezpieczniejszy) sposób. Dlatego kiedy eventów robi się więcej, a zależności między nimi są coraz bardziej skomplikowane – RTOS staje się nieunikniony.

Na koniec jeszcze przypominam o trwającej promocji na kurs „C dla Zaawansowanych”. Promocja trwa jeszcze do poniedziałku do północy. A tematy wskaźników na funkcje, interfejsów, wzorców projektowych, czy warstw abstrakcji są tam omówione.

Idź do oryginalnego materiału