Фреймворк Netty появился относительно недавно, но с каждым годом он только набирает популярность. В 2011 году проект был удостоен награды Duke’s Choice за инновации в сетевом программировании. А уже сегодня его используют в своих разработках такие гиганты, как Apple, Twitter, Facebook, Google и Instagram. На базе Netty построены многие известные проекты с открытым кодом: Infinispan, HornetQ, Vert.x, Apache Cassandra и Elasticsearch. Попробуем разобраться, чем Netty так привлекает программистов со всего мира и что он может предложить такого, чего бы не было в JDK.

 

Работа с сетью в Java

Изначально в Java использовались только блокирующие сокеты. Это значит, что функция чтения из сокета ожидает до тех пор, пока не считается хотя бы один байт или не произошел разрыв соединения. Функция записи в сокет ожидает, пока все данные не будут переданы. Чтобы обработать несколько клиентов, требуется выделять по потоку на клиента. Чем больше клиентов, тем больше потоков. И все вроде бы хорошо, но переключение между потоками занимает значительную часть процессорного времени, а потоки большую часть времени просто простаивают. Так что переключаться приходится часто, нагрузка на систему растет и чем больше клиентов подключается, тем медленнее они получают ответ сервера.

Сервер с блокирующими сокетами
Сервер с блокирующими сокетами

К счастью, сокеты можно настроить таким образом, чтобы сразу узнавать, есть там данные или нет, при чтении и не ждать передачи всех данных при записи. Это так называемые неблокирующие сокеты. Они позволяют одному потоку взаимодействовать с несколькими клиентами. В Java поддержка неблокирующих сокетов появилась только в версии 1.4 в виде Java NIO. Java NIO первоначально расшифровывалось как New IO, но теперь оно уже никакое не новое, поэтому и называется Non-blocking IO.

Java NIO состоит из трех главных компонентов: каналов, буферов и селекторов.

Каналы похожи на потоки ввода/вывода (stream) с небольшими отличиями. Они обеспечивают двухстороннюю передачу данных, в один канал можно и писать, и читать. Чтение из канала и запись данных происходит асинхронно. Каналы всегда читают данные из буфера и пишут в буфер. Основные реализации каналов: FileChannel, DatagramChannel, SocketChannel, ServerSocketChannel.


Буфер представляет собой блок данных в памяти ограниченного размера и набор методов, облегчающих работу с ним. Буфер можно переключать в режим чтения и в режим записи. Один канал может работать с несколькими буферами, что позволяет не копировать данные на разных уровнях обработки. Если нужно, к примеру, добавить к данным заголовок, достаточно просто создать отдельный буфер для него, не копируя при этом сами данные. Основные классы буферов: ByteBuffer, MappedByteBuffer, CharBuffer, DoubleBuffer и другие.

Селектор — это компонент, который работает с каналами. В одном селекторе можно зарегистрировать несколько каналов, указав, какие именно события нужно отслеживать. Бывает четыре вида событий: «соединение с сервером установлено», «входящее соединение принято», «можно читать данные», «можно писать данные». Метод select блокирует выполнение кода, пока хотя бы в одном из каналов не произойдет интересующее селектор событие. Есть также и неблокирующий аналог метода select.

Сервер с неблокирующими сокетами
Сервер с неблокирующими сокетами

В Java 7 появилось расширение NIO под названием NIO2. В нем архитектура взаимодействия с каналами была немного изменена. Теперь стало возможным получать и отправлять данные асинхронно. Больше не нужно постоянно проверять, не появилось ли новое событие канала. Достаточно запустить операцию и зарегистрировать слушателя, который узнает о ее выполнении. Либо можно воспользоваться классом Future. Объект Future моделирует выполняемую операцию. С его помощью можно проверить, выполнена ли операция или заблокировать выполнение потока, пока не будет получен результат.

