/ in Russian, на русском

Флюссоник: чтение UDP MPEG-TS

Ещё одна статья в жанре одного из моих предыдущих постов, но на этот раз про чтение, а не раздачу. Решение проблемы по шагам.

Речь идёт о полировке фич нашего видеостримингового сервера Flussonic.

Хо-хо, Макс Лапшин, мой ментор и начальник, подкинул мне ещё одну интересную задачу. Итак, вот она:

Проблема

Флюссоник умеет захватывать MPEG-TS поток по UDP. Так как он написан на Erlang, то логично для этого использовать gen_udp. Опытным путём было установлено, что захват UDP видеопотока заметно нагружает CPU. (больше чем всё остальное)

На самом деле нетрудно понять в чём причина: gen_udp работает как порт-драйвер, и каждую пришедшую датаграмму отправляет как сообщение, а это хоть и дешёвая операция, но на больших объёмах даёт о себе знать.

MPEG-TS пакет занимает 188 байт, в MTU влезает 7 пакетов: 1316 байт, значит в идеале максимальный размер датаграммы 1316 байт. Возьмём 2-х мегабитный поток, поделим на размер датаграммы, получим 2000000/(1316*8) ≈ 190 сообщений в секунду.

Если мы возьмём 10-ти мегабитный поток (совсем не редкость), то всё становится сильно хуже: 10000000/(1316*8) ≈ 950 сообщений в секунду.

Макс написал на Си свой собственный порт-драйвер, который занимался тем же что и gen_udp, только склеивал датаграммы, накапливал буфер, и лишь затем отсылал процессу. Очевидное решение проблемы.

Этот порт даёт примерно троекратную экономию CPU, но в некоторых условиях работает нестабильно. Проблема которую мне необходимо решить — переписать этот порт чтобы он хорошо работал.

Ну а для этого мне нужно полностью погрузиться в проблему — увидеть много всего интересного.

Первые идеи

Первая мысль — неужели создатели Erlang не подумали о таком варианте событий? Неужели нельзя читать UDP как-то иначе?

Я увидел функцию gen_udp:recv/2, которая должна читать датаграммы в блокирующем режиме. Вроде бы то что нужно, без отправки сообщений! Однако если посмотреть исходный код, то можно заметить следующее:

-spec recv(Socket, Length) ->
                  {ok, {Address, Port, Packet}} | {error, Reason} when
      Socket :: socket(),
      Length :: non_neg_integer(),
      Address :: inet:ip_address(),
      Port :: inet:port_number(),
      Packet :: string() | binary(),
      Reason :: not_owner | inet:posix().

recv(S,Len) when is_port(S), is_integer(Len) ->
    case inet_db:lookup_socket(S) of
	{ok, Mod} ->
	    Mod:recv(S, Len);
	Error ->
	    Error
    end.

Так, вызывается одноимённая функция из другого модуля. Похоже это разделение на код работающий по IPv6 и IPv4.

Там же можно увидеть два файла inet_udp.erl и inet6_udp.erl. Пусть будет IPv4, смотрим дальше:

recv(S,Len) ->
    prim_inet:recvfrom(S, Len).

Так, снова отфутболили, ищем заветный prim_inet и находим его:

recvfrom(S, Length) ->
    recvfrom0(S, Length, -1).

recvfrom(S, Length, infinity) ->
    recvfrom0(S, Length, -1);
recvfrom(S, Length, Time) when is_integer(Time), Time < 16#ffffffff ->
    recvfrom0(S, Length, Time);
recvfrom(_, _, _) -> {error,einval}.

recvfrom0(S, Length, Time)
  when is_port(S), is_integer(Length), Length >= 0, Length =< 16#ffffffff ->
    case ctl_cmd(S, ?PACKET_REQ_RECV,[enc_time(Time),?int32(Length)]) of
	{ok,[R1,R0]} ->
	    Ref = ?u16(R1,R0),
	    receive
		% Success, UDP:
		{inet_async, S, Ref, {ok, [F,P1,P0 | AddrData]}} ->
		    {IP,Data} = get_ip(F, AddrData),
		    {ok, {IP, ?u16(P1,P0), Data}};

		% Success, SCTP:
		{inet_async, S, Ref, {ok, {[F,P1,P0 | Addr], AncData, DE}}} ->
		    {IP, _}   = get_ip(F, Addr),
		    {ok, {IP, ?u16(P1,P0), AncData, DE}};

		% Back-end error:
		{inet_async, S, Ref, Error={error, _}} ->
		    Error
	    end;
	Error ->
	    Error % Front-end error
    end;

И видим по-сути то же самое от чего пытались убежать.

