Wednesday, May 17, 2017

Back to school or Why I have decided to become an ML student

It was a long time ago when I started developing applications and algorithms (the first commercial project was started in 1990). These are mostly back-end, massive data processing, DSP, non-standard structure and behavior data servers development, OLAP, ETL, Reporting etc.
In the beginning of my career, I had an experience in the field of Data Science, Neural networks and Fuzzy Logic Algorithms.
But in the 90s my interest was predominately academic - I did not come across the commercial projects at all.
As a result, I did not go further than reading articles, books and working on my own density-based clustering algorithm development with complexity O(N^3), so everything was postponed for better times.
However everything has started changing recently: in my company at the pre-sale stage there have appeared the projects, where ML/DS (Machine Learning / Data Science) can be implemented.
And the object of my previous admiration, which had been almost forgotten, has revealed itself again.
Unfortunately, nowadays the desire has increased but there is not so much time as in my adolescence, if not to say that there is no time at all.

It occurred to me that in my case "learning with the teacher" is the only appropriate solution as I work as a head of MEAN-stack department, giving lectures of the "Algorithms and Data Structures" course at LITS at the same time and a few lectures in the scope of the "Introduction to Data Science" course.

And when it became clear (to be an ML student rather than to learn by myself), I did not doubt a second - I would study just only here (ML @ LITS), because I know my fellow teachers well and can fully trust them.

What I expect from the course:

  • Tidy up what I knew
  • Add modern methods and approaches
  • Systematize all of this
  • Feel the classes of problems that can be solved (terms, accuracy, how many people and which hardware are needed etc.) and which can not be
  • To get acquainted with modern tools

And many other things in my heart that I have not yet expressed in words ...

But the most important thing is to get the effect ASAP


Tuesday, May 16, 2017

Back to school или Почему я решил стать студентом ML

Разрабатываю программы и алгоритмы я давно (первый коммерческий проект - 1990 год). В основном это бекенд, массированная обработка данных, ЦОС, разработка серверов данных с нестандартной структурой и поведением, OLAP, ETL, Reporting etc.
В далёком прошлом у меня был опыт работы в области Data Mining, нейросетей и алгоритмов нечёткой логики.
Но в 90х интерес был, скорее, академическим - коммерческие проекты мне не попадались. Поэтому дальше книг, статей и разработки своего density-based алгоритма кластеризации со сложностью O(N^3) я не продвинулся и всё отодвинулось в долгий ящик.
Но сейчас всё стало меняться, причём как-то резко: в компании на пресейле стали появляться проекты, в которых можно было бы применить ML/DS (Machine Learning / Data Science).
И некогда забвенная любовь запылала с новой силой.
Но, в отличие от отрочества, желание больше, а времени меньше, чтобы не сказать нет совсем.

И пришло осознание, что в моём случае подходит только "обучение с учителем", так как работаю начальником отдела разработки (MEAN stack), при этом преподаю курс "Алгоритмы и структуры данных" в LITS и несколько лекций в рамках курса "Introduction to Data Science".

И когда пришло это осознание (стать студентом ML, а не подтягиваться самостоятельно), я ни секунды не сомневался - буду учиться только здесь же (ML @ LITS) ибо я хорошо знаю своих коллег-преподавателей и могу полностью им довериться.

Что я ожидаю от курса:


  • привести в порядок что знал
  • добавить современные методы и подходы
  • всё это систематизировать
  • почувствовать классы задач, которые можно решить (сроки, точность, сколько нужно людей и железа и т.д.) и которые нельзя
  • познакомиться с современными инструментами

и много ещё чего в душе, что словами пока не выразил ...

Но самое главное - получить эффект в максимально сжатый срок!


Tuesday, August 16, 2016

Эссе о суеверных

Это шуточное эссе является результатом размышления над вопросом, поднятым на одном из форумов. Спор был о том, считать ли суеверных рангом ниже, чем атеисты и верующие. 

Многие считают, что суеверные люди – это почти верующие, то есть эдакие себе “недоверующие”, чьи верования хаотичны и несистемны. В то время как верования истинно-верующих упорядочены, описаны, отформатированы, их ритуалы и прочие действия строго регламентированы и большинство включено в систему организаций.
Нередко сами организации верующих встроены в государство в качестве помощника по управлению паствой (хотя в начале многие верующие организовывались с целью защиты от государства или вообще противодействия ему).

На мой взгляд есть принципиальное отличие между суеверными и верующими.

Общий класс “верующие” (куда входят и атеисты) верят во что-либо без доказательства, с чьих-то слов. Более того, доказательство им совершенно не требуеся.
Здесь мне могут возразить многие верующие, что “необходимость доказательства” присуща им в полной мере ибо доказательствами изобилует Писание, житие святых и т.д.
Но давайте не путать доказательство и утверждение для самоуспокоения” (УДС). Например УДС я называю следующие утверждения:

  • Невозможно представить, что всё многообразие и красота этого мира получились случайно, без божественного вмешательства, так сказать эволюционно. (Здесь логическая ошибка. Из этого утверждения не следует, что б-г есть. Отсюда следует слабость воображения индивида.)
  • Это было сказано самим святым (!!), поэтому истинно, так как любое высказывание святого свято, извините за тавтологию (здесь смешение понятий, так как мы путаем святого и б-га. В той же Библии святой нередко был погрешим (сомнение – это грех). Моисей, помнится, даже поплатился за это)
  • Само Писание – это истина в последней инстанции ибо была дарована самим б-гом. Очевидно, что это “чистый” вопрос веры, так как верующие считают факт дарования Писания истиной, а неверующие говорят, что невозможно проверить утверждение Моисея, что Тора (первая часть Танаха) была дарована б-гом, а не он сам её сочинил.
  • Это явление необъяснимо с точки зрения современной науки, значит оно имеет божественное происхождение (ну или “внеземное” если речь идёт о “Свидетелях Инопланетян”). Но это тоже логическая ошибка. Иначе нам пришлось бы приписать божественность всему современному, что в прошлом веке было бы необъяснимым. Но мы же не хотим чтобы качество “божественность” было субъективным и относительным ... :-)

