Threat Hunting LOLBAS w Elastic Stack na 2 sposoby (gościnnie Jupyter)

wiadrodanych.pl 1 rok temu
Zdjęcie: Threat Hunting LOLBAS Elastic Stack


Indeksowania danych do klastra Elastic Stack to dobry moment na wyłuskanie wartości z danych. W przypadku Threat Hunting‘u, dobrze przygotowane dane mogą znacznie przyśpieszyć proces wyszukiwania zagrożeń i anomalii. W artykule uprościmy wyszukiwanie procesów LOLBAS.

Co to jest LOLBAS / LOLBIN?

LOLBAS (Living Off The Land Binaries, Scripts and Libraries) to niezłośliwe skrypty, binarki i biblioteki, które wykorzystywane są adwersarze do robienia złośliwych rzeczy. Pliki te są elementami rozwiązań Microsoftu i nierzadko są podpisane przez Microsoft.

Wykorzystywane są do eksploitacji, persystencji, eskalacji uprawnień, dostarczania złośliwego systemu i obchodzenia detekcji. Na stronie projektu znajdziesz opis, funkcje, mapping do technik MITRE ATT&CK i przykłady użycia.

SITREP

Dostałeś/aś zadanie przeprowadzenia Threat Huntingu w oparciu o projekt LOLBAS .

Setup:

Repo:

https://github.com/zorteran/threat-hunting-lolbas-elastic-stack

Opcja 1. Łatwiej ale wolniej (ale trudniej)

Chcemy ograniczyć analizę do Event 1 (process creation) z Sysmon’a, więc odpalamy filtr event.provider: Microsoft-Windows-Sysmon AND event.code: 1 i przeglądamy… przeglądamy… i nagle jest!

Sam wykonałem mshta.exe aby ściągnąć i wyświetlić plik robots.txt z mojego bloga, więc wiedziałem czego szukam.

Problem z szukaniem

Co w przypadku gdy nie wiemy czego szukamy? Robimy nowe query!
process.name: ("mshta.exe" or "certutil.exe" or "certos.exe" or "replace.exe" or ...) Sporo tych binarek. Dużo pisania. Ciężko wypisać wszystkie, a co dopiero pokatalogować je po funkcjonalności. Może w pierwszej kolejności najlepiej się skupić np. na ściąganiu plików dzięki binarek lolbas?

Czy jest to problem? Zależy. Nie raz generowałem zapytania KQL dzięki Excela lub Pythona. Ad-hoc zapytanie wykona się o ile mieścimy się w maksymalnej liczbie filtrów. Gorzej jeżeli chcemy je powtarzać co 5 minut w regule detekcji….

Problem z wydajnością

Zastanawiałeś/aś się co dzieje się pod spodem gdy wykonujemy zapytanie? Na pomoc przychodzi Search Profiler znajdujący się w Dev Tools w Kibanie.

Wyszukiwanie jednego procesu trwało 0.086ms
Wyszukiwanie czterech procesów trwało 0.470ms

Na screenach powyżej widać ponad 5 krotną różnicę w czasie zapytań. Nie jest to precyzyjny pomiar (wykonałem go tylko raz), ale widać mniejszą lub większą różnicę. Na produkcyjnym klastrze i danych różnice będą bardziej widoczne.

Czy to źle?

Nie. Analityk w działaniach ad-hoc poradzi sobie z żonglowaniem zapytaniami, a czas wykonania zapytania będzie i tak wystarczający (Elasticsearch jest szybki i potrafi sporo wybaczyć). Jednak w przypadku reguł detekcji i powtarzalnych obserwacji takie podejście może być najwygodniejsze.

Opcja 2. Trudniej ale szybciej (ale łatwiej)

Będzie to opcja dłuższa, ale ciekawsza. Przygodę zaczniemy od odpalenia Jupytera . Używałem go już z sukcesem we wpisie o 10 najważniejszych źródeł MITRE ATT&CK. LOLBAS to też projekt na Github’ie. Każda binarka to osobny plik YAML. Spróbujemy wyciągnąć listę binarek oraz ich funkcje. Będzie to słownik pod wzbogacanie danych z winlogbeat.

Przekształcenie danych z LOLBAS w CSV

Pierwszym krokiem jest ściągnięcie repozytorium.

!git clone https://github.com/LOLBAS-Project/LOLBAS.git

Następnie import bibliotek i załadowanie wszystkich YAML do listy.

import os import sys import yaml from yaml.loader import SafeLoader import pandas as pd lolbas_list = [] for root, subdirs, files in os.walk(".\LOLBAS\yml"): for file in files: with open(f'{root}\\{file}') as f: lolbas_entry = yaml.load(f, Loader=SafeLoader) lolbas_list.append(lolbas_entry)

Ograniczymy się do nazwy i kategorii, a rezultat zapiszemy w pliku CSV. Tagi z funkcjami będą miały przedrostek lolbas_. Ograniczam się do binarek *.exe.

