В прош­лом номере мы рас­смот­рели воз­можнос­ти биб­лиоте­ки Esper, пред­назна­чен­ной для слож­ной обра­бот­ки событий. В дан­ной статье перей­дем от теории к прак­тике, соз­дадим свою собс­твен­ную под­систе­му кор­реляции и интегри­руем ее с популяр­ной связ­кой Elasticsearch — Logstash — Kibana.
 

Intro

Тех, кто про­пус­тил прош­лый номер, крат­ко вве­ду в курс дела: Esper — это биб­лиоте­ка, поз­воля­ющая выпол­нять слож­ную обра­бот­ку событий (кор­реляцию), пос­тупа­ющих от раз­личных источни­ков. Пра­вила обра­бот­ки событий опи­сыва­ются с помощью язы­ка EPL (Event Processing Language), который очень похож на SQL. Нап­ример, что­бы в потоке событий, пос­тупа­ющих от фай­рво­ла, обна­ружить попыт­ки ска­ниро­вания адре­сов, мож­но исполь­зовать сле­дующее выраже­ние:

select src_ip,dst_ip,dst_port from firewall.win:time(30 sec)
group by src_ip
having count(distinct dst_ip) > 50
output first every 1 hour

Ло­гика работы Esper и раз­личные при­меры ее исполь­зования в кон­тек­сте обес­печения информа­цион­ной безопас­ности были деталь­но рас­смот­рены в пре­дыду­щей статье, сей­час же мы скон­цен­три­руем­ся на прак­тичес­кой реали­зации под­систе­мы кор­реляции с помощью Java-вер­сии биб­лиоте­ки. Как говорит­ся, прис­тегни­те рем­ни.

 

Строим каркас

Что­бы соз­дать под­систе­му кор­реляции на базе Esper, необ­ходимо ска­чать пос­леднюю вер­сию биб­лиоте­ки с сай­та про­екта и рас­паковать ее. Нам понадо­бит­ся основной модуль esper-5.1.0.jar, рас­полага­ющий­ся в кор­не, а так­же все допол­нитель­ные биб­лиоте­ки из пап­ки esper\lib, не забудь добавить их при соз­дании нового Java-про­екта (в Eclipse это дела­ется на вклад­ке Libraries с помощью кноп­ки Add External JARs).

Подключаем необходимые библиотеки
Под­клю­чаем необ­ходимые биб­лиоте­ки

Так как наше при­ложе­ние дол­жно обра­баты­вать раз­личные типы событий, их необ­ходимо опи­сать. Сде­лать это мож­но нес­коль­кими спо­соба­ми:

  1. С помощью клас­са:

    public class Antivirus {
    private String compname;
    private String file;
    private String virusname;
    public Antivirus(String compname,String file,String virusname){
    this.compname=compname;
    this.file=file;
    this.virusname=virusname;
    }
    public String getCompname() {return compname;}
    public String getFile() {return file;}
    public String getVirusname() {return virusname;}
    }
  2. С помощью объ­екта, реали­зующе­го интерфейс java.util.Map, клю­чи которо­го содер­жат наз­вания полей, а зна­чения — имя клас­са, соот­ветс­тву­юще­го типу поля:

    Map<String, Object> logonEventDef = new HashMap<String, Object>();
    logonEventDef.put("src_ip", String.class);
    logonEventDef.put("login", int.class);
    logonEventDef.put("result", String.class);
  3. С помощью мас­сива объ­ектов, эле­мен­ты которо­го явля­ются полями события:

    String[] firewallPropsNames =
    {"src_ip", "src_port","dst_ip","dst_port","action"};
    Object[] firewallpropsTypes =
    {String.class,int.class,String.class,int.class,String.class};

Итак, события опи­саны, прис­тупа­ем к ини­циали­зации самого движ­ка. Для это­го соз­даем объ­ект клас­са Configuration и заг­ружа­ем с помощью его опи­сан­ные выше типы событий. Дан­ный объ­ект в даль­нейшем может исполь­зовать­ся для уста­нов­ки раз­личных нас­тро­ек движ­ка и рас­ширения его воз­можнос­тей.

Configuration engineConfig = new Configuration();
engineConfig.addEventType("antivirus", Antivirus.class.getName());
engineConfig.addEventType("logonEvent",logonEventDef);
engineConfig.addEventType("firewall",firewallPropsNames,firewallpropsTypes);