Примеры УДС можно приводить бесконечно и мы не будем этого делать. Здесь они приведены для иллюстрации, чтобы отличать УДС от доказательств.

Но есть ещё один класс - “агностики”, представители которого не опровергают утверждение бездоказательно, на основании высказывания вроде “ну это итак ясно”, равно как и не берут на веру бездоказательно, на основании “очевидности”.
Агностики честно говорят, что не знают чего-либо из-за недостаточности доказательств. Кстати, это качество настоящих учёных, которых тоже можно, за редким исключением, отнести к агностикам в силу профессиональных качеств.
Ну и вернёмся, наконец, к нашим суеверным.
Суеверные – это люди, которые на основании своего опыта (или опыта своих знакомых, опыта знакомых знакомых и т.п. цепочек) приходят к выводу, что “скорее всего там что-то такое есть”.
Например, нередко можно услышать что-то вроде:

  • В Пасху нельзя работать! Вот я перебирал двигатель в прошлую Пасху, так он вообще сгорел!
  • Словом можно самого себя сглазить! Стоило мне произнести вслух, что тут работы осталось на 5 минут, от силы на полчаса, как лопнула опалубка и пришлось переделывать. Закончили уже ближе к полуночи.
  • Мне сон плохой снился нынче. Но не буду рассказывать до полудня – не хочу чтобы сбылся!

И т.д. и т.п.

Обращаю внимание, что чаще всего на вопрос “ты в это веришь?” следует ответ – “да кто его знает, но лучше перестраховаться!”, хотя нередко можно услышать и “да, потому что уже был опыт”. При этом некоторыми суеверными отрицательный опыт игнорируется, а положительный накапливается, создавая иллюзию “доказанности”.
Но справедливости ради подобное “накопление” характерно и для представителей класса верующих.

Таким образом может показаться, что признак “необходимость доказательства” присущ всем вышеобозначенным классам, хоть и понимают они его по-разному.
Но, если всмотреться повнимательнее, то только агностики “чисты” по этому признаку: “доверяю тому, что доказано, но отвергну, если появятся опровержения и сомневаюсь в том, что недоказано”.
Верующие либо верят без доказательства, если утверждение - “основа веры”, либо довольствуются УДС, которые доказательствами не являются, либо накапливают свой личный положительный опыт (или по цепочке), игнорируя отрицательный.
Суеверные либо предполагают что-либо, хотя и не верят до конца (этим они схожи с агностиками), либо, как и верующие, накапливают положительный опыт, игнорируя отрицательный.
У нас почти получилась классификация суеверных, как “промежуточное звено” между классами агностиков и верующих ...

Почти.

Свойство “Накопление положительного опыта и игнорирование отрицательного” характерно и некоторым недобросовестным агностикам, которые делят вещи на “доказанные” и “под сомнением”. Если с теми, что "под сомнением", всё в порядке, то "доказанными" они иногда считают утверждения, которые “много раз подтвердились, и мало раз – нет”. Они считают такое утверждение истинным, игнорируя неподтвердившиеся случаи как “исключение, подтверждающее правило”.
Есть забавный пример, когда утверждение было истинным для всех подряд (без исключения) натуральных чисел от 1 до 40 и только 41-е число подкачало. Утверждалось, что полином x^2 + x + 41 (его изучал ещё Леонард Эйлер) даёт простые числа при подстановке натуральных.
Этот пример показывает, что никакое конечное число подтверждений гипотезы не говорит об её истинности. Просто “скорее всего” превращается в “вероятнее всего”, то есть в “почти истину”.
Таким образом, я склонен выделить свойство “Накопление положительного опыта и игнорирование отрицательного” из всех классов как не характерное для них, а характерное для “недобросовестных” (ну или ленивых) их представителей.

Таким образом, если выделить из всех классов “недобросовестных по отношению к доказательству” в отдельный, то у суеверных и агностиков не останется различий по признаку “необходимость доказательства”.

Friday, December 11, 2015

Experiments with parallelization (RUS)

Преамбула


В технологии NodeJS существует интересная возможность написания серверных приложений с помощью скриптового языка. У Майкрософта уже давно есть компонент WSH (Windows Scripting Host), способный запускать сценарии на скриптовых языках JScript и VBScript, а также и на других дополнительно устанавливаемых языках (например, Perl). Но он не получил особого распространения, так как не мог реализовать одну из желаемых серверных функциональностей – веб-сервер. Веб-приложения должны использовать Apache, IIS, nginx, и т.д. так как серверные скрипты, способные генерировать динамические веб страницы, по-прежнему отделены от веб сервера.
NodeJS - это одна из технологий, которые предоставили нам эту возможность.
Но серверные приложения - это не только веб-серверы и веб сайты, а любые приложения, выполняющие сервисные (обслуживающие) функции по запросу клиента, предоставляя ему доступ к определённым ресурсам или услугам.
Сюда можно отнести также web API, базы данных различных типов, сетевые приложения, охватывающие такой класс задач, как балансировка нагрузки, ETL процессы, распределённые вычисления, распределённые файловые хранилища и т.д.
И для того, чтобы лучше понять возможности NodeJS, границы применимости подходов в параллельных вычислениях, физические ограничения среды, прочувствовать сокеты, потоки (streams), межпроцессное взаимодействие и чтобы закрепить теоретические знания, была выбрана задача – распараллелить один из самых быстрых алгоритмов сортировки – Quick Sort.
Таким образом, с целью более глубокого изучения возможностей технологии, была поставлена задача:
  • добиться, чтобы параллельная (или распределённая) сортировка массива была быстрее, чем сортировка в одном процессе (NodeJS декларирован как STA (single thread application)),
