Содержание статьи
Intro
Тех, кто пропустил прошлый номер, кратко введу в курс дела: Esper — это библиотека, позволяющая выполнять сложную обработку событий (корреляцию), поступающих от различных источников. Правила обработки событий описываются с помощью языка EPL (Event Processing Language), который очень похож на SQL. Например, чтобы в потоке событий, поступающих от файрвола, обнаружить попытки сканирования адресов, можно использовать следующее выражение:
select src_ip,dst_ip,dst_port from firewall.win:time(30 sec)group by src_iphaving count(distinct dst_ip) > 50output first every 1 hour
Логика работы Esper и различные примеры ее использования в контексте обеспечения информационной безопасности были детально рассмотрены в предыдущей статье, сейчас же мы сконцентрируемся на практической реализации подсистемы корреляции с помощью Java-версии библиотеки. Как говорится, пристегните ремни.
Строим каркас
Чтобы создать подсистему корреляции на базе Esper, необходимо скачать последнюю версию библиотеки с сайта проекта и распаковать ее. Нам понадобится основной модуль esper-5.1.0.jar, располагающийся в корне, а также все дополнительные библиотеки из папки esper\lib, не забудь добавить их при создании нового Java-проекта (в Eclipse это делается на вкладке Libraries с помощью кнопки Add External JARs).
Так как наше приложение должно обрабатывать различные типы событий, их необходимо описать. Сделать это можно несколькими способами:
-
С помощью класса:
public class Antivirus {private String compname;private String file;private String virusname;public Antivirus(String compname,String file,String virusname){this.compname=compname;this.file=file;this.virusname=virusname;}public String getCompname() {return compname;}public String getFile() {return file;}public String getVirusname() {return virusname;}} -
С помощью объекта, реализующего интерфейс java.util.Map, ключи которого содержат названия полей, а значения — имя класса, соответствующего типу поля:
Map<String, Object> logonEventDef = new HashMap<String, Object>();logonEventDef.put("src_ip", String.class);logonEventDef.put("login", int.class);logonEventDef.put("result", String.class); -
С помощью массива объектов, элементы которого являются полями события:
String[] firewallPropsNames ={"src_ip", "src_port","dst_ip","dst_port","action"};Object[] firewallpropsTypes ={String.class,int.class,String.class,int.class,String.class};
Итак, события описаны, приступаем к инициализации самого движка. Для этого создаем объект класса Configuration и загружаем с помощью его описанные выше типы событий. Данный объект в дальнейшем может использоваться для установки различных настроек движка и расширения его возможностей.
Configuration engineConfig = new Configuration();engineConfig.addEventType("antivirus", Antivirus.class.getName());engineConfig.addEventType("logonEvent",logonEventDef);engineConfig.addEventType("firewall",firewallPropsNames,firewallpropsTypes);
Созданную конфигурацию передаем на вход статическому методу EPServiceProviderManager.getDefaultProvider(), в результате чего получаем экземпляр движка, а затем и административный интерфейс, с помощью которого загружаем EPL-выражения (правила корреляции):
EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(engineConfig);EPAdministrator admin = engine.getEPAdministrator();// Детектируем брутфорс паролейEPStatement rule = admin.createEPL("select * from logonEvent(result='fail').win:time(1 min) group by src_ip having count(*)>30");
По умолчанию Esper начинает обработку правил сразу после их создания, однако ничто не мешает управлять этим поведением. Правило можно остановить и возобновить в любой момент, достаточно вызвать соответствующие методы его экземпляра:
rule.stop();rule.start();
Далее необходимо организовать отправку событий в движок, чтобы из потока событий Esper начал генерировать алерты согласно заданным правилам. Для этого необходимо получить экземпляр интерфейса EPRuntime и вызвать метод sendEvent, передав событие в качестве параметра. В зависимости от того, каким способом было представлено событие, необходимо использовать соответствующую версию метода:
EPRuntime runtime = engine.getEPRuntime();runtime.sendEvent(new Antivirus("user-pc","c:\\windows\\virus.exe","Trojan"));Map<String, Object> logonEvent = new HashMap<String, Object>();logonEvent.put("src_ip", "10.0.0.1");logonEvent.put("login", "root");logonEvent.put("result", "fail");runtime.sendEvent(logonEvent,"logonEvent");Object [] firewallEvent={"10.0.0.1",32000,"10.0.0.2",22,"permit"};runtime.sendEvent(firewallEvent,"firewall");
Получить результаты работы правил можно различными способами. Самый простой — использовать объект, реализующий интерфейс UpdateListener:
public class MyUpdateListener implements UpdateListener { public void update(EventBean[] newEvents, EventBean[] oldEvents) { if (newEvents != null) { String eventType = newEvents[0].getEventType().toString(); Object event = newEvents[0].getEventType(); System.out.println("Event received "+eventType+" " + newEvents[0].getUnderlying()); } }}
Приложение начнет получать результаты работы правил (алерты) после подписки, оформляемой с помощью вызова метода addListener.
UpdateListener myListener = new MyUpdateListener();rule.addListener(myListener);
Теперь после каждого срабатывания зарегистрированных в движке правил будет вызываться метод update, в который будет передаваться массив «старых» и «новых» событий. В первую очередь нас будут интересовать «новые» события, отражающие состояние EPL-выражения на момент срабатывания.
Для представления результата срабатывания правила в формате XML или JSON (последний будет как нельзя кстати при необходимости сохранения данных в Elasticsearch) необходимо использовать интерфейс EventRenderer, доступ к которому получается через интерфейс EPRuntime следующим образом:
JSONEventRenderer jsonRenderer = engine.getEPRuntime(). getEventRenderer().getJSONRenderer(rule.getEventType());String json = jsonRenderer.render(event);
Расширяем возможности
Если вернуться к примеру с определением сканирования адресов, то в нем таится один недостаток — в реальной инфраструктуре всегда есть серверы, генерирующие большое число соединений: DNS, Proxy, системы мониторинга, сканеры уязвимостей. Таких доверенных серверов может быть не один десяток. Можно, конечно, зашить список доверенных узлов в тело EPL-выражения с помощью фильтра (как мы делали в предыдущем номере), но гораздо удобнее хранить этот список во внешнем источнике, например в таблице СУБД, ведь Esper из коробки поддерживает работу с базами данных через JDBC-драйвер. Подключить СУБД можно с помощью файла конфигурации или используя API.
Рассмотрим последний способ. Сначала необходимо создать контейнер ConfigurationDBRef для хранения конфигурации работы с базами данных и заполнить его параметрами: строка подключения, логин, пароль. После чего добавить сконфигурированный контейнер в общую конфигурацию движка.
ConfigurationDBRef mysql = new ConfigurationDBRef();mysql.setDriverManagerConnection("com.mysql.jdbc.Driver", "jdbc:mysql://localhost/testDB", "user", "password");mysql.setExpiryTimeCache(60, 120);engineConfig.addDatabaseReference("mysql", mysql);
В данном примере мы также добавили настройки кеширования, благодаря этому результат SQL-выражения будет действительным в течение 60 с, при этом каждые две минуты движок будет производить очистку результатов, время жизни которых более 60 с.
Использовать сконфигурированный внешний источник в EPL-выражении для устранения ложных срабатываний при обнаружении сканирования адресов можно следующим образом:
select src_ip,dst_ip,dst_port,isAllowedfrom firewall.win:time(30 sec) as fw,sql:mysql ['select case when exists(select ip from scanAllowed where ip=${src_ip}) then true else false end as isAllowed'] as allowedwhere isAllowed=0group by fw.src_iphaving count(distinct fw.dst_ip) > 50output first every 1 hour;
Для данного примера кеширование будет задействовано только для одинаковых адресов источника. Как правило, список доверенных адресов содержит не более сотни узлов, поэтому целесообразней его закешировать и объединить с адресами из событий с помощью outer join:
select src_ip,dst_ip,dst_port,ipfrom firewall.win:time(30 sec) as fwleft outer join sql:mysql ['select ip from scanAllowed'] as allowedon fw.src_ip=allowed.ipwhere ip is nullgroup by fw.src_iphaving count(distinct fw.dst_ip) > 50
Благодаря кешу Esper должен был запомнить список доверенных IP-адресов, полученных из таблицы scanAllowed, и автоматически построить индекс для быстрого поиска, однако при отладке обнаружился интересный нюанс: при включенном кешировании Esper выдавал некорректные результаты, о чем был заведен тикет в JIRA.
Предположим, что, помимо списка доверенных хостов, у нас есть справочник ipplan, в котором хранятся диапазоны адресов и их описание.
Для повышения быстродействия поиска диапазоны хранятся в форме начального и конечного адреса, представленного в числовом виде. Предположим, что согласно политике безопасности подключения из Wi-Fi-сети к сегменту баз данных запрещены, поэтому в случае ошибки конфигурации межсетевого экрана или какой‑нибудь диверсии нам необходимо оперативно выявлять такие коннекты. Несмотря на то что событие от файрвола содержит адреса в текстовом формате, благодаря гибкости Esper мы можем определить свою собственную функцию по переводу IP-адреса в числовой формат, которую можно будет использовать в EPL-выражениях:
public class MyEsperUtils { public static Long ipToInt(String addr) { String[] addrArray = addr.split("\\."); long num = 0; for (int i=0;i<addrArray.length;i++) { int power = 3-i; num += ((Integer.parseInt(addrArray[i])%256 * Math.pow(256,power))); } return num; }}
После определения функции ее необходимо зарегистрировать в конфигурации, передав имя, название класса и метода, реализующего функцию в качестве параметров:
engineConfig.addPlugInSingleRowFunction("ipToInt", "MyEsperUtils", "ipToInt");
Теперь все готово, чтобы использовать нашу новую функцию в EPL-выражении для выявления запрещенных соединений между сегментами wifi и database:
select src_ip,dst_ip,action,src_net.description,dst_net.descriptionfrom firewall as fw,sql:mysql ['select description from ipplan where ${ipToInt(src_ip)} between startaddr and endaddr'] as src_net,sql:mysql ['select description from ipplan where ${ipToInt(dst_ip)} between startaddr and endaddr'] as dst_netwhere src_net.description = 'wifi' and dst_net.description='database' and action='permit'output first every 1 hour
Благодаря возможности определять свои собственные функции можно существенно расширить функционал EPL-правил, например добавить поддержку определения географических координат по IP-адресам, используя базы GeoIP, и измерять расстояние между двумя событиями.
Отладка и поиск ошибок
Отладка и поиск ошибок неразрывно связаны с разработкой софта. Ребята из Esper позаботились об упрощении этой задачи. В качестве дефолтного компонента логирования используется Log4j. Чтобы задать его параметры, достаточно подключить файл конфигурации log4j.xml при запуске приложения с помощью свойства log4j.configuration (за основу можно взять файл, поставляемый вместе с самой библиотекой из директории esper\etc):
java -Dlog4j.configuration=log4j.xml ...
Для отладки EPL-выражений удобно использовать аннотацию @Audit, которую необходимо вставить перед самим текстом правила:
@Audit('stream,property') select src_ip,dst_ip,dst_port from firewall
В параметрах аннотации перечисляются категории операций, которые необходимо логировать. Так, категория stream отвечает за вывод каждого события, получаемого правилом, а property — за отображение названия полей событий и их значений. Таких категорий в Esper насчитывается более десятка.
Другая интересная возможность, которая может быть полезна для оптимизации EPL-выражений, — это вывод плана запроса. Включение вывода плана может быть выполнено следующим образом:
engineConfig.getEngineDefaults().getLogging().setEnableQueryPlan(true);
Включение логирования результатов работы SQL-запросов при работе с внешними источниками делается похожим способом:
engineConfig.getEngineDefaults().getLogging().setEnableJDBC(true);
Благодаря этой опции можно увидеть, как работает кеширование, сколько времени выполнялся запрос и сколько значений было возвращено СУБД.
info
Esper поддерживает доступ к СУБД через jdbc, к нереляционным данным через вызов Java-методов, определение собственных функций для EPL, все это позволяет гибко расширять функционал.
Интеграция с Logstash
Корреляция не заменяет задачу сбора и хранения логов, ее результаты служат отправной точкой для реагирования на инцидент, зафиксированный с помощью правила. Отличным набором для организации логирования и быстрого поиска по событиям является комбинация Elasticsearch — Logstash — Kibana (ELK). Elasticsearch представляет собой поисковый индекс (БД) для хранения и поиска событий, Kibana — удобное средство для визуализации и поиска хранимых данных, Logstash — гибкий и универсальный парсер. В этой связке явно не хватает коррелятора, чтобы приблизить это решение к гордому названию SIEM. Устраним этот недостаток; к счастью, каркас у нас уже готов. Все, что осталось сделать, — это направить события из Logstash в коррелятор, а результаты работы сохранить в Elasticsearch. Интеграцию будем проводить с использованием Redis, который является рекомендованным «брокером».