Соз­данную кон­фигура­цию переда­ем на вход ста­тичес­кому методу EPServiceProviderManager.getDefaultProvider(), в резуль­тате чего получа­ем экзем­пляр движ­ка, а затем и адми­нис­тра­тив­ный интерфейс, с помощью которо­го заг­ружа­ем EPL-выраже­ния (пра­вила кор­реляции):

EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(engineConfig);
EPAdministrator admin = engine.getEPAdministrator();
// Детектируем брутфорс паролей
EPStatement rule = admin.createEPL("select * from logonEvent(result='fail').win:time(1 min) group by src_ip having count(*)>30");

По умол­чанию Esper начина­ет обра­бот­ку пра­вил сра­зу пос­ле их соз­дания, одна­ко нич­то не меша­ет управлять этим поведе­нием. Пра­вило мож­но оста­новить и возоб­новить в любой момент, дос­таточ­но выз­вать соот­ветс­тву­ющие методы его экзем­пля­ра:

rule.stop();
rule.start();

Да­лее необ­ходимо орга­низо­вать отправ­ку событий в дви­жок, что­бы из потока событий Esper начал генери­ровать алер­ты сог­ласно задан­ным пра­вилам. Для это­го необ­ходимо получить экзем­пляр интерфей­са EPRuntime и выз­вать метод sendEvent, передав событие в качес­тве парамет­ра. В зависи­мос­ти от того, каким спо­собом было пред­став­лено событие, необ­ходимо исполь­зовать соот­ветс­тву­ющую вер­сию метода:

EPRuntime runtime = engine.getEPRuntime();
runtime.sendEvent(new Antivirus("user-pc","c:\\windows\\virus.exe","Trojan"));
Map<String, Object> logonEvent = new HashMap<String, Object>();
logonEvent.put("src_ip", "10.0.0.1");
logonEvent.put("login", "root");
logonEvent.put("result", "fail");
runtime.sendEvent(logonEvent,"logonEvent");
Object [] firewallEvent={"10.0.0.1",32000,"10.0.0.2",22,"permit"};
runtime.sendEvent(firewallEvent,"firewall");

По­лучить резуль­таты работы пра­вил мож­но раз­личны­ми спо­соба­ми. Самый прос­той — исполь­зовать объ­ект, реали­зующий интерфейс UpdateListener:

public class MyUpdateListener implements UpdateListener {
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
if (newEvents != null) {
String eventType = newEvents[0].getEventType().toString();
Object event = newEvents[0].getEventType();
System.out.println("Event received "+eventType+" "
+ newEvents[0].getUnderlying());
}
}
}

При­ложе­ние нач­нет получать резуль­таты работы пра­вил (алер­ты) пос­ле под­писки, офор­мля­емой с помощью вызова метода addListener.

UpdateListener myListener = new MyUpdateListener();
rule.addListener(myListener);

Те­перь пос­ле каж­дого сра­баты­вания зарегис­три­рован­ных в движ­ке пра­вил будет вызывать­ся метод update, в который будет переда­вать­ся мас­сив «ста­рых» и «новых» событий. В пер­вую оче­редь нас будут инте­ресо­вать «новые» события, отра­жающие сос­тояние EPL-выраже­ния на момент сра­баты­вания.

Для пред­став­ления резуль­тата сра­баты­вания пра­вила в фор­мате XML или JSON (пос­ледний будет как нель­зя кста­ти при необ­ходимос­ти сох­ранения дан­ных в Elasticsearch) необ­ходимо исполь­зовать интерфейс EventRenderer, дос­туп к которо­му получа­ется через интерфейс EPRuntime сле­дующим обра­зом:

JSONEventRenderer jsonRenderer = engine.getEPRuntime().
getEventRenderer().getJSONRenderer(rule.getEventType());
String json = jsonRenderer.render(event);
 

Расширяем возможности

Ес­ли вер­нуть­ся к при­меру с опре­деле­нием ска­ниро­вания адре­сов, то в нем таит­ся один недос­таток — в реаль­ной инфраструк­туре всег­да есть сер­веры, генери­рующие боль­шое чис­ло соеди­нений: DNS, Proxy, сис­темы монито­рин­га, ска­неры уяз­вимос­тей. Таких доверен­ных сер­веров может быть не один десяток. Мож­но, конеч­но, зашить спи­сок доверен­ных узлов в тело EPL-выраже­ния с помощью филь­тра (как мы делали в пре­дыду­щем номере), но гораз­до удоб­нее хра­нить этот спи­сок во внеш­нем источни­ке, нап­ример в таб­лице СУБД, ведь Esper из короб­ки под­держи­вает работу с базами дан­ных через JDBC-драй­вер. Под­клю­чить СУБД мож­но с помощью фай­ла кон­фигура­ции или исполь­зуя API.