или, как минимум,
  • найти объёмы массивов, при которых сортировка в одном процессе медленнее, чем в нескольких процессах (или потоках).

Решение поставленной задачи усложняется наличием двух факторов:
  • эмпирический закон Амдала накладывает ограничение на ускорение алгоритма из-за наличия в нем последовательных фрагментов;
  • необходимость передавать данные между процессами увеличивает время выполнения этих последовательных фрагментов.

Была выдвинута гипотеза: если разделить массив на несколько кусков, и каждый кусок отсортировать в отдельном потоке (или процессе) на отдельном ядре процессора параллельно, то общее время на сортировку будет меньше, чем сортировка в один поток.
Следует заметить, что если разделить массив на части «как есть», то на финальном этапе нам предстоит слияние отсортированных кусков. Одна из стратегий (чтобы избежать финального слияния) – сделать предварительную сортировку. Благоприятным моментом является то, что алгоритм quick sort на каждой итерации проводит последовательный скан массива, разделяя сканируемый кусок на две части по признаку «больше или меньше опорного значения». Каждая из частей неотсортированная, но любой элемент из части «маленьких элементов» меньше самого маленького элемента из части «больших элементов» (и наоборот – любой элемент из части «больших элементов» больше самого большого из части «маленьких элементов»). Таким образом, каждая из частей может быть отсортирована отдельно от другой и над результатом не надо будет производить слияние.
Если нам нужно разделить массив на число частей, равное два в некоторой степени, то показателем степени будет количество последовательных сканов всего массива, которые нужно выполнить для разделения.
Например
  • На первой итерации алгоритм разделит весь массив на две неотсортированные части.
  • На второй итерации каждая половинка будет разделена надвое (сканирование двух половинок имеет ту же вычислительную сложность, что и сканирование целого массива) – итого 4 неотсортированные части.
  • На третьей итерации каждая четверть будет разделена пополам – это третье сканирование. И в результате - 8 неотсортированных частей и т.д.
Замечание.
Сравнение производится с сортировкой в один поток, в котором эти предварительные сканирования обязательно произойдут. Таким образом, применяя предварительное расщепление, потери темпа по сравнению с последовательным алгоритмом не происходит.

Реализация


Настройка окружения.

Для решения поставленной задачи были сгенерированы массивы фейковых данных о пользователях (типа ФИО, дата рождения, адрес проживания и т.д.) и сохранены в JSON-формате.
С первых же строчек кода стало понятно, что накладные расходы по созданию рабочих процессов (вёркеров) столь велики, что пока вёркеры подадут сигнал «ONLINE», STA уже просигнализирует о завершении задачи.
(Слово «веркер» здесь употребляется в смысле «рабочий процесс», или «работник», «исполнитель»).
Было решено время инициализации вёркеров вынести за рамки поставленной задачи, изменив ее формулировку следующим образом: построить некий сервер, принимающий более одного массива для сортировки и возвращающий отсортированные массивы.
Но это изменение помогло незначительно – сортировка массива в 16к элементов в STA происходила за десятки миллисекунд, а передача данных вёркерам и обратно происходила намного дольше.
Следующим шагом было наращивание объёма массива, чтобы время передачи стало относительно меньше времени сортировки.
Возникла новая проблема: ограничение на размер исходной строки для JSON-парсера (встроенного в V8) около 1 Gb. Пришлось использовать сторонний модуль «stream-json/utils/StreamArray».
Проблема была решена, но при дальнейшем наращивании объёма массива, он перестал помещаться в оперативной памяти, выделяемой для процесса NodeJS.
Помог параметр запуска самого процесса NodeJS «--max-old-space-size=4096», увеличив размер памяти до 4 Gb.
Кроме того, массив сузился до двух колонок (ID, SortingValue), где ID - порядковый номер записи в оригинальном массиве (после сортировки «узкий» массив всегда можно восстановить до полного размера по оригинальным ID). Сужение массива даёт ещё один положительный эффект – вёркерам передаётся меньший объём данных.
Поскольку не стояла задача оперировать массивами данных, не помещающимися в оперативной памяти сервера, то на этом решено было прекратить дальнейшее наращивание объёмов данных и остановиться на 1.1 млн записей.

Укорачивание последовательных фрагментов алгоритма

Предположим, что в нашем процессоре есть 4 ядра, и мы создали 4 вёркера. Мы можем разделить сортируемый массив на 4 части и передать вёркерам – по одной части на каждый. Каждую из частей массива можно передавать вёркеру любыми кусками в любой последовательности, но к сортировке своей части каждый из них приступит только после получения всех частей. Таким образом, передача части вёркеру – цепочка последовательных операций, то есть последовательный фрагмент.
И мы заинтересованы передать данные как можно быстрее и должны отыскать самый быстрый вариант передачи.
Замечание: Внутри единичного процесса NodeJS библиотека libuv предоставляет пул потоков (thread pool), в котором количество потоков по умолчанию равно 4 (UV_THREADPOOL_SIZE == 4) (например см. http://docs.libuv.org/en/latest/threadpool.html). В нашем случае мы располагаем 4 ядрами и было бы заманчиво использовать потоки вместо процессов, так как время передачи данных уменьшилось бы практически до нуля (время на запись в переменную ссылки на участок памяти). Было проверено предположение, что async модуль, который, как заявлено, может запускать задачи параллельно в асинхронном режиме, может использовать внутренний пул потоков и запускать каждую задачу в отдельный поток (если задач не более 4 штук), утилизируя доступные ядра. Но, к сожалению, данное предположение не подтвердилась. Сортировка в один поток не уступала по времени сортировке с помощью async более 10%, а иногда и опережала (при некоторых величинах массивов, хотя закономерности не выявлено).
Кроме того, пул потоков глобален для одного процесса, а значит, разделяет память, доступную процессу (см. выше), между всеми потоками и наращивая количество потоков мы уменьшаем память,  которой может оперировать каждый поток.

Замечание. Далее в тексте термин «поток» будет применяться к понятию «поток данных»– stream).
Было рассмотрено использование следующих вариантов транспорта данных в вёркеры и обратно:
  • IPC (использование встроенного IPC-канала)
  • Sockets
  • FS (сохранение в файле и передача вёркеру только пути к файлу).
  • Stdio (использование стандартных IO streams - stdin/stdout).