Для демонстрации в Redis будет два списка:
- input — содержит обработанные Logstash события, представленные в JSON-формате. Данные из этого списка будут поступать на вход коррелятора;
- alerts — в этот список коррелятор будет выгружать результат срабатывания правил.
Для простоты будем считать, что события описаны с помощью java.util.Map. Забирать события из Redis будем с помощью библиотеки Jedis, вызывая в цикле следующий фрагмент кода (здесь и далее по тексту не приводится обработка исключений и инициализация Jedis):
Jedis jedisTake = jedisFactory.getJedisPool().getResource();// Получаем события из RedisList<String> events = jedisTake.blpop(0,input);String event = events.get(1);JSONObject eventJson = new JSONObject(event);// Получаем тип событияString type = eventJson.getString("type");Map<String, Object> eventMap = new HashMap<String, Object>();Iterator<String> keys = eventJson.keys();while(keys.hasNext()){ String key = keys.next(); String value = eventJson.getString(key); // Наполняем поля события для отправки в коррелятор eventMap.put(key, value);}// Отправляем в корреляторruntime.sendEvent(eventMap,type);
В результате события из списка input попадут на вход коррелятора. Для получения алертов необходимо модифицировать метод update, который преобразует результат работы правила в формат JSON и сохранит его в списке alerts:
public void update(EventBean[] newEvents, EventBean[] oldEvents) { Jedis jedisPublish = jedisFactory.getJedisPool().getResource(); Pipeline pipe = jedisPublish.pipelined(); String alertEvent = ""; for(int i=0; i<newEvents.length; i++){ // Получаем тип события EventType eventType = newEvents[i].getEventType(); jsonRenderer = runtime.getEventRenderer().getJSONRenderer(eventType); // Формируем JSON-представление алерта alertEvent = jsonRenderer.render(newEvents[i]); // Сохраняем алерт в Redis jedisPublish.rpush(alerts, alertEvent); } pipe.sync();}
Осталось только настроить Logstash для перекладывания алертов из Redis в Elasticsearch, и можно использовать Kibana для анализа инцидентов.
warning
При работе с внешними источниками обязательно тестируй правила на производительность, Esper поддерживает хорошие возможности для отладки и поиска узких мест.
Заключение
Итак, мы рассмотрели все основные шаги, необходимые для создания своей подсистемы корреляции с помощью Java-версии библиотеки Esper, благодаря работе с внешними источниками данных и собственным функциям сделали использование нашего коррелятора более удобным. Добавив Esper к связке Elasticsearch — Logstash — Kibana, мы приблизили ELK-стек к полноценной SIEM. Чтобы ты лучше разобрался в рассмотренном материале, я подготовил демоприложение, которое может стать отправной точкой для решения твоих собственных задач из различных областей, где требуется сложная обработка событий с минимальной задержкой. Удачных экспериментов