Пример конфигурации внешнего источника с помощью файла
При­мер кон­фигура­ции внеш­него источни­ка с помощью фай­ла

Рас­смот­рим пос­ледний спо­соб. Сна­чала необ­ходимо соз­дать кон­тей­нер ConfigurationDBRef для хра­нения кон­фигура­ции работы с базами дан­ных и запол­нить его парамет­рами: стро­ка под­клю­чения, логин, пароль. Пос­ле чего добавить скон­фигури­рован­ный кон­тей­нер в общую кон­фигура­цию движ­ка.

ConfigurationDBRef mysql = new ConfigurationDBRef();
mysql.setDriverManagerConnection("com.mysql.jdbc.Driver",
"jdbc:mysql://localhost/testDB", "user", "password");
mysql.setExpiryTimeCache(60, 120);
engineConfig.addDatabaseReference("mysql", mysql);

В дан­ном при­мере мы так­же добави­ли нас­трой­ки кеширо­вания, бла­года­ря это­му резуль­тат SQL-выраже­ния будет дей­стви­тель­ным в течение 60 с, при этом каж­дые две минуты дви­жок будет про­изво­дить очис­тку резуль­татов, вре­мя жиз­ни которых более 60 с.

Ис­поль­зовать скон­фигури­рован­ный внеш­ний источник в EPL-выраже­нии для устра­нения лож­ных сра­баты­ваний при обна­руже­нии ска­ниро­вания адре­сов мож­но сле­дующим обра­зом:

select src_ip,dst_ip,dst_port,isAllowed
from firewall.win:time(30 sec) as fw,
sql:mysql ['select case when exists(select ip from scanAllowed
where ip=${src_ip}) then true
else false end as isAllowed'] as allowed
where isAllowed=0
group by fw.src_ip
having count(distinct fw.dst_ip) > 50
output first every 1 hour;

Для дан­ного при­мера кеширо­вание будет задей­ство­вано толь­ко для оди­нако­вых адре­сов источни­ка. Как пра­вило, спи­сок доверен­ных адре­сов содер­жит не более сот­ни узлов, поэто­му целесо­образней его закеши­ровать и объ­еди­нить с адре­сами из событий с помощью outer join:

select src_ip,dst_ip,dst_port,ip
from firewall.win:time(30 sec) as fw
left outer join sql:mysql ['select ip from scanAllowed'] as allowed
on fw.src_ip=allowed.ip
where ip is null
group by fw.src_ip
having count(distinct fw.dst_ip) > 50

Бла­года­ря кешу Esper дол­жен был запом­нить спи­сок доверен­ных IP-адре­сов, получен­ных из таб­лицы scanAllowed, и авто­мати­чес­ки пос­тро­ить индекс для быс­тро­го поис­ка, одна­ко при отладке обна­ружил­ся инте­рес­ный нюанс: при вклю­чен­ном кеширо­вании Esper выдавал некор­рек­тные резуль­таты, о чем был заведен ти­кет в JIRA.

Пред­положим, что, помимо спис­ка доверен­ных хос­тов, у нас есть спра­воч­ник ipplan, в котором хра­нят­ся диапа­зоны адре­сов и их опи­сание.

Справочник с описанием сетей
Спра­воч­ник с опи­сани­ем сетей

Для повыше­ния быс­тро­дей­ствия поис­ка диапа­зоны хра­нят­ся в фор­ме началь­ного и конеч­ного адре­са, пред­став­ленно­го в чис­ловом виде. Пред­положим, что сог­ласно полити­ке безопас­ности под­клю­чения из Wi-Fi-сети к сег­менту баз дан­ных зап­рещены, поэто­му в слу­чае ошиб­ки кон­фигура­ции меж­сетево­го экра­на или какой‑нибудь дивер­сии нам необ­ходимо опе­ратив­но выяв­лять такие кон­некты. Нес­мотря на то что событие от фай­рво­ла содер­жит адре­са в тек­сто­вом фор­мате, бла­года­ря гиб­кости Esper мы можем опре­делить свою собс­твен­ную фун­кцию по перево­ду IP-адре­са в чис­ловой фор­мат, которую мож­но будет исполь­зовать в EPL-выраже­ниях:

public class MyEsperUtils {
public static Long ipToInt(String addr) {
String[] addrArray = addr.split("\\.");
long num = 0;
for (int i=0;i<addrArray.length;i++) {
int power = 3-i;
num += ((Integer.parseInt(addrArray[i])%256 * Math.pow(256,power)));
}
return num;
}
}