Коль скоро большая часть из перечисленного - это потоки данных (кроме IPC), то и свои данные для передачи мы тоже можем сформировать в виде потоков (custom streams). Это даст возможность делегировать управление передачей данных (back pressure, optimal package size and other issues) встроенному механизму просто используя метод pipe(). Более того, достаточно написать два потока («from-part-of-array-to-stream» and «from-stream-to-array») и можно их использовать с любым транспортным потоком.
Кроме того, были использованы два формата передачи данных (далее называемые протоколами), для каждого из которых были сделаны по два потока – Readable (из части массива в поток) и Writable (из потока обратно в массив):

  1. Буферный протокол. Каждая запись имеет фиксированную длину. В нашем случае была выбрана длина 32 байта, так как длина поля для сортировки не превышает 24. 1-й байт – длина поля; 2-5 байты – целое число (Int32) – номер элемента в исходном массиве (см. рисунок ниже), далее следует само значение для сортировки, длина которого сохранена в 1-м байте. Остальные символы – мусор, который будет игнорироваться. Идея состоит в том, что размер буфера станет больше, зато все записи будут иметь фиксированную длину. Этот протокол даёт нам произвольный доступ (random access) к любому блоку записей просто по номеру элемента (операция seek(), вместо операции scan()). Поскольку файловая система оперирует блоками (кластерами), размеры которых кратны 1024 байт(1К, 2К, и тд), то, разделив размер полученного блока на размер одного элемента ( в нашем случае 32 байта), мы получим целое число элементов и каждый элемент просто будет считан с определённой позиции блока.
     Рисунок 1. Дамп памяти в протоколе – «Буферный протокол»

  2. Строковый протокол. Каждая запись имеет вид arr[i].ID + ‘|’ + arr[i].name + ‘\n’ (см. рисунок ниже) и конец массива обозначен символом EOT (ascii 0x04). . Этот маркер (End of Transmission) нам поможет запустить(emit) событие finish внутри нашего потока, когда мы считываем данные из него в массив, так как некоторые потоки (например child.stdout) не генерируют (emit) это событие. Этот формат более компактный, чем предыдущий, но об random access можно забыть. Чтобы считать определённую запись, придётся считать все предшествующие от начала блока.
    Кроме того, в случае записи элементов массива в поток можно гарантировать, чтобы каждый блок (или чанк) будет содержать целое число элементов (то есть ни один элемент не будет разделён между двумя чанками). Но при прохождении данных через сокет, на выходе из сокета чанки могут быть нарезаны произвольным образом и нельзя рассчитывать, что в пришедшем на обработку чанке содержится целое число элементов. Причём кусок элемента может быть как в начале, так и в конце.
    Таким образом в функции mystream.prototype._read() придётся объединять остаток от предыдущего чанка со вновь пришедшим, чтобы не потерять ни одной записи. А это – дополнительные накладные расходы.
    Рисунок 2. Дамп памяти в протоколе – «Строковый протокол»

Итак, получилось 4 потока.

Таблица 1. R/W Потоки, соответствующие протоколам Buffer format и String format
НазваниеТипОписание
ArrayToBufferStreamReadableЧитает из массива в Буферный поток
ArrayFromBufferStreamWritableПишет из Буферного потока в массив
ArrayToStringStreamReadableЧитает из массива в Строковый поток
ArrayFromStringStreamWritableПишет из Строкового потока в массив

Результат сравнения разных способов передачи «узкого» массива представлен на рисунке 3. При этом массив содержит 1.1 млн элементов, где каждый элемент содержит ID (Int32) и Value (не более 24 символа).

Рисунок 3. Сравнение транспортов передачи данных разными протоколами (Buffer and String)

При этом если при передаче через файл buffer stream ожидаемо выиграл у строкового, то при передаче через сокет результат был неожиданным. Мало того, что сокет передавал массив дольше, чем файл, так ещё и строковый поток опередил буферный. Кроме того, IPC канал, при котором массив передается как объект, оказался быстрее, чем через сокет (за сценой, возможно, происходит превращение в строку).
«Победителем соревнования» оказался способ, при котором использовались стандартные потоки ввода вывода вёркеров.
Именно этот способ был использован для окончательного эксперимента – соревнования по сортировке массива в одном процессе (через STA) и с помощью четырёх вёркеров на четырёх-ядерном процессоре.
Необходимо отметить влияние ещё одного фактора – размер чанков, которые формируются в момент превращения массива в поток. Строго говоря, на скорость передачи данных влияет одновременно два параметра - размер чанка и тип транспорта. То есть для одного и того же типа потока оптимальный размер чанка будет разным в зависимости от типа транспорта. Для каждого способа передачи размер чанка подбирается отдельно и на рисунке 3 можно увидеть результаты соревнования при лучших настройках для каждого эксперимента.
Кроме того, следует отметить, что передача данных через сокеты внутри одного процесса или даже в пределах одного сервера не то же самое, что передача данных по сети или через веб, когда данные проходят через роутеры. В последнем случае оптимальные размеры пакетов будут другими. Более того, используя протокол передачи Buffer Stream можно быть уверенным, что пакеты, которые передаются, не будут раздроблены (в крайнем случае они объединятся), поэтому в алгоритме у меня нет обработки «обрезков» элементов (в протоколе String Stream есть). Скорее всего, при использовании web-socket Buffer Stream работать не будет (вернее будет, если его немного изменить).
Существует ещё один момент – передавать через IPC-канал целый массив – не самый быстрый способ. Лучше передавать по блоку записей. В моём случае оптимальной оказалась передача по 8к записей.
Настройки размеров чанков приведены в следующей таблице:

Таблица 2.
Протокол и транспортДлина чанкаед.изм.
Строковый поток через файл40*1024байт
Буферный поток через файл40*1024байт
Строковый поток через сокет32*1024байт
Буферный поток через сокет32*1024байт
Строковый поток через ввод-вывод16*1024байт
Буферный поток через ввод-вывод32*1024байт
Массив через IPC8*1024шт.

Эксперименты.


Итак, настал момент окончательного соревнования при сортировке массива с помощью параллельных процессов.

Первый эксперимент (IO async).

Transport: Standard process IO. Protocol: String Stream. Task Starting: Asynchronous

Алгоритм:


  • Главный (Main) процесс расщепляет массив из 1.1 млн элементов на 4 куска и, используя async.forEach(), превращает каждый из кусков массива в поток данных и подключает (pipe) к нему соответствующий канал child.stdin каждого вёркера.
  • На стороне вёркера к стандартному потоку ввода (process.stdin) подключается (pipe) наш созданный обратный поток, который превращает поток снова в массив.
  • Каждый вёркер сортирует свой кусок массива …
  •  … и передаёт массив назад в Main процесс, используя стандартный поток вывода (process.stdout)
  • Конкатенация кусков массива обратно в целый массив.
Конкатенация произойдёт тогда, когда будут получены все отсортированные части массива.
(Проверка на «отсортированность» проводится, но вынесена за рамки соревнования)
На рисунке 4 показана временная диаграмма (timeline) эксперимента (the top series corresponds to the STA sorting in the according scale).

Рисунок 4. Transport: Standard process IO. Protocol: String Stream. Task Starting: Asynchronous
На рисунке выделяется большой период времени (3.28 - 4.57 (s)) в течение которого асинхронно четыре куска массива превращаются в потоки и передаются через stdin-stream-ы своим вёркерам. Поскольку главный процесс однопоточный (STA), он «размывает» процесс передачи данных между всеми вёркерами и в результате все четыре вёркера смогут приступить каждый к своей работе почти одновременно (почти – из-за неодинаковой длины кусков). Назад отсортированные куски массивов успевают разминуться, не пересекаясь. Только два самых длинных куска немного помешают друг другу на финише (worker 0 and 1). Последний этап - конкатенация пришедших кусков (40 ms).
Итак, процесс сортировки (с момента начала разделения массива на куски (split) (2.843 s) до завершения конкатенации (6.332 s)) занял примерно 3.5 s. При том, что сортировка в одном процессе (STA) занимает в среднем 3.7 s.
Конечно, параллельная сортировка получилась несколько быстрее последовательной. Но выигрыш в 5.5% не стоит тех усилий, что были затрачены.
Из-за того, что главный процесс однопоточный, наши вёркеры простаивают в ожидании своей порции задачи и время на пересылку нивелирует полезную работу.
Мы поставим второй эксперимент, немного исправив алгоритм, и начнём пересылку задачи следующему вёркеру, только после того, когда предыдущий вёркер получит задачу полностью.
Посмотрим, что их этого получится.

Второй эксперимент (IO series).

Transport: Standard process IO. Protocol: String Stream. Task Starting: Sequential
Алгоритм (отличие от предыдущего):

  • Main процесс расщепляет массив из 1.1 млн элементов на 4 куска и последовательно превращает каждый из кусков массива в поток и подключает (pipe) к нему соответствующий канал child.stdin каждого вёркера.
Давайте рассмотрим timeline эксперимента на следующем рисунке:

Рисунок 5. Transport: Standard process IO. Protocol: String Stream. Task Starting: Sequential

Интересно, что общее время сортировки практически не изменилось. Главный процесс, посылая задачу последнему вёркеру одновременно начал принимать результат от первого вёркера, тем самым отодвинув процесс сортировки в последнем процессе (на рисунке это вёркер 0). И, как следствие, повлиял на общее время сортировки.
Это похоже на раздачу пищи на полевой кухне – каждый человек съест свою порцию быстро, но обед закончится, когда один повар раздаст пищу последнему. Плюс время приема пищи этим последним. При этом совсем неважно, в какой последовательности вёркеры получат свою порцию. Либо один за другим последовательно, либо повар будет идти вдоль стола, докладывая в каждую миску по кусочку. Потом пойдёт обратно и т.д., пока все тарелки не наполнятся одновременно. Общее время приёма пищи будет тем же.
И совсем неожиданно, применив для статистики другой транспорт (через файл), был получены существенно лучший результат: 

Третий эксперимент (via File async).

Transport: via File. Protocol: Buffer Stream. Task Starting: Asynchronous

Рисунок 6. Transport: via File. Protocol: Buffer Stream. Task Starting: Asynchronous

Как показывает рис.6, время передачи данных от главного процесса вёркерам резко сократилось.
Почему так произошло? Ведь при сравнении транспортов (см. рис. 3) Buffer Stream via File System показал средний результат.
Причина оказалась в следующем. При передаче данных через IO, процессорное время главного процесса расходуется с начала и до конца передачи (пока последний байт не уйдёт вёркеру). Но при использовании файловой системы операция передачи данных состоит из двух подопераций (sub-operation) – сохранение в файл и считывание из файла.
И хотя при сохранении данных в файл также расходуется процессорное время главного процесса (как в предыдущих экспериментах), считыванием из файла занимаются уже вёркеры и тратят уже каждый своё процессорное время!
А так как обе подоперации примерно одинаковы по времени, то мы получаем сокращение времени главного процесса на передачу данных вёркерам почти вдвое.
Процесс сортировки в этом случае (с момента начала разделения массива на куски (split) (2.820 s) до завершения конкатенации (5.203 s)) занял примерно 2.4 s. Что составило 68% от времени сортировки в одном процессе (STA). То есть, распараллеливание с использованием файловой системы в качестве транспорта ускорило процесс сортировки на 1/3 по сравнению с STA.