Таким образом, в Java существует три способа работы с сетевыми клиентами. Netty умеет работать со всеми тремя. Кроме того, Netty успешно борется с некоторыми проблемами NIO и NIO2. Например, NIO может работать немного по-разному на разных платформах, так как NIO реализован на низком уровне. И если какие-то тесты работают в Linux, нет стопроцентной гарантии, что они будут работать в Windows. NIO2 поддерживается только с седьмой версии Java. Так что для пользователей шестой Java придется писать свою версию сервера. Netty же предоставляет единый интерфейс работы с NIO, NIO2 и даже блокирующими сокетами — просто указываем, какой из фреймворков нужен.

Использование нескольких буферов для одного канала содержит утечку памяти для некоторых версий седьмой и шестой Java. А обновить Java на сервере не всегда представляется возможным. В Netty реализованы свои классы буферов, которые работают исправно.

Еще одна неприятность может подстерегать пользователей Linux-систем. Согласно документации, селектор должен ожидать появления события, но из-за ошибки в системе уведомлений epoll может возникать ситуация, когда селектор выходит из блокировки, даже если событий канала нет. В результате он начинает лихорадочно проверять каналы в цикле и съедает процессор на 100%. Netty справляется с этой и другими проблемами.

Netty также избавляет от сложностей асинхронного кода. Рассмотрим, к примеру, описанный выше селектор. Метод select может вернуть разнообразные события (можно писать в канал, можно читать, «клиент присоединился»), чтобы разобраться в этой мешанине, придется писать огромную if-else конструкцию, поддерживать которую — еще то веселье. Поэтому в Netty используется паттерн реактор. Каждому событию можно назначить свой обработчик. В Netty этот паттерн получил дальнейшее развитие. Каждому событию можно назначить цепочку обработчиков, выходное значение первого обработчика служит входным значением второго обработчика и так далее. Это облегчает последовательную обработку информации. К примеру, если клиент присылает сжатые зашифрованные данные, то первый обработчик разархивирует данные, второй расшифровывает, а третий уже непосредственно их обрабатывает. Если потребуется еще какая-то дополнительная обработка данных, то достаточно просто вставить новый обработчик в эту цепочку.

 

Основные компоненты Netty

Приложение Netty начинается с классов Bootstrap и ServerBootstrap. Bootstrap занимается инициализацией и конфигурацией инфраструктуры клиента, ServerBootstrap инициализирует сервер.

За обработку данных отвечают экземпляры ChannelHandler. Обработчики переводят объекты в бинарные данные и наоборот, а также предоставляют метод обработки ошибок, которые возникают в процессе. Таким образом, вся бизнес-логика происходит в обработчике. Обработчики разделяются на два типа: ChannelInboundHandler и ChannelOutboundHandler. Первый тип для работы с входящими данными, второй — с исходящими.

Когда Netty подключается к серверу или принимает соединение от клиента, он должен знать, как обрабатывать данные, которые принимаются и отсылаются. Инициализацией и конфигурацией обработчиков данных занимается ChannelInitializer. Он добавляет реализации ChannelHandler к ChannelPipeline. ChannelPipeline передает данные на обработку всем обработчикам в порядке, в котором они были добавлены. Каждому последующему обработчику передаются данные, уже обработанные в предыдущем.

Все операции ввода-вывода для канала выполняются в цикле событий EventLoop. Несколько циклов событий объединяются в группу EventLoopGroup. Bootstrap клиента создает один экземпляр EventLoopGroup. ServerBootstrap для сервера создает два экземпляра EventLoopGroup. Один экземпляр только принимает соединения от клиентов, второй обрабатывает остальные события: чтение/запись данных и так далее. Это помогает избежать тайм-аута при подключении к серверу новых клиентов.

Сервер Netty
Сервер Netty

Когда канал регистрируется, он привязывается к определенному циклу событий на все время своего существования. Цикл событий всегда выполняется в одном и том же потоке, поэтому не нужно заботиться о синхронизации операций ввода-вывода канала. Поскольку обычно один EventLoop работает с несколькими каналами, важно не выполнять никаких блокирующих операций в ChannelHandler. Но если это все же требуется, то Netty предоставляет возможность указать EventExecutorGroup при регистрации канала, EventExecutor которого будет выполнять все методы ChannelHandler в отдельном потоке, не нагружая EventLoop канала.