Пос­ле опре­деле­ния фун­кции ее необ­ходимо зарегис­три­ровать в кон­фигура­ции, передав имя, наз­вание клас­са и метода, реали­зующе­го фун­кцию в качес­тве парамет­ров:

engineConfig.addPlugInSingleRowFunction("ipToInt", "MyEsperUtils", "ipToInt");

Те­перь все готово, что­бы исполь­зовать нашу новую фун­кцию в EPL-выраже­нии для выяв­ления зап­рещен­ных соеди­нений меж­ду сег­мента­ми wifi и database:

select src_ip,dst_ip,action,src_net.description,dst_net.description
from firewall as fw,
sql:mysql ['select description from ipplan
where ${ipToInt(src_ip)} between startaddr and endaddr'] as src_net,
sql:mysql ['select description from ipplan
where ${ipToInt(dst_ip)} between startaddr and endaddr'] as dst_net
where src_net.description = 'wifi' and
dst_net.description='database' and action='permit'
output first every 1 hour

Бла­года­ря воз­можнос­ти опре­делять свои собс­твен­ные фун­кции мож­но сущес­твен­но рас­ширить фун­кци­онал EPL-пра­вил, нап­ример добавить под­дер­жку опре­деле­ния геог­рафичес­ких коор­динат по IP-адре­сам, исполь­зуя базы GeoIP, и изме­рять рас­сто­яние меж­ду дву­мя событи­ями.

 

Отладка и поиск ошибок

От­ладка и поиск оши­бок нераз­рывно свя­заны с раз­работ­кой соф­та. Ребята из Esper позабо­тились об упро­щении этой задачи. В качес­тве дефол­тно­го ком­понен­та логиро­вания исполь­зует­ся Log4j. Что­бы задать его парамет­ры, дос­таточ­но под­клю­чить файл кон­фигура­ции log4j.xml при запус­ке при­ложе­ния с помощью свой­ства log4j.configuration (за осно­ву мож­но взять файл, пос­тавля­емый вмес­те с самой биб­лиоте­кой из дирек­тории esper\etc):

java -Dlog4j.configuration=log4j.xml ...

Для отладки EPL-выраже­ний удоб­но исполь­зовать анно­тацию @Audit, которую необ­ходимо вста­вить перед самим тек­стом пра­вила:

@Audit('stream,property') select src_ip,dst_ip,dst_port from firewall

В парамет­рах анно­тации перечис­ляют­ся катего­рии опе­раций, которые необ­ходимо логиро­вать. Так, катего­рия stream отве­чает за вывод каж­дого события, получа­емо­го пра­вилом, а property — за отоб­ражение наз­вания полей событий и их зна­чений. Таких катего­рий в Esper нас­читыва­ется более десят­ка.

Отладка EPL-выражений в действии
От­ладка EPL-выраже­ний в дей­ствии

Дру­гая инте­рес­ная воз­можность, которая может быть полез­на для опти­миза­ции EPL-выраже­ний, — это вывод пла­на зап­роса. Вклю­чение вывода пла­на может быть выпол­нено сле­дующим обра­зом:

engineConfig.getEngineDefaults().getLogging().setEnableQueryPlan(true);

Вклю­чение логиро­вания резуль­татов работы SQL-зап­росов при работе с внеш­ними источни­ками дела­ется похожим спо­собом:

engineConfig.getEngineDefaults().getLogging().setEnableJDBC(true);

Бла­года­ря этой опции мож­но уви­деть, как работа­ет кеширо­вание, сколь­ко вре­мени выпол­нялся зап­рос и сколь­ко зна­чений было воз­вра­щено СУБД.

info

Esper под­держи­вает дос­туп к СУБД через jdbc, к нереля­цион­ным дан­ным через вызов Java-методов, опре­деле­ние собс­твен­ных фун­кций для EPL, все это поз­воля­ет гиб­ко рас­ширять фун­кци­онал.

 

Интеграция с Logstash

Кор­реляция не заменя­ет задачу сбо­ра и хра­нения логов, ее резуль­таты слу­жат отправ­ной точ­кой для реаги­рова­ния на инци­дент, зафик­сирован­ный с помощью пра­вила. Отличным набором для орга­низа­ции логиро­вания и быс­тро­го поис­ка по событи­ям явля­ется ком­бинация Elasticsearch — Logstash — Kibana (ELK). Elasticsearch пред­став­ляет собой поис­ковый индекс (БД) для хра­нения и поис­ка событий, Kibana — удоб­ное средс­тво для визу­али­зации и поис­ка хра­нимых дан­ных, Logstash — гиб­кий и уни­вер­саль­ный пар­сер. В этой связ­ке явно не хва­тает кор­релято­ра, что­бы приб­лизить это решение к гор­дому наз­ванию SIEM. Устра­ним этот недос­таток; к счастью, кар­кас у нас уже готов. Все, что оста­лось сде­лать, — это нап­равить события из Logstash в кор­релятор, а резуль­таты работы сох­ранить в Elasticsearch. Интегра­цию будем про­водить с исполь­зовани­ем Redis, который явля­ется рекомен­дован­ным «бро­кером».

Пример интеграции Esper и ELK
При­мер интегра­ции Esper и ELK

Для демонс­тра­ции в Redis будет два спис­ка:

  • input — содер­жит обра­ботан­ные Logstash события, пред­став­ленные в JSON-фор­мате. Дан­ные из это­го спис­ка будут пос­тупать на вход кор­релято­ра;
  • alerts — в этот спи­сок кор­релятор будет выг­ружать резуль­тат сра­баты­вания пра­вил.

Для прос­тоты будем счи­тать, что события опи­саны с помощью java.util.Map. Забирать события из Redis будем с помощью биб­лиоте­ки Jedis, вызывая в цик­ле сле­дующий фраг­мент кода (здесь и далее по тек­сту не при­водит­ся обра­бот­ка исклю­чений и ини­циали­зация Jedis):

Jedis jedisTake = jedisFactory.getJedisPool().getResource();
// Получаем события из Redis
List<String> events = jedisTake.blpop(0,input);
String event = events.get(1);
JSONObject eventJson = new JSONObject(event);
// Получаем тип события
String type = eventJson.getString("type");
Map<String, Object> eventMap = new HashMap<String, Object>();
Iterator<String> keys = eventJson.keys();
while(keys.hasNext()){
String key = keys.next();
String value = eventJson.getString(key);
// Наполняем поля события для отправки в коррелятор
eventMap.put(key, value);
}
// Отправляем в коррелятор
runtime.sendEvent(eventMap,type);

В резуль­тате события из спис­ка input попадут на вход кор­релято­ра. Для получе­ния алер­тов необ­ходимо модифи­циро­вать метод update, который пре­обра­зует резуль­тат работы пра­вила в фор­мат JSON и сох­ранит его в спис­ке alerts:

public void update(EventBean[] newEvents, EventBean[] oldEvents) {
Jedis jedisPublish = jedisFactory.getJedisPool().getResource();
Pipeline pipe = jedisPublish.pipelined();
String alertEvent = "";
for(int i=0; i<newEvents.length; i++){
// Получаем тип события
EventType eventType = newEvents[i].getEventType();
jsonRenderer = runtime.getEventRenderer().getJSONRenderer(eventType);
// Формируем JSON-представление алерта
alertEvent = jsonRenderer.render(newEvents[i]);
// Сохраняем алерт в Redis
jedisPublish.rpush(alerts, alertEvent);
}
pipe.sync();
}

Ос­талось толь­ко нас­тро­ить Logstash для перек­ладыва­ния алер­тов из Redis в Elasticsearch, и мож­но исполь­зовать Kibana для ана­лиза инци­ден­тов.

Результат интеграции с ELK-стеком
Ре­зуль­тат интегра­ции с ELK-сте­ком

warning

При работе с внеш­ними источни­ками обя­затель­но тес­тируй пра­вила на про­изво­дитель­ность, Esper под­держи­вает хорошие воз­можнос­ти для отладки и поис­ка узких мест.

 

Заключение

Итак, мы рас­смот­рели все основные шаги, необ­ходимые для соз­дания сво­ей под­систе­мы кор­реляции с помощью Java-вер­сии биб­лиоте­ки Esper, бла­года­ря работе с внеш­ними источни­ками дан­ных и собс­твен­ным фун­кци­ям сде­лали исполь­зование нашего кор­релято­ра более удоб­ным. Добавив Esper к связ­ке Elasticsearch — Logstash — Kibana, мы приб­лизили ELK-стек к пол­ноцен­ной SIEM. Что­бы ты луч­ше разоб­рался в рас­смот­ренном матери­але, я под­готовил де­моп­риложе­ние, которое может стать отправ­ной точ­кой для решения тво­их собс­твен­ных задач из раз­личных областей, где тре­бует­ся слож­ная обра­бот­ка событий с минималь­ной задер­жкой. Удач­ных экспе­римен­тов

Оставить мнение