Выводы

Идея распараллеливания различных фрагментов одного алгоритма продуктивна, но приходится учитывать стоимость передачи данных.
Пока данных немного, разделять алгоритм не имеет смысла.
Но при большом количестве данных основной процесс, при распределении работы между рабочими процессами, избавляется от части работы, но взамен тратит время на передачу данных.
Если бы данные можно было подготовить в общей памяти (shared memory) с быстрым доступом и передавать не сами данные, а ссылку на них (как между потоками (threads) одного процесса), то эффект от распараллеливания стал бы более заметен.
Можно попробовать реализовать модуль на С++, который внутри себя запустит пул потоков (thread pool), и выполнит необходимые действия в рабочих потоках с нашими данными, которые передаются в этот модуль по ссылке … Но это уже не является задачей программирования на Node.JS.

Thursday, December 10, 2015

Experiments with parallelization

Preamble


In the NodeJS technology there is an interesting possibility of writing server applications using a scripting language. For a long time Microsoft has a WSH component (Windows Scripting Host), which can interpret and run plain-text JScript (.JS and .JSE files) and VBScript (.VBS and .VBE files), as well as other additional installed languages (e.g. Perl). But it was not widespread, because it could not implement one of the desired server functionality - the Web server. Web applications have to use Apache, IIS, nginx, etc. because the server-side scripts that can generate dynamic webpages still are separated from the web server.
NodeJS is one of the technologies that has given us this opportunity.
But server applications - it's not just Web servers and Web sites, but any applications that perform service functions at the client’s requests, giving him access to certain resources or services.
It may also include web API, a database of different types, network applications, covering such class of problems as load balancing, ETL processes, distributed computing, distributed file storage, etc.
And for better understanding of the opportunities of NodeJS, limits of applicability of approaches to parallel computing, the physical limitations of the development environment, for feeling sockets, streams, inter-process communication, and for theoretical knowledge consolidation the task has been chosen: to parallelize one of the fastest sorting algorithms - the Quick Sort.
Thus, for the purpose of better understanding of the possibilities of technology, was formulated the following problem:
  • to achieve that the parallel (and distributed) sorting the array is faster than sorting in a single process (NodeJS is declared as STA (single thread application)),
or, at least,
  • to find out arrays' size in which a sorting in a single process is slower than in a few separate processes (or threads).

The solution of the problem is complicated because of two factors’ presence:
  • Empirical Amdahl's law restricts the algorithm acceleration because of the presence of consecutive fragments;
  • The need to transmit data between processes increases runtime of the sequential fragments.

It was hypothesized that if to split the array into several parts, and each part to sort in a separate thread (or process) on a separate processor core in parallel, the total time for sorting would be less than in one thread.
Note that if the array is divided into parts "as is", then at the final stage we should to merge the sorted parts. One of the strategies (to avoid the final merging) is to do pre-sorting. A favorable point is that the quick sort algorithm makes (at each iteration) a serial scan of the array, splitting scanned part into two parts on the basis of "more or less than the pivot value". Each part is unordered, but any element of the "small items’ part" is smaller than the smallest element of the "big items’ part" (and vice versa - any element of the "big items’ part" is larger than the largest part of the "small items’ part"). Thus, each of the parts may be sorted separately from each other and the results should not be merged.
If we need to split the array into several parts, amount of which equals two in some degree, then this degree coincides with the number of serial scans of the entire array, which is necessary to perform the separation.
For example:
On the first iteration the algorithm will split the whole array into two unsorted parts. On the second iteration each part will be divided into two (scanning two parts has the same computational complexity as the scanning of the whole array). As a result – there are 4 unsorted parts. On the third iteration each part will be divided into two - this is the third scan. As a result – we have got 8 unsorted parts, etc.
Notice that the comparison is performed with sorting in a single thread, in which these pre-scans must occur anyway. Thus, by applying pre-splitting, the loss pace over the serial algorithm does not occur.

Realization


Setting up the environment.

To resolve the problem the datasets of fake user data (such as name, birth date, address, etc.) were generated and saved into JSON-formatted file.
From the very first lines of code, it became clear that the overhead of creating the separate work processes (workers) was so high that while the workers are sending the "ONLINE" signal, STA has already done the sorting.
It was decided to make the initialization of the workers beyond the competition, to paraphrase the problem as follows: to build a sorting server, which receives arrays (more than one), sort them and returns sorted arrays.
But this change helps a little: STA sorts array with 16k items for tens of milliseconds and the data is transmitted to workers and back much longer.
The next step was to increase the volume of the array to do the transmission time less relatively to the sorting time. But a new problem appeared: the string size restriction in the JSON-parser (built-in V8) is about 1 Gb. I had to use a third-party module "stream-json/utils/StreamArray".
The problem was solved, but further increasing the volume of the array fits no longer in the memory allocated for the NodeJS process.
Start parameter of the NodeJS process itself "--max-old-space-size=4096" helped to increase the amount of memory to 4 GB.
In addition, array has been narrowed up to two columns (ID, SortingValue), where ID - serial number of the item in the original array (after sorting "narrow" array can always be restored to full size by the original ID). The narrowing of the array gives another positive effect - workers receive smaller amount of transmitted data.
Since there wasn’t such problem, how to operate data sets that do not fit in the server's RAM, then it had been decided to stop the further expansion of the data volume and stop at 1.1 million records.

Shortening of sequential fragments