def unique(list1): list_set = set(list1) unique_list = (list(list_set)) return unique_list def process_lolbas_entry(entry): return { 'name': entry['Name'].lower(), 'tags': ",".join(unique(['lolbas'] + [ 'lolbas_' + x["Category"].lower() for x in entry["Commands"] ])) } df = pd.DataFrame(list(map(process_lolbas_entry, lolbas_list))) df[df['name'].str.contains(".exe")].to_csv("lolbas_dict.csv",index=False,header=False)

Przygotowanie słownika i ingest pipeline

Wygenerowany plik CSV jest mały i można go zapisać w Elasticsearch dzięki Kibany – Data Visualizer’a w module Machine Learning (jest to darmowa funkcjonalność). Utworzyłem indeks dict-lolbas. W Ingest pipeline wykorzystałem processor split, aby otrzymać tablicę tagów.

Dodany indeks wykorzystamy do wzbogacania danych dzięki Enrich Policy i Enrich Processor. Wspominałem o tej metodzie w artykule gdzie wykrywaliśmy phishing na podstawie feedu z CERT Polska. W momencie wykonania _execute tworzony jest dedykowany indeks na bazie wskazanych indeksów źródłowych. Jest zoptymalizowany pod kątem liczby segmentów (1) i replik (odnoszę wrażenie, iż shardów łącznie jest tyle ile hot/content node’ów).

PUT /_enrich/policy/lolbas-binaries { "match": { "indices": "dict-lolbas", "match_field": "process_name", "enrich_fields": ["tags"] } } PUT _enrich/policy/lolbas-binaries/_execute
W trakcie implementacji i testowania winlogbeat-lolbas-enrichment

Finalnie mamy pipeline który:

  1. Wzbogaca dokument o pole lolbas_enrichment
  2. Dodaje tagi z pola lolbas_enrichment.tags
  3. Usuwa pole lolbas_enrichment
PUT _ingest/pipeline/winlogbeat-lolbas-enrichment { "description": "Adding tags to lolbas processes", "processors": [ { "enrich": { "policy_name": "lolbas-binaries", "field": "process.name", "target_field": "lolbas_enrichment", "if": "ctx.process.name != null" } }, { "foreach": { "field": "lolbas_enrichment.tags", "processor": { "append": { "field": "tags", "value": [ "{{{_ingest._value}}}" ], "allow_duplicates": false } }, "if": "ctx.lolbas_enrichment != null" }, "remove": { "field": "lolbas_enrichment", "if": "ctx.lolbas_enrichment != null" } } ] }

Ostatni element układanki to dodanie Pipeline Processor do głównego ingest pipeline wykorzystywanego przez winlogbeat’a, czyli w tym przypadku winlogbeat-8.5.2-routing. Pipeline ten składa się praktycznie z samych pod-pipeline’ów.

[ { "set": { "field": "event.ingested", "value": "{{_ingest.timestamp}}" } }, { "pipeline": { "name": "winlogbeat-8.5.2-security", "if": "ctx?.winlog?.channel == 'Security' && ['Microsoft-Windows-Eventlog', 'Microsoft-Windows-Security-Auditing'].contains(ctx?.winlog?.provider_name)" } }, { "pipeline": { "name": "winlogbeat-8.5.2-sysmon", "if": "ctx?.winlog?.channel == 'Microsoft-Windows-Sysmon/Operational'" } }, { "pipeline": { "name": "winlogbeat-8.5.2-powershell", "if": "ctx?.winlog?.channel == 'Windows PowerShell'" } }, { "pipeline": { "if": "ctx?.winlog?.channel == 'Microsoft-Windows-PowerShell/Operational'", "name": "winlogbeat-8.5.2-powershell_operational" } }, { "pipeline": { "name": "winlogbeat-lolbas-enrichment", "if": "ctx?.process?.name != null", "ignore_failure": true } } ]

Efekt

Chwila działania nowego pipeline’u i już widać efekty. Mamy do dyspozycji kilka tagów.

Za pomocą prostego filtra tags: "lolbas_download" widzimy nie tylko Event 1, ale również Event 3 (a docelowo wszystkie zdarzenia które mają pole process.name.

Zróbmy więc regułę! LOLBAS z funkcją Download odnoszą się do zasobów takich jak web (http) czy zasoby sieciowe (ftp, smb). Skupmy się na http. Wykorzystamy do tego EQL.

process where tags == "lolbas_download" and process.command_line: "*http*"

Wildcard query również można zoptymalizować tagując wcześniej dokumenty z frazą http na poziomie indeksowania do klastra Elasticsearch.

Czy to rozwiązanie jest kompletne?

Nie jest, ale nie ma sensu się tym przejmować. Zasada Pareto: 20 % wysiłku, 80 % efektu. Np. pierwsza binarka z listy LOLBAS “AppInstaller.exe” ma inną nazwę procesu. Baza rozwiązania jest. Trzeba trochę dopracować CSV’kę .

Jaki jest morał tej historii?

  1. Wzbogacanie danych potrafi znacznie zwiększyć efektywność pracy analityka..
  2. Jupyter to fajne narzędzie, które minimalizuje potrzebę wykorzystania interfejsu białkowego (robienie czegoś “z łapy”)
Idź do oryginalnego materiału