Preamble
NodeJS offers an interesting possibility: writing server applications in a scripting language. For a long time Microsoft has shipped the WSH (Windows Scripting Host) component, which can interpret and run plain-text JScript (.JS and .JSE files) and VBScript (.VBS and .VBE files), as well as other installed languages (e.g. Perl). But it never became widespread, because it could not provide one of the most desired pieces of server functionality: a Web server. Web applications still have to rely on Apache, IIS, nginx, etc., because the server-side scripts that generate dynamic web pages remain separate from the web server itself.
NodeJS is one of the technologies that closes this gap.
But server applications are not just Web servers and Web sites; they are any applications that perform service functions in response to clients' requests, giving them access to certain resources or services.
This also includes Web APIs, databases of various types, and network applications covering such classes of problems as load balancing, ETL processes, distributed computing, distributed file storage, etc.
To better understand the possibilities of NodeJS, the limits of applicability of approaches to parallel computing and the physical limitations of the development environment, to get a feel for sockets, streams and inter-process communication, and to consolidate theoretical knowledge, the following task was chosen: to parallelize one of the fastest sorting algorithms, Quick Sort.
Thus, for the purpose of better understanding the possibilities of the technology, the following problem was formulated:
- to make the parallel (and distributed) sorting of an array faster than sorting in a single process (NodeJS is declared to be an STA, a single-threaded application),
or, at least,
- to find out the array size at which sorting in a single process becomes slower than sorting in several separate processes (or threads).
The solution is complicated by two factors:
- Amdahl's law limits the achievable speed-up because of the presence of sequential fragments;
- the need to transmit data between processes increases the runtime of those sequential fragments.
The hypothesis was that if the array is split into several parts and each part is sorted in parallel in a separate thread (or process) on a separate CPU core, the total sorting time will be less than in one thread.
Note that if the array is divided into parts "as is", then at the final stage the sorted parts have to be merged. One strategy to avoid the final merge is pre-splitting. Conveniently, the quick sort algorithm performs (at each iteration) a sequential scan of the array, splitting the scanned range into two parts based on "less than or greater than the pivot value". Each part is unordered, but any element of the "small items" part is smaller than the smallest element of the "big items" part (and vice versa: any element of the "big items" part is larger than the largest element of the "small items" part). Thus each of the parts can be sorted independently of the others, and the results do not need to be merged.
If the array needs to be split into a number of parts that is a power of two, then that power equals the number of sequential scans of the entire array required to perform the separation.
For example:
On the first iteration the algorithm splits the whole array into two unsorted parts. On the second iteration each part is divided into two (scanning the two parts has the same computational complexity as scanning the whole array), leaving 4 unsorted parts. On the third iteration each part is again divided into two, which is the third scan, leaving 8 unsorted parts, and so on.
Note that the comparison is made against sorting in a single thread, where these preliminary scans have to happen anyway. Thus, by applying pre-splitting, we lose no ground to the serial algorithm.
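Here is a minimal sketch of such a pre-split, assuming the elements are compared directly with the "<" operator: one partition pass splits a range into a "small" and a "big" part, and repeating the pass k times over all current parts yields 2^k independently sortable parts.

```js
// One quick-sort partition pass: rearranges arr[lo..hi] so that
// arr[lo..i-1] <= pivot <= arr[i..hi], and returns i.
function partition(arr, lo, hi) {
  const pivot = arr[Math.floor((lo + hi) / 2)];
  let i = lo, j = hi;
  while (i <= j) {
    while (arr[i] < pivot) i++;
    while (arr[j] > pivot) j--;
    if (i <= j) { [arr[i], arr[j]] = [arr[j], arr[i]]; i++; j--; }
  }
  return i;
}

// Split the whole array into 2^k unsorted parts; each part can be sorted
// separately and the results simply concatenated, with no final merge.
function preSplit(arr, k) {
  let parts = [[0, arr.length - 1]];
  for (let scan = 0; scan < k; scan++) {
    const next = [];
    for (const [lo, hi] of parts) {
      const mid = partition(arr, lo, hi);
      next.push([lo, mid - 1], [mid, hi]);
    }
    parts = next;
  }
  return parts; // array of [lo, hi] ranges in ascending order
}
```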
Implementation
Setting up the environment.
To work on the problem, datasets of fake user data (name, birth date, address, etc.) were generated and saved into a JSON-formatted file.
From the very first lines of code it became clear that the overhead of creating separate worker processes is so high that by the time the workers report they are online, the STA has already finished sorting.
It was decided to take worker initialization out of the competition and restate the problem as follows: build a sorting server that receives arrays (more than one), sorts them and returns the sorted arrays.
But this change helped only a little: the STA sorts an array of 16K items in tens of milliseconds, while transmitting the data to the workers and back takes much longer.
The next step was to increase the size of the array so that the transmission time became smaller relative to the sorting time. But a new problem appeared: the string size limit of the JSON parser built into V8 is about 1 GB, so a third-party module, "stream-json/utils/StreamArray", had to be used.
That solved the problem, but a further increase in the array size no longer fit into the memory allocated to the NodeJS process.
The NodeJS start-up parameter "--max-old-space-size=4096" helped to increase the available memory to 4 GB.
In addition, the array was narrowed down to two columns (ID, SortingValue), where ID is the serial number of the item in the original array (after sorting, the "narrow" array can always be restored to full size using the original IDs). Narrowing the array has another positive effect: the workers receive a smaller amount of transmitted data.
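A sketch of the narrowing step (the field holding the sorting value in the full record is assumed to be called "name" here):

```js
// Build the "narrow" array: only the original position (ID) and the sorting value.
const narrow = fullArray.map((item, i) => ({ ID: i, SortingValue: item.name }));

// ... the narrow array is what gets transmitted to the workers and sorted ...

// After sorting, restore the full records in sorted order via the stored IDs.
const sortedFull = narrow.map((n) => fullArray[n.ID]);
```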
Since handling data sets that do not fit into the server's RAM was not one of the goals, it was decided to stop expanding the data volume and settle on 1.1 million records.
Shortening the sequential fragments
Suppose our CPU has 4 cores and we have created 4 workers. We can split the array to be sorted into 4 parts and transmit them to the workers, one per worker. Each part of the array can be transmitted to its worker in pieces of any size and in any order, but each worker can start sorting only after receiving its entire part. Transmitting the parts of the array to the workers chunk by chunk is therefore a chain of successive operations, i.e. a sequential fragment.
We are interested in sending the data as quickly as possible, so we have to find the fastest variant of transmission.
Note: inside a single NodeJS process the libuv library provides a thread pool, whose default size is 4 threads (UV_THREADPOOL_SIZE == 4, see e.g. http://docs.libuv.org/en/latest/threadpool.html). In our case we have four cores, and it would be tempting to use threads instead of processes, since the data transmission time would shrink to almost zero (the time needed to assign a reference to a variable).
The hypothesis was tested that the "async" module, which is declared to run tasks asynchronously in parallel, might use an internal thread pool and run each task in a separate thread (if there are no more than 4 tasks), utilizing the available cores. Unfortunately, this assumption was not confirmed. Sorting in a single thread lost to sorting with the "async" module by no more than 10%, and for some array sizes was even faster (although no regularity was identified).
Besides, the thread pool is global for a single process and therefore shares the whole process memory (see above) between all threads: by increasing the number of threads we reduce the memory each thread can operate with.
The following data transports (to the workers and back) were considered:
- IPC (using built-in IPC channel)
- Sockets
- FS (saving to the file and sending to the worker process only the file path).
- Stdio (using standard IO streams - stdin/stdout).
Since most of the options above are data streams (IPC being the exception), we can shape the data to be transmitted as streams (custom streams). This lets us delegate the management of data transmission (back pressure, optimal packet size and other issues) to the built-in mechanism simply by using the pipe() method. Moreover, it is enough to write two streams ("from-part-of-array-to-stream" and "from-stream-to-array") and we can use them with any transport stream.
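A minimal sketch of this composition (the stream class names are those described below in Table 1; "child" stands for a worker process): only the object we pipe into or out of changes with the transport.

```js
// Sending side: serialize a part of the array and pipe it into any writable transport.
const source = new ArrayToStringStream(arrayPart); // Readable: part of array -> bytes
source.pipe(child.stdin);                          // or a socket, or fs.createWriteStream(path)

// Receiving side: pipe any readable transport into the stream that rebuilds the array.
const sink = new ArrayFromStringStream();          // Writable: bytes -> part of array
child.stdout.pipe(sink);                           // pipe() handles back pressure and chunking
```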
In addition, two formats were used for data transmission (hereafter called protocols), and two streams were made for each of them: Readable (from part of array to stream) and Writable (from stream to array).
Buffer protocol. Each record has a fixed length; in our case 32 bytes was chosen, since the length of the sorting field does not exceed 24 bytes. The 1st byte is the length of the field; bytes 2-5 are an Int32 value holding the serial number of the item in the source array (see the figure below). Next comes the value to sort by, whose length is stored in the 1st byte; the remaining bytes are ignored. The idea is that the overall buffer becomes larger, but all records have a fixed length. This protocol gives us random access to any block of items just by item number (a seek() operation instead of a scan()).
Since the file system operates on blocks (clusters) whose sizes are multiples of 1024 bytes (1K, 2K, and so on), dividing the block size by the size of one item (in our case 32 bytes) gives a whole number of items, and each item can be read from a known position within its block.
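A sketch of the Buffer protocol record layout (the byte order of the Int32 and the field names are assumptions):

```js
const RECORD_SIZE = 32; // fixed record length

// Byte 0: length of the sorting value; bytes 1-4: Int32 serial number (ID);
// bytes 5..: the sorting value itself; the remaining bytes of the record are ignored.
function encodeItem(item, buf, offset) {
  const len = Buffer.byteLength(item.SortingValue, 'utf8');
  buf.writeUInt8(len, offset);
  buf.writeInt32LE(item.ID, offset + 1);
  buf.write(item.SortingValue, offset + 5, len, 'utf8');
}

function decodeItem(buf, offset) {
  const len = buf.readUInt8(offset);
  return {
    ID: buf.readInt32LE(offset + 1),
    SortingValue: buf.toString('utf8', offset + 5, offset + 5 + len)
  };
}

// Random access: item number n always starts at byte n * RECORD_SIZE (seek, not scan).
```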
Figure 1. Memory dump of the “Buffer protocol”
String protocol. Each item has the form arr[i].ID + ‘|’ + arr[i].name + ‘\n’ (see the figure below), and the end of the array is marked by EOT (ASCII 0x04). This End of Transmission marker lets us emit the 'finish' event inside the stream that reads the data back into an array, since some streams (e.g. child.stdout) do not emit this event. This protocol is more compact than the previous one, but random access can be forgotten: to read a particular item, all the preceding items from the beginning of the block have to be read.
Furthermore, when the array items are written into a stream, we can ensure that each block (or chunk) contains a whole number of items (no item is split between two chunks). But when the data passes through a socket, chunks can be cut arbitrarily, and we cannot count on a transmitted chunk containing a whole number of items; a partial item can appear on both sides, at the beginning and at the end.
Thus we have to combine the residue of the previous chunk with the new one in the mystream.prototype._read() function in order not to lose any item, and this is an additional overhead.
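A sketch of the receiving side of the String protocol, written here as a Writable stream (as in Table 1 below), handling both the "residue" of a cut item and the EOT marker:

```js
const { Writable } = require('stream');
const EOT = String.fromCharCode(0x04); // End of Transmission marker

class ArrayFromStringStream extends Writable {
  constructor(options) {
    super(options);
    this.items = [];
    this.residue = ''; // tail of the previous chunk that did not end on a record boundary
  }

  _write(chunk, encoding, callback) {
    let text = this.residue + chunk.toString('utf8');
    const eot = text.indexOf(EOT);
    if (eot !== -1) text = text.slice(0, eot); // the array ends here
    const lines = text.split('\n');
    this.residue = eot === -1 ? lines.pop() : ''; // the last line may be incomplete
    for (const line of lines) {
      if (!line) continue;
      const sep = line.indexOf('|');
      this.items.push({ ID: parseInt(line.slice(0, sep), 10), name: line.slice(sep + 1) });
    }
    callback();
    if (eot !== -1) this.emit('finish'); // sources like child.stdout never end the stream themselves
  }
}
```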
Figure 2. Dump of the “String protocol”
So, we have 4 streams.
Table 1. R/W streams corresponding to the Buffer and String formats.

Title | Type | Direction
--- | --- | ---
ArrayToBufferStream | Readable | Read from Array to Buffered Stream
ArrayFromBufferStream | Writable | Write Buffered Stream to Array
ArrayToStringStream | Readable | Read from Array to String Stream
ArrayFromStringStream | Writable | Write String Stream to Array
The comparison of the different modes of transmitting the "narrow" array is presented in Figure 3. The transmitted array contains 1.1 million items, each consisting of an ID (Int32) and a Value (no more than 24 characters).
Figure 3. Comparison of the various types of data transport using the two protocols (Buffer and String)
While the buffer stream expectedly beat the string stream when transmitting via a file, the result of transmission via a socket was unexpected: not only did the socket take longer to pass the array than the file, but the string stream also came out ahead of the buffer stream. In addition, the IPC channel, in which the array is passed as an object, was faster than transmission via a socket (behind the scenes there is probably a stringification).
So the favorite of the race turned out to be the method that uses the workers' standard IO streams.
This is the method that was used for the final experiment: a sorting competition between the STA (single-threaded application) and the four workers (on a 4-core processor).
But for the purity of the experiment two variants of data transmission were kept (String Stream via IO and Buffer Stream via File). As it turned out, this was not pointless.
Here the effect of another factor should be noted: the size of the chunks formed while converting an array into a stream. Strictly speaking, two parameters affect the speed of data transmission simultaneously: the chunk size and the transport type. This means that the optimal chunk size for the same type of stream varies depending on the transport. The chunk size was selected separately for each mode of transmission, and the results of the competition with the best settings for each experiment are shown in the figure above.
It should also be noted that data transmission via sockets within a process, or even within a single server, is not the same as data transmission over a network or the web, when data passes through a router; in the latter case the optimal packet sizes will be different. Moreover, with the Buffer Stream protocol we can be sure that the packets we hand over will not be fragmented (at worst they will be combined), so my algorithm has no handling of "cut" items (the String Stream protocol does have it). Accordingly, over a web socket the Buffer Stream would not work as-is (or rather, it would, if corrected a little).
One more thing: transmitting the whole array via the IPC channel is not the fastest way; it is better to transmit it in blocks of items. In my case the optimal block size turned out to be 8K items.
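A sketch of sending the array over the IPC channel in blocks (the message shape is just an example):

```js
const BLOCK = 8 * 1024; // 8K items per message turned out to be optimal

// Instead of one huge child.send(array), send the array to the forked worker in blocks.
for (let i = 0; i < arr.length; i += BLOCK) {
  child.send({ type: 'items', items: arr.slice(i, i + BLOCK) });
}
child.send({ type: 'end' }); // let the worker know the whole array has arrived
```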
Optimal sizes of the chunks are given in the following table:
Table 2.
Protocol and Transport | Chunk Size | Units
--- | --- | ---
String Stream via File | 40*1024 | bytes
Buffer Stream via File | 40*1024 | bytes
String Stream via Socket | 32*1024 | bytes
Buffer Stream via Socket | 32*1024 | bytes
String Stream via IO | 16*1024 | bytes
Buffer Stream via IO | 32*1024 | bytes
Array object via IPC | 8*1024 | items
Experiments.
So, the moment of the final "sorting" competition between the STA and the parallel processes has come.
The first experiment (IO async).
Transport: Standard process IO. Protocol: String Stream. Task Starting: Asynchronous
Algorithm:
- The main process splits the array of 1.1 million elements into 4 pieces and, using "async.forEach()", transforms each piece into a data stream, piping it into the corresponding worker's "child.stdin" channel (a sketch of both sides follows after this list).
- On the worker's side, the standard input (process.stdin) is piped into our receiving stream, which transforms the stream back into an array.
- Each worker sorts its piece of the array ...
- ... and transfers it back to the main process via the standard output stream (process.stdout).
- The pieces are concatenated back into a single array.
Concatenation starts only after all the sorted pieces have been received.
(A check that the result is actually sorted is kept, but is excluded from the timing.)
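A sketch of both sides of this experiment (the worker list structure and the sorting comparator are assumptions; the custom streams are those from Table 1):

```js
const async = require('async');

// Main process: split the array and asynchronously stream one piece into each worker.
function distribute(arr, workers, done) {
  const pieceSize = Math.ceil(arr.length / workers.length);
  async.forEach(workers, (worker, next) => {
    const piece = arr.slice(worker.index * pieceSize, (worker.index + 1) * pieceSize);
    const source = new ArrayToStringStream(piece); // piece -> String protocol
    source.pipe(worker.child.stdin);               // transport: the worker's stdin
    source.on('end', next);                        // this piece has been handed over
  }, done);
}

// Worker process: rebuild the piece from stdin, sort it, stream it back via stdout.
const sink = new ArrayFromStringStream();
process.stdin.pipe(sink);
sink.on('finish', () => {
  sink.items.sort((a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0));
  new ArrayToStringStream(sink.items).pipe(process.stdout);
});
```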
Figure 4 shows the timeline of the experiment (the top series corresponds to the STA sorting, shown at the same scale).
Figure 4. Transport: Standard process IO. Protocol: String Stream. Task Starting: Asynchronous
What stands out in the figure is the long period (3.28 to 4.57 s) during which the four pieces of the array are asynchronously transformed into streams and transmitted to their workers via the stdin streams. Since the main process is single-threaded (STA), it spreads the data transmission across all workers, and as a result all four workers begin their work almost simultaneously ("almost" because the pieces are of unequal length). The sorted pieces transmitted back to the main process pass each other without interfering; only the two longest pieces slightly hinder each other at the finish (workers 0 and 1). The last stage is the concatenation of the returned pieces (40 ms).
So the sorting process (from the beginning of the array's splitting into pieces (2.843 s) to the end of the concatenation (6.332 s)) took about 3.5 s, whereas sorting the array in a single-threaded process (STA) takes approximately 3.7 s.
Of course, the parallel sorting is a bit faster, but a gain of 5.5% is not worth the effort expended.
Because the main process is single-threaded, our workers sat idle waiting for their unit of work, and the transmission time nullified the useful work.
Let's run a new experiment with a slightly corrected algorithm: the next task is sent to a worker only after the previous worker has completely received its task.
Let's see what happens.
The second experiment (IO series).
Transport: Standard process IO. Protocol: String Stream. Task Starting: Sequential
Algorithm (the difference from the previous one):
- The main process splits the array of 1.1 million elements into 4 pieces and sequentially transforms each piece into a data stream, piping it into the corresponding worker's "child.stdin" channel (see the sketch below).
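The only change on the sending side is that the pieces are streamed one after another, e.g. (a sketch reusing the names from the previous experiment; "pieces" and "onAllPiecesSent" are placeholders):

```js
// Start streaming the next piece only after the previous worker has received its whole piece.
async.eachSeries(workers, (worker, next) => {
  const source = new ArrayToStringStream(pieces[worker.index]);
  source.pipe(worker.child.stdin);
  worker.child.stdin.on('finish', () => next()); // this worker has its piece; move on
}, onAllPiecesSent);
```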
Let's consider the experiment timeline below:
Figure 5. Transport: Standard process IO. Protocol: String Stream. Task Starting: Sequential
The funny thing is that the total sorting time is virtually unchanged. The main process, while sending the task to the last worker, simultaneously begins receiving the result from the first worker, thereby pushing back the sorting in that last process (worker 0 in the figure) and, as a consequence, affecting the total sorting time.
This is similar to serving food in a field kitchen: each person eats their portion quickly, but lunch is over only when the single cook has served the last person, plus that last person's eating time. And it does not matter in what order the portions are handed out: whether one full plate at a time, or the cook goes around the table putting a piece on each plate, then goes around again, and so on, until all the plates are filled simultaneously. The total eating time is the same.
And, quite unexpectedly, a much better result was obtained (initially recorded just for the statistics) with a different transport: via a file.
The third experiment (via File async).
Transport: via File. Protocol: Buffer Stream. Task Starting: Asynchronous
Figure 6. Transport: via File. Protocol: Buffer Stream. Task Starting: Asynchronous
Figure 6 shows that the time to transmit the data from the main process to the workers has dropped sharply.
Why did this happen? After all, in the transport comparison (see Fig. 3) Buffer Stream via the File System showed only middling results.
The reason is interesting. When transmitting data via IO, the main process's CPU time is consumed from the beginning of the transmission to its end (until the last byte reaches the worker). But when using the file system, the data transmission consists of two sub-operations: saving the file and reading it.
And although saving the data to the file still consumes the main process's CPU time (as in the previous experiments), reading the file is the duty of each worker, and each worker spends its own CPU time on it!
And since the two sub-operations take roughly the same time, we get nearly a twofold reduction in the main process's time spent transmitting data to the workers.
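A sketch of the file transport (file paths and the IPC message shape are assumptions): the main process spends its CPU time only on writing the file, while reading it back happens on the worker's own CPU time.

```js
const fs = require('fs');

// Main process: write the piece in the Buffer protocol, then send only the file path.
function sendViaFile(piece, child, index) {
  const path = `./piece-${index}.bin`;
  const file = fs.createWriteStream(path);
  new ArrayToBufferStream(piece).pipe(file);
  file.on('finish', () => child.send({ type: 'file', path }));
}

// Worker process: read and parse the file on its own CPU time, then sort.
process.on('message', (msg) => {
  if (msg.type !== 'file') return;
  const sink = new ArrayFromBufferStream();
  fs.createReadStream(msg.path).pipe(sink);
  sink.on('finish', () => {
    // ... sort sink.items and return the result to the main process ...
  });
});
```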
In this case the sorting process (from the start of the array splitting (2.820 s) to the completion of the concatenation (5.203 s)) took about 2.4 s, i.e. 68% of the single-threaded sorting time. So parallelization using the file system as the transport accelerated the sorting by about a third compared with the STA.
Conclusions
The idea of parallelizing various fragments of an algorithm is productive, but the cost of data transmission has to be taken into account.
With small amounts of data, splitting the algorithm makes no sense.
With big data, the main process gets rid of part of the work by distributing it among the workers, but in exchange it spends time on data transmission.
If the data could be placed in shared memory with fast access and we could transmit not the data itself but a reference to it (as between threads of a single process), the effect of parallelization would be much more noticeable.
We could try implementing a C++ module that launches its own thread pool and performs the workflow steps on our data, receiving only a reference to it... but that could hardly be called NodeJS programming.