Wrzucenie alertów z Elastic Stack na Apache Kafka daje wiele nowych możliwości. Możemy wysłać notyfikacje na Discord. Wszelka automatyzacja i enrichment stoją otworem. W artykule spróbujemy to zrobić na darmowej licencji .
Środowisko
W tym przypadku wykorzystałem:
- Apache Kafka – jako message broker
- Kafka Connect + kafka-connect-elasticsearch-source – do pobierania danych z Elasticsearch’a
- Skrypcik w pythonie – szukałem Kafka Connect sink’a do discorda, ale niestety nikt się takowym nie pochwalił na Github Może kiedyś napiszę swój connector w Scali…
Przygotowując ten artykuł korzystałem z gotowego klastra Elasticsearch, któy miałem pod ręką. Jeśl takiego nie masz, możesz skorzystać z mojego gotowca z którego często korzystam- zorteran/elastic-stack-docker-boilerplate
Jeśli chodzi o Kafkę, wykorzystalem docker-compose cp-all-in-one/docker-compose.yml od Confluenta. Poniżej fragment który musiałem zmienić. W katalogu connect-plugins umieściłem wspomniany kafka-connect-elasticsearch-source. Plik ca.p12 to certyfikat CA mojego klastra Elasticsearch. Musimy musimy mu zaufać, aby powiodła się weryfikacja TLS. Pamiętaj, aby SAN‘y w certyfikacjie był zgodne z rzeczywistością.
... CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/connect-plugins" volumes: - "./connect-plugins/ca.p12:/etc/certs/ca.p12" - "./connect-plugins:/usr/share/connect-plugins" ...A po co mi ta Kafka?
Twórca Debezium znany jest ze słów Friends Don’t Let Frends Do Dual-Writes. Debezium (Change Data Capture) poświęciłem osobny wpis na blogu. Na począktu mógłby wystarczyć zwykły skrypt w Python… Z czasem zdarzeniami zainteresuje się drugi zespół, potem ktoś wpadnie na pomysł wysyłania zdarzeń typu crtitical prosto na maila. Finalnie skrypt znacznie się rozrośnie, a jego utrzymanie będzie uciążliwe. Nie bez powodu świat ciągnie w kierunku mikroserwisów i wzorców typu outbox.
Alert w Kibanie
Alerty wizualnie mniej więcej w takiej formie. Testowałem to na banalnych regułach typu wykryj wykonania sleep, vi. Klikając w tab JSON zobaczymy źródłowy dokument znajdujący się w indeksie .internal.alerts-security.alerts-default-*.
Konfiguracja Kafka Connect
Do konfiguracji connectoró możemy wykorzystać (płatnego) Confluent Control Center, (płatnego) Conduktor, (darmowego) AKHQ… albo darmowego curl’a . Poniżej przykładowa konfiguracja. Na co zwróciłbym uwagę:
- “value.converter”: “org.apache.kafka.connect.json.JsonConverter” – ponieważ, dane w Elasticsearch’u to JSON’y
- value.converter.schemas.enable”: “false” – ponieważ interesje nas dokument, a nie jego JSON schema.
- “incrementing.field.name”: “@timestamp” – w przypadku alertów, pole @timestamp będzie najlepszym kandydatem. Dla CDC zwykłych indeksów lepszym kandydatem może być event.created lub event.ingested.
Po przygotowaniu pliku wystarczy go puścić curl’em. jeżeli coś nie działa (pamietasz o prawidłowym certyfikacie i SAN?), zerknij do logów Kafka Connect.
curl -X POST -H "Content-Type: application/json" --data @body.json http://my-kafka.lan:8083/connectors | jqSkrypcik w pythonie
Strumień alertów w topic’ach zapewniony. Można zabrać się za skrypy. Miałem już wygenerowany webhook dla Gitlaba, więc wybrałem tę drogę. W skrócie: niekończąca się pętla, która wrzuca tytuł i opis alertu jako Embed w Discord.
from json import loads from confluent_kafka import Consumer from loguru import logger from discord_webhook import DiscordWebhook, DiscordEmbed KAFKA_CONF = {'bootstrap.servers': "10.10.10.10:9092", 'group.id': "dev-wiaderko", 'auto.offset.reset': 'latest'} # earliest / latest KAFKA_TOPICS = ["^es-alerts.*"] DISCORD_WEBHOOK_URL = "https://discord.com/api/webhooks/something/somethingelse" DISCORD_NOTIFICATION_USERNAME = "Mały Krzykacz" def send_discord_notification(title, description): webhook = DiscordWebhook(url=DISCORD_WEBHOOK_URL, username=DISCORD_NOTIFICATION_USERNAME) embed = DiscordEmbed(title=title, description=description, color='03b2f8') embed.set_timestamp() webhook.add_embed(embed) webhook.execute() def process_message(msg): logger.info("Processing message: topic={} partition={} offset={}", msg.error(), msg.topic(), msg.partition(), msg.offset()) message = loads(msg.value().decode('utf-8')) send_discord_notification(title=message["kibana.alert.rule.name"], description=message["kibana.alert.reason"].replace("senuto", 'wiaderko')) logger.info("If you start me up, I'll never stop!") try: consumer = Consumer(KAFKA_CONF) consumer.subscribe(KAFKA_TOPICS) while True: msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): logger.error("Kafka error: code={} topic={} partition={} offset={}", msg.error(), msg.topic(), msg.partition(), msg.offset()) process_message(msg) finally: consumer.close()PS. Nie umieszczaj kredek/api key itp. w kodzie w repozytorium. Kod powyżej to prototyp. Wykorzystaj zmienne środowiskowe, pliki konfiguracyjne itp. (os.getenv() method)
Wynik działania
Wnioski
Rozwiązanie wymagało minimalną ilość kodu, a znacznie zwiększyło możliwości wykorzystania alertów. Po głowie chodzi mi skrypt wykonujący enrichment alertów dla pól typu domena, IP, hash. Można wykorzystać do tego np. VirusTotal.
Rozważam napisanie swojego Discord Sink’a dla Kafka Connect. Zaletą tego warianty było by gotowe HA w przypadku więcej niż 1 węzła Kafka Connect. W przypadku skryptu Python o HA muszę postarać się sam.