Фреймворк Netty появился относительно недавно, но с каждым годом он только набирает популярность. В 2011 году проект был удостоен награды Duke’s Choice за инновации в сетевом программировании. А уже сегодня его используют в своих разработках такие гиганты, как Apple, Twitter, Facebook, Google и Instagram. На базе Netty построены многие известные проекты с открытым кодом: Infinispan, HornetQ, Vert.x, Apache Cassandra и Elasticsearch. Попробуем разобраться, чем Netty так привлекает программистов со всего мира и что он может предложить такого, чего бы не было в JDK.
Работа с сетью в Java
Изначально в Java использовались только блокирующие сокеты. Это значит, что функция чтения из сокета ожидает до тех пор, пока не считается хотя бы один байт или не произошел разрыв соединения. Функция записи в сокет ожидает, пока все данные не будут переданы. Чтобы обработать несколько клиентов, требуется выделять по потоку на клиента. Чем больше клиентов, тем больше потоков. И все вроде бы хорошо, но переключение между потоками занимает значительную часть процессорного времени, а потоки большую часть времени просто простаивают. Так что переключаться приходится часто, нагрузка на систему растет и чем больше клиентов подключается, тем медленнее они получают ответ сервера.
Xakep #198. Случайностей не бывает
К счастью, сокеты можно настроить таким образом, чтобы сразу узнавать, есть там данные или нет, при чтении и не ждать передачи всех данных при записи. Это так называемые неблокирующие сокеты. Они позволяют одному потоку взаимодействовать с несколькими клиентами. В Java поддержка неблокирующих сокетов появилась только в версии 1.4 в виде Java NIO. Java NIO первоначально расшифровывалось как New IO, но теперь оно уже никакое не новое, поэтому и называется Non-blocking IO.
Java NIO состоит из трех главных компонентов: каналов, буферов и селекторов.
Каналы похожи на потоки ввода/вывода (stream) с небольшими отличиями. Они обеспечивают двухстороннюю передачу данных, в один канал можно и писать, и читать. Чтение из канала и запись данных происходит асинхронно. Каналы всегда читают данные из буфера и пишут в буфер. Основные реализации каналов: FileChannel, DatagramChannel, SocketChannel, ServerSocketChannel.
Буфер представляет собой блок данных в памяти ограниченного размера и набор методов, облегчающих работу с ним. Буфер можно переключать в режим чтения и в режим записи. Один канал может работать с несколькими буферами, что позволяет не копировать данные на разных уровнях обработки. Если нужно, к примеру, добавить к данным заголовок, достаточно просто создать отдельный буфер для него, не копируя при этом сами данные. Основные классы буферов: ByteBuffer, MappedByteBuffer, CharBuffer, DoubleBuffer и другие.
Селектор — это компонент, который работает с каналами. В одном селекторе можно зарегистрировать несколько каналов, указав, какие именно события нужно отслеживать. Бывает четыре вида событий: «соединение с сервером установлено», «входящее соединение принято», «можно читать данные», «можно писать данные». Метод select блокирует выполнение кода, пока хотя бы в одном из каналов не произойдет интересующее селектор событие. Есть также и неблокирующий аналог метода select.
В Java 7 появилось расширение NIO под названием NIO2. В нем архитектура взаимодействия с каналами была немного изменена. Теперь стало возможным получать и отправлять данные асинхронно. Больше не нужно постоянно проверять, не появилось ли новое событие канала. Достаточно запустить операцию и зарегистрировать слушателя, который узнает о ее выполнении. Либо можно воспользоваться классом Future. Объект Future моделирует выполняемую операцию. С его помощью можно проверить, выполнена ли операция или заблокировать выполнение потока, пока не будет получен результат.
Таким образом, в Java существует три способа работы с сетевыми клиентами. Netty умеет работать со всеми тремя. Кроме того, Netty успешно борется с некоторыми проблемами NIO и NIO2. Например, NIO может работать немного по-разному на разных платформах, так как NIO реализован на низком уровне. И если какие-то тесты работают в Linux, нет стопроцентной гарантии, что они будут работать в Windows. NIO2 поддерживается только с седьмой версии Java. Так что для пользователей шестой Java придется писать свою версию сервера. Netty же предоставляет единый интерфейс работы с NIO, NIO2 и даже блокирующими сокетами — просто указываем, какой из фреймворков нужен.
Использование нескольких буферов для одного канала содержит утечку памяти для некоторых версий седьмой и шестой Java. А обновить Java на сервере не всегда представляется возможным. В Netty реализованы свои классы буферов, которые работают исправно.
Еще одна неприятность может подстерегать пользователей Linux-систем. Согласно документации, селектор должен ожидать появления события, но из-за ошибки в системе уведомлений epoll может возникать ситуация, когда селектор выходит из блокировки, даже если событий канала нет. В результате он начинает лихорадочно проверять каналы в цикле и съедает процессор на 100%. Netty справляется с этой и другими проблемами.
Netty также избавляет от сложностей асинхронного кода. Рассмотрим, к примеру, описанный выше селектор. Метод select может вернуть разнообразные события (можно писать в канал, можно читать, «клиент присоединился»), чтобы разобраться в этой мешанине, придется писать огромную if-else конструкцию, поддерживать которую — еще то веселье. Поэтому в Netty используется паттерн реактор. Каждому событию можно назначить свой обработчик. В Netty этот паттерн получил дальнейшее развитие. Каждому событию можно назначить цепочку обработчиков, выходное значение первого обработчика служит входным значением второго обработчика и так далее. Это облегчает последовательную обработку информации. К примеру, если клиент присылает сжатые зашифрованные данные, то первый обработчик разархивирует данные, второй расшифровывает, а третий уже непосредственно их обрабатывает. Если потребуется еще какая-то дополнительная обработка данных, то достаточно просто вставить новый обработчик в эту цепочку.
Основные компоненты Netty
Приложение Netty начинается с классов Bootstrap и ServerBootstrap. Bootstrap занимается инициализацией и конфигурацией инфраструктуры клиента, ServerBootstrap инициализирует сервер.
За обработку данных отвечают экземпляры ChannelHandler. Обработчики переводят объекты в бинарные данные и наоборот, а также предоставляют метод обработки ошибок, которые возникают в процессе. Таким образом, вся бизнес-логика происходит в обработчике. Обработчики разделяются на два типа: ChannelInboundHandler и ChannelOutboundHandler. Первый тип для работы с входящими данными, второй — с исходящими.
Когда Netty подключается к серверу или принимает соединение от клиента, он должен знать, как обрабатывать данные, которые принимаются и отсылаются. Инициализацией и конфигурацией обработчиков данных занимается ChannelInitializer. Он добавляет реализации ChannelHandler к ChannelPipeline. ChannelPipeline передает данные на обработку всем обработчикам в порядке, в котором они были добавлены. Каждому последующему обработчику передаются данные, уже обработанные в предыдущем.
Все операции ввода-вывода для канала выполняются в цикле событий EventLoop. Несколько циклов событий объединяются в группу EventLoopGroup. Bootstrap клиента создает один экземпляр EventLoopGroup. ServerBootstrap для сервера создает два экземпляра EventLoopGroup. Один экземпляр только принимает соединения от клиентов, второй обрабатывает остальные события: чтение/запись данных и так далее. Это помогает избежать тайм-аута при подключении к серверу новых клиентов.
Когда канал регистрируется, он привязывается к определенному циклу событий на все время своего существования. Цикл событий всегда выполняется в одном и том же потоке, поэтому не нужно заботиться о синхронизации операций ввода-вывода канала. Поскольку обычно один EventLoop работает с несколькими каналами, важно не выполнять никаких блокирующих операций в ChannelHandler. Но если это все же требуется, то Netty предоставляет возможность указать EventExecutorGroup при регистрации канала, EventExecutor которого будет выполнять все методы ChannelHandler в отдельном потоке, не нагружая EventLoop канала.
Все операции в Netty выполняются асинхронно. Это значит, что операция будет выполнена не сразу, а через некоторое время. Чтобы понять, выполнилась операция или нет, Netty предоставляет Future и ChannelFuture. Future позволяет зарегистрировать слушателя, который будет уведомлен о выполнении операции. ChannelFuture может блокировать выполнение потока до окончания выполнения операции.
Пример работы: «умный дом»
Пришло время самого интересного — посмотреть, насколько удобно этим всем пользоваться. Напишем небольшое клиент-серверное приложение, используя Netty. Возьмем, к примеру, модную в наше время концепцию умного дома. Сервер на Netty будет обрабатывать данные с датчиков — клиентов Netty и посылать им нужные команды. Воспользуемся преимуществами ChannelPipeline и реализуем свой протокол передачи данных между клиентом и сервером.
Для протокола данных удобнее всего воспользоваться разработкой компании Google, утилитой protobuf. Protobuf — это механизм передачи структурированных данных, который не зависит от языка программирования или платформы. Нужно только написать каркас протокола передачи данных и сгенерировать специальный класс на нужном языке программирования.
Нам понадобятся два вида сообщений. SensorData — сообщение от датчика, в котором содержится статус, тип датчика и какое-то строковое значение. Command — сообщение от сервера с кодом команды, которую нужно выполнить.
enum Status {
OK = 0;
ERROR = 1;
}
enum SensorType {
TEMPERATURE = 0;
MOVEMENT = 1;
SMOKE = 2;
}
message SensorData {
required Status status = 1;
required SensorType sensorType = 2;
required string message = 3;
}
message Command {
required uint32 code = 1;
}
Описываем протокол в файле с расширением proto и запускаем команду для генерации Java-класса:
$ protoc src/net/xakep/netty/smarthouse/data/SensorDataProtocol.proto --java_out=src/
Класс работы с нашим протоколом готов. Можно приступать к реализации сервера. Инициализацию сервера выполняет класс ServerBootstrap
. Для работы с клиентами будем использовать неблокирующие сокеты, поэтому новые соединения принимает класс NioEventLoopGroup
, а в качестве канала связи используется NioServerSocketChannel
. Настройку канала общения с клиентом выполняет класс SmartHouseServerInitializer
.
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new SmartHouseServerInitializer());
Открываем соединение на нужном порту и вызываем метод sync, который ожидает завершения операции. Метод sync вернет экземпляр ChannelFuture
. Используем его, чтобы блокировать выполнение главного потока, пока канал сервера не будет закрыт:
ChannelFuture future = b.bind(PORT).sync();
future.channel().closeFuture().sync();
В конце работы сервера освобождаем ресурсы и потоки, которыми пользовался цикл событий:
group.shutdownGracefully();
Класс SmartHouseServerInitializer
наследуем от ChannelInitializer<SocketChannel>
. В методе initChannel
добавляем обработчики входящих и исходящих данных:
ChannelPipeline p = socketChannel.pipeline();
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufDecoder(SensorDataProtocol.SensorData.getDefaultInstance()));
p.addLast(new ProtobufVarint32LengthFieldPrepender());
p.addLast(new ProtobufEncoder());
p.addLast(new SmartHouseServerHandler());
Когда ChannelPipeline
получает новые входящие данные, он вызывает последовательно ProtobufVarint32FrameDecoder
и ProtobufDecoder
(и любые другие наследники от ChannelInboundHandler
), которые преобразуют входные данные из массива байтов в объект SensorData. Этот объект подается на вход SmartHouseServerHandler
для обработки. Когда в ChannelPipeline
поступают исходящие данные, вызываются последовательно ProtobufVarint32LengthFieldPrepender
и ProtobufEncoder
(и любые другие наследники от ChannelOutboundHandler
), которые переводят исходящий объект в байтовые данные. Подробнее о работе protobuf-обработчиков можно узнать из документации Netty.
Обработчик SmartHouseServerHandler
наследован от SimpleChannelInboundHandler
. SimpleChannelInboundHandler
удобно использовать, если требуется прочитать сообщение определенного типа. В нашем случае это сообщения типа SensorData. Нам потребуется реализовать три метода. Метод channelRead0 позволяет обработать полученное сообщение указанного типа. Для простоты сервер будет просто посылать клиенту команду ничего не делать:
protected void channelRead0(ChannelHandlerContext ctx, SensorDataProtocol.SensorData sensorData) throws Exception {
Command.Builder builder = Command.newBuilder().setCode(COMMAND_IDLE);
ctx.write(builder.build());
}
Метод write записывает исходящие данные в буфер, но не отправляет их клиенту. Для этого служит метод flush
, который вызывается в channelReadComplete
:
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
Для обработки исключений, которые возникают при работе канала, используется метод exceptionCaught
. Для простоты в случае ошибки будем выводить ее стек и закрывать канал:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
Теперь займемся клиентом. Для инициализации клиента используется класс Bootstrap
. Настройкой канала соединения с сервером занимается класс SmartHouseClientInitializer
. Клиенту не нужно держать соединение, поэтому после того, как данные отправлены, канал закрывается:
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new SmartHouseClientInitializer());
Channel channel = b.connect(HOST, PORT).sync().channel();
sendData(channel);
channel.close();
} finally {
group.shutdownGracefully();
}
Реализация SmartHouseClientInitializer
похожа на реализацию SmartHouseServerInitializer
, только объект мы ожидаем класса Command
и обрабатывать его будет класс SmartHouseClientHandler
:
pipeline.addLast(new ProtobufDecoder(SensorDataProtocol.Command.getDefaultInstance()));
...
pipeline.addLast(new SmartHouseClientHandler());
SmartHouseClientHandler
тоже является наследником SimpleChannelInboundHandler
. Для получения данных от сервера воспользуемся блокирующей очередью. Она приостановит выполнение программы, пока в нее не будут добавлены новые данные:
private final BlockingQueue<SensorDataProtocol.Command> answer = new LinkedBlockingQueue<SensorDataProtocol.Command>();
protected void channelRead0(ChannelHandlerContext channelHandlerContext, SensorDataProtocol.Command command) throws Exception {
answer.add(command);
}
Отправлять данные на сервер будет метод sendUpdate
. В нем используется writeAndFlush
, чтобы сразу передать данные в сеть. Метод sendUpdate
возвращает полученную от сервера команду:
public SensorDataProtocol.Command sendUpdate(SensorDataProtocol.Status status, SensorDataProtocol.SensorType sensorType, String message) {
SensorData.Builder builder = SensorData.newBuilder();
builder.setStatus(status)
.setSensorType(sensorType)
.setMessage(message);
channel.writeAndFlush(builder.build());
SensorDataProtocol.Command command = answer.take();
return command;
}
Осталось передать на сервер тестовые данные, например, что температура дома 23 градуса:
void sendData(Channel channel) {
SmartHouseClientHandler handler = channel.pipeline().get(SmartHouseClientHandler.class);
SensorDataProtocol.Command command = handler.sendUpdate(Status.OK, SensorType.TEMPERATURE, "23");
...
}
Вывод
Таким образом, с помощью Netty можно быстро и просто написать любое быстрое клиент-серверное приложение, которое к тому же будет легко расширяться и масштабироваться. Если для обработки клиентов не хватает одного потока, следует всего лишь передать нужное число потоков в конструктор EventLoopGroup. Если на какой-то стадии развития проекта понадобится дополнительная обработка данных, не нужно переписывать код, достаточно добавить новый обработчик в ChannelPipeline, что значительно упрощает поддержку приложения.
Netty позволяет передавать данные через TCP или UDP, поддерживает множество протоколов, таких как HTTP, FTP, SMTP, WebSockets, SSL/TLC, SPDY. API хорошо документирован, на гитхабе выложено множество примеров с подробными комментариями. А все возрастающее сообщество свидетельствует о том, что продукт получился хороший, достойный внимания и активного использования.