Все операции в Netty выполняются асинхронно. Это значит, что операция будет выполнена не сразу, а через некоторое время. Чтобы понять, выполнилась операция или нет, Netty предоставляет Future и ChannelFuture. Future позволяет зарегистрировать слушателя, который будет уведомлен о выполнении операции. ChannelFuture может блокировать выполнение потока до окончания выполнения операции.

 

Пример работы: «умный дом»

Пришло время самого интересного — посмотреть, насколько удобно этим всем пользоваться. Напишем небольшое клиент-серверное приложение, используя Netty. Возьмем, к примеру, модную в наше время концепцию умного дома. Сервер на Netty будет обрабатывать данные с датчиков — клиентов Netty и посылать им нужные команды. Воспользуемся преимуществами ChannelPipeline и реализуем свой протокол передачи данных между клиентом и сервером.

Для протокола данных удобнее всего воспользоваться разработкой компании Google, утилитой protobuf. Protobuf — это механизм передачи структурированных данных, который не зависит от языка программирования или платформы. Нужно только написать каркас протокола передачи данных и сгенерировать специальный класс на нужном языке программирования.

Нам понадобятся два вида сообщений. SensorData — сообщение от датчика, в котором содержится статус, тип датчика и какое-то строковое значение. Command — сообщение от сервера с кодом команды, которую нужно выполнить.

enum Status {
    OK = 0;
    ERROR = 1;
}

enum SensorType {
    TEMPERATURE = 0;
    MOVEMENT = 1;
    SMOKE = 2;
}

message SensorData {
  required Status status = 1;
  required SensorType sensorType = 2;
  required string message = 3;
}

message Command {
    required uint32 code = 1;
}

Описываем протокол в файле с расширением proto и запускаем команду для генерации Java-класса:

$ protoc src/net/xakep/netty/smarthouse/data/SensorDataProtocol.proto --java_out=src/

Класс работы с нашим протоколом готов. Можно приступать к реализации сервера. Инициализацию сервера выполняет класс ServerBootstrap. Для работы с клиентами будем использовать неблокирующие сокеты, поэтому новые соединения принимает класс NioEventLoopGroup, а в качестве канала связи используется NioServerSocketChannel. Настройку канала общения с клиентом выполняет класс SmartHouseServerInitializer.

EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group)
    .channel(NioServerSocketChannel.class)
    .childHandler(new SmartHouseServerInitializer());

Открываем соединение на нужном порту и вызываем метод sync, который ожидает завершения операции. Метод sync вернет экземпляр ChannelFuture. Используем его, чтобы блокировать выполнение главного потока, пока канал сервера не будет закрыт:

ChannelFuture future = b.bind(PORT).sync();
future.channel().closeFuture().sync();

В конце работы сервера освобождаем ресурсы и потоки, которыми пользовался цикл событий:

group.shutdownGracefully();

Класс SmartHouseServerInitializer наследуем от ChannelInitializer<SocketChannel>. В методе initChannel добавляем обработчики входящих и исходящих данных:

ChannelPipeline p = socketChannel.pipeline();
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(SensorDataProtocol.SensorData.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new SmartHouseServerHandler());

Когда ChannelPipeline получает новые входящие данные, он вызывает последовательно ProtobufVarint32FrameDecoder и ProtobufDecoder (и любые другие наследники от ChannelInboundHandler), которые преобразуют входные данные из массива байтов в объект SensorData. Этот объект подается на вход SmartHouseServerHandler для обработки. Когда в ChannelPipeline поступают исходящие данные, вызываются последовательно ProtobufVarint32LengthFieldPrepender и ProtobufEncoder (и любые другие наследники от ChannelOutboundHandler), которые переводят исходящий объект в байтовые данные. Подробнее о работе protobuf-обработчиков можно узнать из документации Netty.

Обработчик SmartHouseServerHandler наследован от SimpleChannelInboundHandler. SimpleChannelInboundHandler удобно использовать, если требуется прочитать сообщение определенного типа. В нашем случае это сообщения типа SensorData. Нам потребуется реализовать три метода. Метод channelRead0 позволяет обработать полученное сообщение указанного типа. Для простоты сервер будет просто посылать клиенту команду ничего не делать:

protected void channelRead0(ChannelHandlerContext ctx, SensorDataProtocol.SensorData sensorData) throws Exception {
    Command.Builder builder = Command.newBuilder().setCode(COMMAND_IDLE);
    ctx.write(builder.build());
}

Метод write записывает исходящие данные в буфер, но не отправляет их клиенту. Для этого служит метод flush, который вызывается в channelReadComplete:

public void channelReadComplete(ChannelHandlerContext ctx) {
    ctx.flush();
}

Для обработки исключений, которые возникают при работе канала, используется метод exceptionCaught. Для простоты в случае ошибки будем выводить ее стек и закрывать канал:

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}

Теперь займемся клиентом. Для инициализации клиента используется класс Bootstrap. Настройкой канала соединения с сервером занимается класс SmartHouseClientInitializer. Клиенту не нужно держать соединение, поэтому после того, как данные отправлены, канал закрывается:

EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
            .channel(NioSocketChannel.class)
            .handler(new SmartHouseClientInitializer());
    Channel channel = b.connect(HOST, PORT).sync().channel();
    sendData(channel);
    channel.close();
} finally {
    group.shutdownGracefully();
}

Реализация SmartHouseClientInitializer похожа на реализацию SmartHouseServerInitializer, только объект мы ожидаем класса Command и обрабатывать его будет класс SmartHouseClientHandler:

pipeline.addLast(new ProtobufDecoder(SensorDataProtocol.Command.getDefaultInstance()));
...
pipeline.addLast(new SmartHouseClientHandler());

SmartHouseClientHandler тоже является наследником SimpleChannelInboundHandler. Для получения данных от сервера воспользуемся блокирующей очередью. Она приостановит выполнение программы, пока в нее не будут добавлены новые данные:

private final BlockingQueue<SensorDataProtocol.Command> answer = new LinkedBlockingQueue<SensorDataProtocol.Command>();
protected void channelRead0(ChannelHandlerContext channelHandlerContext, SensorDataProtocol.Command command) throws Exception {
    answer.add(command);
}

Отправлять данные на сервер будет метод sendUpdate. В нем используется writeAndFlush, чтобы сразу передать данные в сеть. Метод sendUpdate возвращает полученную от сервера команду:

public SensorDataProtocol.Command sendUpdate(SensorDataProtocol.Status status, SensorDataProtocol.SensorType sensorType, String message) {
    SensorData.Builder builder = SensorData.newBuilder();
    builder.setStatus(status)
           .setSensorType(sensorType)
           .setMessage(message);
    channel.writeAndFlush(builder.build());
    SensorDataProtocol.Command command = answer.take();
    return command;
}

Осталось передать на сервер тестовые данные, например, что температура дома 23 градуса:

void sendData(Channel channel) {
    SmartHouseClientHandler handler = channel.pipeline().get(SmartHouseClientHandler.class);
    SensorDataProtocol.Command command = handler.sendUpdate(Status.OK, SensorType.TEMPERATURE, "23");
    ...
}
 

Вывод

Таким образом, с помощью Netty можно быстро и просто написать любое быстрое клиент-серверное приложение, которое к тому же будет легко расширяться и масштабироваться. Если для обработки клиентов не хватает одного потока, следует всего лишь передать нужное число потоков в конструктор EventLoopGroup. Если на какой-то стадии развития проекта понадобится дополнительная обработка данных, не нужно переписывать код, достаточно добавить новый обработчик в ChannelPipeline, что значительно упрощает поддержку приложения.

Netty позволяет передавать данные через TCP или UDP, поддерживает множество протоколов, таких как HTTP, FTP, SMTP, WebSockets, SSL/TLC, SPDY. API хорошо документирован, на гитхабе выложено множество примеров с подробными комментариями. А все возрастающее сообщество свидетельствует о том, что продукт получился хороший, достойный внимания и активного использования.

  • Подпишись на наc в Telegram!

    Только важные новости и лучшие статьи

    Подписаться

  • Подписаться
    Уведомить о
    3 комментариев
    Старые
    Новые Популярные
    Межтекстовые Отзывы
    Посмотреть все комментарии