Suppose that our CPU has 4 cores, and we've created 4 workers. We can split the array to be sorted into 4 parts and transmit them to workers - one per worker. Each part of the array can be transmitted to worker by any pieces in any order, but each worker will start sorting only after receiving all the parts. Thus transmitting parts of the array to workers chunk by chunk it is a chain of successive operations i.e. sequential fragments.
And we are interested in sending data as soon as possible and have to find the fastest variant of the transmission.
Notes: Inside the single NodeJS process, the libuv library provides a thread pool, in which the default number of threads is 4 (UV_THREADPOOL_SIZE == 4) (see e.g. http://docs.libuv.org/en/latest/threadpool.html). In our case, we have four cores, and it would be tempting to use threads instead of processes, since the data transmission time would be reduced to almost zero (time to set a reference variable).
There has been verified the hypothesis that the “async” module, which, as declared, can run tasks asynchronously in parallel, may use an internal thread pool and run each task in a separate thread (if there are not more than 4 tasks), utilizing available cores. But, unfortunately, this assumption was not confirmed. The sorting in the single thread did not yield the sorting, using “async” module, more than 10%, and sometimes was faster (for some volumes of arrays, although the regularity was not identified).
In addition a thread pool is global for a single process and so it shares the whole process memory (see above) between all threads. And increasing the number of threads we reduce the memory, which each thread can operate with.
It has been considered how to use the following data transport (to the workers and back):
  • IPC (using built-in IPC channel)
  • Sockets
  • FS (saving to the file and sending to the worker process only the file path).
  • Stdio (using standard IO streams - stdin/stdout).

Because the most of the above items are the data streams (except IPC), then we can form the data to transmit like streams (custom streams). This will enable to delegate the management of data transmission (back pressure, optimal package size and other issues) to a built-in mechanism just by using the pipe() method. Moreover, it is enough to write two streams ("from-part-of-array-to-stream" and "from-stream-to-array") and we can use them with any transport stream.
In addition, here were used two formats for a data transmission (further they will be named protocols), and two streams were made for each of them - Readable (from part of array to stream) and Writable (from stream to array):

  1. Buffer protocol. Each record has a fixed length. In our case, has been chosen a length of 32 bytes, since the length of the field for sorting does not exceed 24 bytes. The 1st byte is the length of the field; 2-5 bytes are an integer (Int32) value and keep a serial number of the item in the source array (see the figure below). Next is the value for sorting, the length of which is stored in the 1st byte. Other characters are ignored. The idea is that the buffer size becomes larger, but all records have a fixed length. This protocol gives us a random access to any block of items just by a number of item (the operation seek(), instead of the operation scan()).
    Since the file system operates blocks (clusters), the dimensions of which are multiples of 1024 bytes (1K, 2K, and so on), then dividing the size of the block on the size of one item (in our case 32 bytes), we get a whole number of items and each item will be read from a certain position of the block.
     Figure 1. Memory dump of the “Buffer protocol”

  2. String protocol. Each item has the form arr[i].ID + ‘|’ + arr[i].name + ‘\n’ (see the figure below) and the end of the array is represented by EOT (ascii 0x04). This marker (End of Transmission) will help us to emit the 'finish' event within our stream, where we read the data from it to the array, as some streams (e.g. child.stdout) do not emit this event. This protocol is more compact than the previous one, but the random access can be forgotten. For reading a certain item, all the previous items from the beginning of the block should be read.
    Furthermore, in the case of recording the array items into a stream it can be ensured that each block (or chunk) will comprise an integer number of items (any item will not be divided between two chunks). But during the passing data through a socket, chunks can be cut arbitrarily, and it cannot be counted that the transmitted chunk contains an integer number of items. And a partial item can be on both sides: at the beginning and at the end.
    Thus we have to combine a residue of the previous chunk with the new one in the mystream.prototype._read() function in order not to lose any item. And this is an additional overhead.
    Figure 2. Dump of the “String protocol”

So, we have 4 streams.

Table 1. R/W streams, which correspond to Buffer format and String format.
TitleTypeDirection
ArrayToBufferStreamReadableRead from Array to Buffered Stream
ArrayFromBufferStreamWritableWrite Buffered Stream to Array
ArrayToStringStreamReadableRead from Array to String Stream
ArrayFromStringStreamWritableWrite String Stream to Array

The comparison result of the "narrow" array transmission's different modes is presented in the Figure 3. The transmitted array contains 1.1 million items, where each item contains the ID (Int32) and Value (not more than 24 characters).

Figure 3. The comparison the various type of data transport using different protocols (Buffer and String)
Herewith, if the buffer stream expectedly won the string stream by the transmission via the file, then the result of the transmission via a socket had been unexpected. Not only the socket passed an array longer than a file, but the string stream was ahead of a buffer stream. In addition, IPC channel, in which the array is passed as an object, was faster than the transmission via a socket (behind the scenes, perhaps there is a stringification).
Well, a favorite of the race turned into a method of using standard IO streams of workers.
It is the method which has been used for the final experiment - competition for an array sorting between the STA (single thread application) and the four workers (4-core processor).
But for the purity of the experiment two variants of data transmission were left (String Stream via IO and Buffer Stream via File). And as it turned out it wasn’t pointless.
Here should be noticed the effect of another factor - the size of the chunks, which are forming while converting an array to a stream. Saying strictly, two parameters - the size of the chunk, and the type of transport affect the speed of data transmission simultaneously. This means that optimum chunk size for the same type of stream will vary depending on the transport type. For each mode of transmission, chunk size is selected separately, and the results of the competition with the best settings for each experiment can be found on the picture above.
Furthermore, should be noted, that data transmission via sockets within a process, or even within a single server isn’t the same as the data transmission over the network or web, when data passes through a router. In the last case optimal packages' sizes will be different. Moreover, using the protocol Buffer Stream, we can be sure that the packages which are given, will not be fragmented (in extreme case they will be combined), so my algorithm hasn’t got the processing of the "cut" items (in the protocol String Stream it exists). Rather, using the web-socket, Buffer Stream will not work (or rather will be, if to correct it a little).
There is another thing: to transmit the whole array via the IPC-channel is not the fastest way. It is better to transmit it by blocks of items. In my case it turned out to be the optimal transmission - 8K items.
Optimal sizes of the chunks are given in the following table:

Table 2.
Protocol and TransportChunk LengthIOM
String Stream via File40*1024Bytes
Buffer Stream via File40*1024Bytes
String Stream via Socket32*1024Bytes
Buffer Stream via Socket32*1024Bytes
String Stream via IO16*1024Bytes
Buffer Stream via IO32*1024Bytes
Array object via IPC8*1024Items

Experiments.


So, that is the moment of the final "sorting" competition between the STA and parallel processes.

The first experiment (IO async).

Transport: Standard process IO. Protocol: String Stream. Task Starting: Asynchronous

Algorithm:


  • Main process splits the array of 1.1 million elements into 4 pieces and using "async.forEach()" transforms each of the pieces to the data stream, and pipes the appropriate channel "child.stdin" of each worker to it.
  • On the workers' side our back stream is piped to the standard input (process.stdin), which transforms the stream back to the array.
  • Each worker sorts a piece of the array ...
  • ... and transfers array back to Main process using the standard output stream (process.stdout)
  • The concatenation of the array pieces back into a single array.
Concatenation will occur all of the sorted array pieces after receiving.
(Check on the "sorted" is kept, but has been left outside the competition)
The Figure 4 shows the timeline of the experiment (the top series corresponds to the STA sorting in the according scale).

Figure 4. Transport: Standard process IO. Protocol: String Stream. Task Starting: Asynchronous
On the Figure there is standing out a long time period (3.28 - 4.57 (s)) in which the four pieces of array are transformed into streams asynchronously and are transmitted to their workers via stdin-streams. Since the main process is single-threaded (STA), it "blurs" the data transmission process between all workers and as the result all four workers will begin each its work almost simultaneously (almost - because of unequal length of the pieces). Sorted array pieces, which are transmitted back to Main process, miss each other without interfering. Only two of the longest pieces slightly hinder each other at the finish (worker 0 and 1). The last stage is the concatenation of the returning pieces (40 ms).
So the sorting process (from the beginning of the array’s splitting into pieces (2.843 s) until the end of the concatenation (6.332 s)) took about 3.5 s. Despite of the fact that the array’s sorting in a single-threaded process (STA) takes approximately 3.7 s.
Of course, parallel sorting is a bit faster. But the profit of 5.5% is not worth the effort that has been expended.
Because of the single-threaded main process our workers were idling awaiting for their unit of work and the transmission time nullifies useful work.
We will make the new experiment, correcting a bit the previous algorithm, and will start sending of the next task to worker only after previous worker completely has got the task.
Let's see what would happen.

The second experiment (IO series).

Transport: Standard process IO. Protocol: String Stream. Task Starting: Sequential
Algorithm (distinct from the previous one):

  • Main process splits the array of 1.1 million elements into 4 pieces and sequentially transforms each of the pieces to the data stream, and pipes to it the appropriate channel "child.stdin" of each worker.
Let's consider the experiment timeline below:

Figure 5. Transport: Standard process IO. Protocol: String Stream. Task Starting: Sequential

That's funny, that the total time of sorting virtually unchanged. The main process, sending the task to the last worker simultaneously begins to take the result from the first worker, thereby shifting the sorting in the latter process (worker 0 on the figure). And, as a consequence, it impacts on the total time sorting.
This is similar to the distribution of food in the field kitchen - each person will eat their portion quickly, but lunch is over, when a single chef will distribute food to the latter. Plus eating time of the latter. At the same time it does not matter in which order workers get their portions. Either one by one in sequence or a single chef will go around the table, put a piece to each dish. Then go back and soon, until all the dishes are filled simultaneously. Total eating time will be the same.
And quite unexpectedly the much better result has been got (just for the statistics) with using another transport (via the file). 

The third experiment (via File async).

Transport: via File. Protocol: Buffer Stream. Task Starting: Asynchronous

Figure 6. Transport: via File. Protocol: Buffer Stream. Task Starting: Asynchronous

Figure 6 shows that the transmission of data from the main process to workers has declined sharply.
Why did it happen? After all when comparing the transports (see. Fig. 3) Buffer Stream via File System showed middle results.
The reason was interesting. While transmitting data via IO, CPU time of the main process is spent from the beginning to the end of the transmission (as long as the last byte will go to worker). But while using the file system, an operation of the data transmission consists of two sub-operations - saving a file and reading from a file.
And although while saving the data in the file, CPU time of the main process is utilizing (as in the previous experiments), but the reading from file is the duty of each worker and each worker utilizes its own CPU time!
And since the two sub-operations last the same time, we get nearly double reduction of the main process time of the data transmission to workers.
The sorting process in this case (since the start of the array splitting (2.820 s) to complete concatenation (5.203 s)) took about 2.4 s. It takes 68% of the single-threaded sorting process. So the parallelization is using the File System as a transport accelerated sorting process by 1/3, compared with the STA.

Conclusions

The idea to parallelize various fragments of an algorithm is productive, but has to be considered the cost of data transmission.
When we have a small data, the splitting of algorithm does not make sense.
But when there is a big data, the main process gets rid of part of the work by its distribution between the workers, but instead of that spends time on data transmission.
If the data can be prepared in the shared memory with fast access and transmit not the data itself, but link to it (such as between threads of the process), the effect of parallelization would become more noticeable.
We could try the implementing module in C++, which would launch a thread pool in itself, and follow the steps in the workflow, with our data, which we will pass in this module is the link ... But it will not be called programming NodeJS.