Хорошо, раз в стандартной библиотеке нельзя просто получить данные сделав блокирующий вызов recv, то почему бы не сделать порт-драйвер, который бы предоставлял возможность таких вызовов, к примеру через erlang:port_control/3.

Однако такой способ не подойдёт — нативные вызовы (через порты или NIF-ы) в идеале должны занимать не более 1–3 миллисекунд. Если больше, то планировщик Erlang совсем потеряется в временном пространстве, сойдёт с ума, и всему приложению станет плохо.

Похоже что отправка сообщений из порта и получение их процессом единственный подходящий способ передать данные.

Тогда действительно, нужно накапливать буфер из датаграмм и отправлять дальше одним большим куском.

Проблемы с существующем порт-драйвером

Наш драйвер открывает сокет с параметром O_NONBLOCK, далее мы используем вызов driver_select.

Мне удалось воспроизвести ситуацию, когда порт-драйвер терял часть трафика, при этом стандартный gen_udp отдавал всё чётко. Кроме того, это происходило строго при наличии нагрузки на сеть. Казалось, что лажал именно driver_select — будто ready_input коллбэк просто не вызывался когда следовало.

По-идее код gen_udp должен работать по той же схеме. Первым делом полез разбираться как там открывается сокет, какие опции применяются.

Поковыряться в исходниках стандартной библиотеки Erlang мы всегда успеем, а сперва можно попробовать посмотреть на вывод strace для двух вариантов работы — с нашим портом и с gen_udp.

Смотрим через strace

strace — это клёвая тулза которая ведёт лог вызова системных вызовов любой пользовательской программы.

Открытие сокета, включение каких-то опций, подписка на события по сокету (select) — всё это системные вызовы. Можно будет детально посмотреть чем отличается код gen_udp от нашего порта, не влезая в исходники стандартной библиотеки.

strace -ff -o strace/log make run

Мне лень выдирать конкретную команду которая запускает наш флюссоник (вида erl -pa …), я просто натравил strace на сам make и всего дочерние процессы.

Эта команда выполнит make run, и для каждого созданного процесса создаст отдельный файл-лог вида strace/log.PID.

У меня таких файлов сгенерилось много — аж 82. Значит за время выполнения этой команды было создано 82 процесса.

Найти нужные нам процессы довольно просто, нужно просто поискать по ключевому слову IPPROTO_UDP, это параметр передаваемый при создании UDP сокета.

grep IPPROTO_UDP * -n
log.16721:523:socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 10
log.16758:48387:socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 24
log.16758:48654:socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 26
log.16758:50628:socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 28
log.16759:16995:socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 9
log.16759:22221:socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 10
log.16759:182443:socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 23

Можно увидеть множество таких вызовов. Похоже что общение между Erlang-нодами тоже происходит по UDP, Erlang рантайм делает вызовы socket() даже если мы не запускаем никакого захвата.

Среди множества этих вызовов нам нужно найти такие, где выполняется bind() именно по тому адресу, который пытаемся захватить. Я делал захват по 239.0.0.1:1234.

Итак, чем отличается открытие сокета у gen_udp и у mpegts_udp (наш порт-драйвер?). Вот:

socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 28
fcntl(28, F_GETFL)                      = 0x2 (flags O_RDWR)
fcntl(28, F_SETFL, O_RDWR|O_NONBLOCK)   = 0
getsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], [4]) = 0
getsockopt(28, SOL_IP, IP_TOS, [0], [4]) = 0
setsockopt(28, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
setsockopt(28, SOL_IP, IP_TOS, [0], 4)  = 0
setsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], 4) = 0
getsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], [4]) = 0
getsockopt(28, SOL_IP, IP_TOS, [0], [4]) = 0
setsockopt(28, SOL_SOCKET, SO_RCVBUF, [2097152], 4) = 0
setsockopt(28, SOL_IP, IP_TOS, [0], 4)  = 0
setsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], 4) = 0
getsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], [4]) = 0
getsockopt(28, SOL_IP, IP_TOS, [0], [4]) = 0
setsockopt(28, SOL_IP, IP_MULTICAST_TTL, [4], 4) = 0
setsockopt(28, SOL_IP, IP_TOS, [0], 4)  = 0
setsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], 4) = 0
getsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], [4]) = 0
getsockopt(28, SOL_IP, IP_TOS, [0], [4]) = 0
setsockopt(28, SOL_IP, IP_MULTICAST_LOOP, [1], 4) = 0
setsockopt(28, SOL_IP, IP_TOS, [0], 4)  = 0
setsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], 4) = 0
getsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], [4]) = 0
getsockopt(28, SOL_IP, IP_TOS, [0], [4]) = 0
setsockopt(28, SOL_IP, IP_ADD_MEMBERSHIP, "\357\0\0\1\0\0\0\0", 8) = 0
setsockopt(28, SOL_IP, IP_TOS, [0], 4)  = 0
setsockopt(28, SOL_SOCKET, SO_PRIORITY, [0], 4) = 0
epoll_ctl(4, EPOLL_CTL_ADD, 28, {EPOLLIN, {u32=28, u64=5206135880472854556}}) = 0
bind(28, {sa_family=AF_INET, sin_port=htons(1234), sin_addr=inet_addr("239.0.0.1")}, 16) = 0
socket(PF_INET, SOCK_DGRAM, IPPROTO_UDP) = 28
setsockopt(28, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0
bind(28, {sa_family=AF_INET, sin_port=htons(1234), sin_addr=inet_addr("239.0.0.1")}, 16) = 0
setsockopt(28, SOL_IP, IP_ADD_MEMBERSHIP, "\357\0\0\1\0\0\0\0", 8) = 0
fcntl(28, F_GETFL)                      = 0x2 (flags O_RDWR)
fcntl(28, F_SETFL, O_RDWR|O_NONBLOCK)   = 0
epoll_ctl(4, EPOLL_CTL_ADD, 28, {EPOLLIN, {u32=28, u64=22500040878587932}}) = 0

Можно увидеть, что устанавливается больший размер буфера (стандартный 8 кб, здесь 2 мб). Кроме того, указываются дополнительные опции для задания приоритета этому сокету.

Вообще эти повторяющиеся вызовы с SO_PRIORITY и IP_TOS выглядят странно, но если поковыряться в файле inet_drv.c, то можно найти пояснение от одного из разработчиков Erlang:

#if  defined(IP_TOS) && defined(SOL_IP) && defined(SO_PRIORITY)
static int setopt_prio_tos_trick
        (int fd, int proto, int type, char* arg_ptr, int arg_sz, int propagate)
{
    /* The relations between SO_PRIORITY, TOS and other options
       is not what you (or at least I) would expect...:
       If TOS is set after priority, priority is zeroed.
       If any other option is set after tos, tos might be zeroed.
       Therefore, save tos and priority. If something else is set, 
       restore both after setting, if  tos is set, restore only 
       prio and if prio is set restore none... All to keep the
       user feeling socket options are independent. /PaN */

Ага, значит любая попытка установить какую-то опцию сокету обрамляется в двумя get-ами и set-ами для SO_PRIORITY и IP_TOS.

Итак, указали все те же опции для нашего сокета что и gen_udp. Картинка стала намного лучше, большей частью из-за явно указанного буфера (SO_RCVBUF).

Повреждение картинки

Но записав одновременно один и тот же кусок видео через gen_udp и через mpegts_udp я вижу рассыпания из-за нашего порт-драйвера.

Поймал рассыпание кадра:

mpegts_udp
mpegts_udp

gen_udp
gen_udp

Вообще, рассыпание картинки может быть по-двум причинам:

  1. Потеря пакетов. Если пакеты прилетевшие в буфер по сети вовремя не считать, то их может смыть свежими данными и они будут потеряны.
  2. Повреждение данных уже после получения. Возможно у нас в коде драйвера мы где-то допустили ошибку, как-то небрежно копируем какие-то данные, по каким-то причинам повреждаем данные, получаем какой-то мусор.

Если разобрать (через contrib/ts_reader.erl) эти два куска видео, а потом сделать diff, то можно увидеть следующее:

1997c1997
< 1 h264    frame  6537388      167(  9860) h264([{aud,2,765877162,{9,240}},{single,9850,1555489264,'P'}])
---
> 1 h264    frame  6537388      167(  8261) h264([{aud,2,765877162,{9,240}},{single,8251,1507530533,'P'}])
3871c3871
< 1 h264    frame  6561078      125( 14187) h264([{aud,2,765877162,{9,240}},{single,14177,339307100,'P'}])
---
> 1 h264    frame  6561078      125( 12715) h264([{aud,2,765877162,{9,240}},{single,12705,3923439548,'P'}])
5669c5669
< 1 h264    frame  6583726      167( 11004) h264([{aud,2,765877162,{9,240}},{single,10994,1060480850,'P'}])
---
> 1 h264    frame  6583726      167(  9532) h264([{aud,2,765877162,{9,240}},{single,9522,1166933415,'P'}])
6299c6299
< 1 h264    frame  6591859      125( 11571) h264([{aud,2,765877162,{9,240}},{single,11561,1916933918,'P'}])
---
> 1 h264    frame  6591859      125( 10099) h264([{aud,2,765877162,{9,240}},{single,10089,3951446825,'P'}])
7546c7546
< 1 h264    frame  6607082      125( 15294) h264([{aud,2,765877162,{9,240}},{single,15284,2780667195,'P'}])
---
> 1 h264    frame  6607082      125( 13822) h264([{aud,2,765877162,{9,240}},{single,13812,1363338479,'P'}])
8046c8046
< 1 h264    frame  6613047      125( 17931) h264([{aud,2,765877162,{9,240}},{single,17921,3455808322,'P'}])
---
> 1 h264    frame  6613047      125( 16827) h264([{aud,2,765877162,{9,240}},{single,16817,3124787100,'P'}])
10492,10498d10491
< 2  aac    frame  6643704        0(   241)
< 2  aac    frame  6643725        0(   239)
< 1 h264    frame  6643744      125( 12714) h264([{aud,2,765877162,{9,240}},{single,12704,2094291602,'P'}])
< 2  aac    frame  6643747        0(   290)
< 2  aac    frame  6643768        0(   242)
< 1 h264    frame  6643786      125( 14782) h264([{aud,2,765877162,{9,240}},{single,14772,3361774981,'P'}])

Как можно видеть, потерянных кадров нет, однако те кадры что есть отличаются в размере. Кроме того, один из кусков оказался длинее другого, можно видеть в конце шесть кадров, которые оказались только у него.

Можно заметить, что для одних и тех же кадров размер в первом файле всегда больше чем во втором (указан в скобках). Если посчитать разницу в размере для каждой пары кадров, то получим это:

1599, 1472, 1472, 1472, 1472, 1104

Интересно. Кадр разрезается на несколько MPEG-TS пакетов. Размер пакета — 188 байт. Однако количество полезной информации (payload) которую можно поместить в пакет меньше: 184 байта.

Если мы потеряли несколько пакетов подряд в середине одного кадра, то эти потери будут кратны 184. Если потеряли конец кадра, то не будет кратен. Если мы поделим 1472 на 184, то получим ровно 8 MPEG-TS пакетов. Т.е. на самом деле мы потеряли 188*8 = 1504 байта (несколько раз).

Значит никакого повреждения данных нет, по каким-то причинам мы не успеваем считать данные из сокета до того как их смоет новыми данными.

Неаккуратная работа с памятью

По всей видимости проблема в том как мы работаем с буфером нашего порт-драйвера.

  while((s = recvfrom(d->socket, d->buf->orig_bytes + d->len, d->size - d->len, 0, (struct sockaddr *)&peer, &peer_len)) > 0) {
    d->len += s;
    if(d->len >= d->limit) {
      check_errors(d);
      flush(d);
    }
  }

Наш драйвер время от времени делает flush —проверяет сколько данных уже накопилось, и если что-то есть отправляет содержимое эрланг-процессу.

Кроме того, если в процессе считывания с сокета мы накопили уже много данных, и скоро в буфер может не влезть, то тоже нужно делать flush().

Стыд и срам, но ошибка оказалась очень банальной. Мы пытались пользоваться памятью после того как уже её освободили и занулили. В функции flush() есть такие строчки:


  if(d->len == 0) return;
  if(!d->buf) return;
  ErlDrvTermData reply[] = {
    ERL_DRV_ATOM, atom_udp,
    ERL_DRV_PORT, driver_mk_port(d->port),
    ERL_DRV_BINARY, (ErlDrvTermData)d->buf, (ErlDrvTermData)d->len, (ErlDrvTermData)0,
    ERL_DRV_INT, (ErlDrvTermData)d->packet_count,
    ERL_DRV_INT, (ErlDrvTermData)d->error_count,
    ERL_DRV_TUPLE, 5
  };
  driver_output_term(d->port, reply, sizeof(reply) / sizeof(reply[0]));

  driver_free_binary(d->buf);
  d->buf = 0;

Обращаю внимание на последние две. Во flush() вы отправляли данные эрланг-процессу, и освобождали буфер. Если flush() выполняется в середине цикла recvfrom и ещё есть куча данных, а буфер уже освобождён и ему присвоен нулевой адрес, то произойдёт беда.

Мы берём адрес buf, читаем оттуда адрес (его значение оказалось 8), и по нему пишем.

Т.е. по-идее recvfrom() должен был попытаться записать данные по адресу 8 и всё должно было упасть, но почему-то этого не произошло.

Внутри recvfrom() производится проверка на возможность записи, и возвращается ошибка в случае недоступности. А мы по-глупости просто не проверяли ошибку.

На этом всё.