
Streaming conversion of CSV/TSV to JSON

Recently, while exploring the IMDb data set, I needed to convert Tab-Separated Values (TSV) files to JSON files. Based on that experience, this tutorial shows how to do streaming transformations of CSV/TSV files into JSON using the csv library and how to test file reading/writing with the mock-fs library. Although this post talks about TSV files, it applies equally well to CSV files.

There are many existing solutions that can convert TSV or CSV files to JSON. However, most of them require holding the entire input TSV and output JSON in memory. Although that works well for small files, the IMDb data set contains a few large files, which would take up a lot of memory if converted that way. For this tutorial, we’ll use two files as demos:

  • title.basics.tsv at 422MB, which contains the title, run time, year and genres of each movie. The corresponding JSON output is 1GB. I’ll call this the “simple” input because its conversion is more straightforward.
  • title.principals.tsv at 1.25GB, which links each movie with the name, job and characters of that movie’s cast members. Because the JSON output is 2.5GB, holding both the input and the output in memory takes at least 3.75GB, which is half the available memory of most laptops these days. I’ll call this the “advanced” input because its conversion takes a bit more work.

The best solution is a streaming conversion: data is streamed from the input file through a transformer (that performs the conversion while trying to retain as little data in memory as possible) into the output file. The memory footprint will be much smaller with the streaming approach because only small chunks of data are read and held in memory for processing at any one time.
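To make the idea concrete, here is a minimal sketch (not code from the repo) of how a consumer can turn arbitrary chunks of text into complete lines while holding only the unfinished tail of the last chunk in memory. The `LineSplitter` class and its method names are hypothetical; a streaming parser has to do something along these lines, although the real implementation is certainly more involved.

```typescript
// Hypothetical helper, for illustration only: splits incoming text chunks
// into complete lines, keeping just the unfinished tail in memory.
class LineSplitter {
  private tail = '';

  // Accept one chunk and return every line completed by it.
  push(chunk: string): string[] {
    const pieces = (this.tail + chunk).split('\n');
    // The last piece has no trailing newline yet, so hold it back:
    this.tail = pieces.pop() ?? '';
    return pieces;
  }

  // Call once the input ends to get the final unterminated line, if any.
  end(): string[] {
    const rest = this.tail;
    this.tail = '';
    return rest === '' ? [] : [rest];
  }
}

const splitter = new LineSplitter();
// Chunk boundaries fall in the middle of lines, as they can when reading a file:
const lines = [
  ...splitter.push('tconst\ttitle'),
  ...splitter.push('Type\ntt0000001\tsho'),
  ...splitter.push('rt\n'),
  ...splitter.end(),
];
console.log(lines.length); // 2
```

However large the input is, the splitter never holds more than one partial line plus the chunk it is currently processing.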

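I picked the csv library for this job because it looks well-maintained and has good documentation. The library consists of a few parts, of which we’ll only use two in this tutorial:

  • csv-parse, whose parse function turns CSV/TSV text into records.
  • stream-transform, whose transform function applies a function to each record.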

The result is in this repo. If you want to look at the actual giant input files, run npm run download:simple and npm run download:advanced to download the simple and advanced input files, respectively, into the realInput directory. To see the conversion in action, run npm run convert:simple and npm run convert:advanced, respectively. Be warned that the conversions can take a while.

Simple example

Let’s take a look at the “simple” example of converting title.basics.tsv. The first few rows look like this:

tconst titleType primaryTitle originalTitle isAdult startYear endYear runtimeMinutes genres
tt0000001 short Carmencita Carmencita 1 1894 \N 1 Documentary,Short
tt0000002 short Le clown et ses chiens Le clown et ses chiens 0 1892 \N 5 Animation,Short

where tconst is the movie ID and \N indicates no data. We want to convert it into something like this:

[{
  "tconst": "tt0000001",
  "titleType": "short",
  "primaryTitle": "Carmencita",
  "originalTitle": "Carmencita",
  "isAdult": true,
  "startYear": 1894,
  "endYear": null,
  "runtimeMinutes": 1,
  "genres": [ "Documentary", "Short" ]
}, {
  "tconst": "tt0000002",
  "titleType": "short",
  "primaryTitle": "Le clown et ses chiens",
  "originalTitle": "Le clown et ses chiens",
  "isAdult": false,
  "startYear": 1892,
  "endYear": null,
  "runtimeMinutes": 5,
  "genres": [ "Animation", "Short" ]
}]

First we set up source, which streams data from the input file, and destination, which streams data to the output file:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts

// Create an empty output file. Otherwise, we won't be able to create a writable
// stream for output:
await writeFile(outputPath, '', 'utf8');

const source = fs.createReadStream(inputPath, 'utf8');
const destination = fs.createWriteStream(outputPath, 'utf8');

Then we create the parser, which is a Node transform stream that will read the tsv file row-by-row, associate the data in each column with the column heading and emit a JavaScript object for downstream consumers:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
const parser = parse({
  // Because the input is tab-delimited:
  delimiter: '\t',
  // Because we want the library to automatically associate the column name
  // with column value in each row for us:
  columns: true,
  // Because we don't want accidental quotes inside a column to be
  // interpreted as "wrapper" for that column content:
  quote: false,
});
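As a rough mental model of what `columns: true` does (this is a simplified sketch, not the csv library's actual implementation), the first row is treated as the list of column names and each later row is zipped with it. The function name `toRecords` is made up for this illustration, and the sketch assumes the input is already split into lines and that no field contains a tab:

```typescript
type RawRecord = Record<string, string>;

// Hypothetical helper: pair every data row with the header row.
function toRecords(tsvLines: string[]): RawRecord[] {
  const [headerLine, ...dataLines] = tsvLines;
  if (headerLine === undefined) {
    return [];
  }
  const columnNames = headerLine.split('\t');
  return dataLines.map((line) => {
    const values = line.split('\t');
    const record: RawRecord = {};
    columnNames.forEach((columnName, i) => {
      // Missing trailing columns become empty strings in this sketch:
      record[columnName] = values[i] ?? '';
    });
    return record;
  });
}

const records = toRecords([
  'tconst\ttitleType\tprimaryTitle',
  'tt0000001\tshort\tCarmencita',
]);
console.log(JSON.stringify(records[0]));
// {"tconst":"tt0000001","titleType":"short","primaryTitle":"Carmencita"}
```

Note that every value is still a string at this point, which is why the transformer in the next step has to convert numbers, booleans and lists itself.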

The most important part of this whole operation is the transformer, which, in this simple example, converts each column to the right type and then calls JSON.stringify on every row emitted by the parser:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
let outputIndex = 0;
const transformer = transform((rawRow: RawRow): string => {
  const currentRecordIndex = outputIndex;
  outputIndex += 1;
  if (outputIndex % 100000 === 0 && shouldLogProgress === true) {
    console.info('processing row ', outputIndex);
  }
  const {isAdult, startYear, endYear, runtimeMinutes, genres, ...rest} = rawRow;
  const parsedRow: ParsedRow = {
    ...rest,
    isAdult: isAdult === '1',
    startYear: parseInt(startYear, 10),
    // The backslash must be escaped: '\N' would just be the letter 'N'.
    endYear: (endYear === '\\N') ? null : parseInt(endYear, 10),
    runtimeMinutes: (runtimeMinutes === '\\N') ? null : parseInt(runtimeMinutes, 10),
    genres: genres.split(','),
  };
  // The first record opens the JSON list; every later one is preceded by a comma:
  const result = (currentRecordIndex === 0) ? `[${JSON.stringify(parsedRow)}` : `,${JSON.stringify(parsedRow)}`;
  return result;
});
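The per-row conversion can also be pulled out into a pure function, which makes it testable without any streams. The sketch below is a variation on the code above rather than what the repo does. The `nullableInt` helper, the `RawTitleRow` interface and the decision to treat a missing `startYear` as null and missing `genres` as an empty list are assumptions made for this illustration:

```typescript
interface RawTitleRow {
  tconst: string;
  titleType: string;
  primaryTitle: string;
  originalTitle: string;
  isAdult: string;
  startYear: string;
  endYear: string;
  runtimeMinutes: string;
  genres: string;
}

// The two-character "no data" marker: a backslash followed by N.
const MISSING = '\\N';

// Hypothetical helper: parse an integer column that may be missing.
function nullableInt(value: string): number | null {
  return value === MISSING ? null : parseInt(value, 10);
}

function parseTitleRow(raw: RawTitleRow) {
  const {isAdult, startYear, endYear, runtimeMinutes, genres, ...rest} = raw;
  return {
    ...rest,
    isAdult: isAdult === '1',
    startYear: nullableInt(startYear),
    endYear: nullableInt(endYear),
    runtimeMinutes: nullableInt(runtimeMinutes),
    // Assumption: a title without genres gets an empty list.
    genres: genres === MISSING ? [] : genres.split(','),
  };
}

const parsedTitle = parseTitleRow({
  tconst: 'tt0000001',
  titleType: 'short',
  primaryTitle: 'Carmencita',
  originalTitle: 'Carmencita',
  isAdult: '1',
  startYear: '1894',
  endYear: '\\N',
  runtimeMinutes: '1',
  genres: 'Documentary,Short',
});
console.log(JSON.stringify(parsedTitle));
```

With the conversion isolated like this, the transformer only has to call `parseTitleRow` and take care of the brackets and commas.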

Note that we do have to take care to close the JSON list after the last movie has been written to JSON:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
destination.on('finish', async () => {
  if (outputIndex === 0) {
    // In this case, no row has been processed from TSV file so the
    // output should be an empty list:
    await appendFile(outputPath, '[]', 'utf8');
  } else {
    // In this case, at least one row has been processed so we just need
    // to write the closing bracket:
    await appendFile(outputPath, ']', 'utf8');
  }
  resolve();
});
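The bracket-and-comma bookkeeping can be looked at in isolation. This sketch (the generator name `frameAsJsonArray` is hypothetical) yields the same pieces of text that the transformer and the finish handler write between them, and shows that concatenating them gives valid JSON for zero, one or many items:

```typescript
function* frameAsJsonArray(items: Iterable<unknown>): Generator<string> {
  let count = 0;
  for (const item of items) {
    // The first item opens the list; every later item is preceded by a comma:
    yield (count === 0 ? '[' : ',') + JSON.stringify(item);
    count += 1;
  }
  // Close the list, or emit a complete empty list if nothing was written:
  yield count === 0 ? '[]' : ']';
}

const framed = (items: unknown[]): string =>
  Array.from(frameAsJsonArray(items)).join('');

console.log(framed([]));
// []
console.log(framed([{tconst: 'tt0000001'}]));
// [{"tconst":"tt0000001"}]
console.log(framed([{tconst: 'tt0000001'}, {tconst: 'tt0000002'}]));
// [{"tconst":"tt0000001"},{"tconst":"tt0000002"}]
```

Because each piece depends only on a running count, no more than one item has to be in memory at a time.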

Having set up all these pipes, we now need to connect them together to create a continuous pipeline that our data can flow through like water. We do this by literally .pipe-ing the output of one stream into the next:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/simpleConvert.ts
source.pipe(parser).pipe(transformer).pipe(destination);
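One caveat with chained `.pipe` calls is that an error in one stream is not forwarded to the others, so a failure in the parser would not automatically close the destination. Node's built-in `pipeline` function from `stream/promises` wires up the same kind of chain with error propagation and returns a promise. The sketch below is an alternative, not what the repo does. It uses stand-in streams (`Readable.from`, a hand-written upper-casing `Transform` and an in-memory `Writable`, all with made-up names) so that it runs without any input files:

```typescript
import {Readable, Transform, Writable} from 'node:stream';
import {pipeline} from 'node:stream/promises';

// Stand-in for the file reader: emits three small string chunks.
const fakeSource = (): Readable => Readable.from(['a\t1\n', 'b\t2\n', 'c\t3\n']);

// Stand-in for parser + transformer: upper-cases each chunk.
const upperCase = (): Transform => new Transform({
  transform(chunk, _encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

// Stand-in for the file writer: collects chunks in memory.
function collectInto(chunks: string[]): Writable {
  return new Writable({
    write(chunk, _encoding, callback) {
      chunks.push(chunk.toString());
      callback();
    },
  });
}

async function runPipeline(): Promise<string> {
  const chunks: string[] = [];
  // Resolves when the destination finishes; rejects if any stage fails.
  await pipeline(fakeSource(), upperCase(), collectInto(chunks));
  return chunks.join('');
}

runPipeline().then((output) => console.log(JSON.stringify(output)));
```

In the converter, the promise returned by `pipeline` could take the place of manually resolving inside a finish handler, although the closing bracket would still have to be written somewhere.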

Advanced example

In this example, the cast of a single movie is recorded over multiple rows, one for each cast member. The first few rows look like this:

tconst ordering nconst category job characters
tt0000001 1 nm1588970 self \N ["Herself"]
tt0000001 2 nm0005690 director \N \N
tt0000001 3 nm0374658 cinematographer director of photography \N

We want to consolidate the information about each movie’s cast members into a single JSON object like this (where nconst is the person ID):

[
  {
    "tconst": "tt0000001",
    "principals": [
      {
        "nconst": "nm1588970",
        "category": "self",
        "job": null,
        "characters": ["Herself"]
      },
      {
        "nconst": "nm0005690",
        "category": "director",
        "job": null,
        "characters": null
      },
      {
        "nconst": "nm0374658",
        "category": "cinematographer",
        "job": "director of photography",
        "characters": null
      }
    ]
  }
]

Unlike the simple example above, which uses a memory-less transformer (i.e. it doesn’t remember previous rows), this next transformer needs to do some record keeping because the rows are related. This transformer essentially needs to keep comparing the next row’s movie ID (tconst) with the previous row’s movie ID to detect when the movies change between rows. When that change happens, we create a new element in the output JSON list:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/advancedConvert.ts
let prevRow: RawRow | undefined;
let outputIndex = 0;
let inputRowIndex = 0;
let outputObject!: ParsedRow;

const transformer = transform((nextRow: RawRow) => {
  inputRowIndex += 1;
  if (inputRowIndex % 100000 === 0 && shouldLogProgress === true) {
    console.info('processing row ', inputRowIndex);
  }

  const {tconst, nconst, category, job, characters} = nextRow;

  let toBeReturned;
  if (
    // If this is the first row ...
    prevRow === undefined ||
    // ... or if the movie has changed ...
    nextRow.tconst !== prevRow.tconst) {

    // ... return previous movie;
    if (prevRow !== undefined) {
      toBeReturned = (outputIndex === 1) ?
        `[${JSON.stringify(outputObject)}` : `,${JSON.stringify(outputObject)}`;
    }
    // ... then create a new movie:
    outputObject = {
      tconst,
      principals: [],
    };
    outputIndex += 1;
  }

  const {principals} = outputObject;
  let outputCharacters: string[] | null;
  if (characters === '\\N') {
    // This means `characters` is not provided:
    outputCharacters = null;
  } else if (characters.startsWith('[') && characters.endsWith(']')) {
    // `characters` should be interpreted as an array of strings:
    outputCharacters = JSON.parse(characters);
  } else {
    // If `characters` is a string, put it in a list
    // (also need to remove literal quotes surrounding the text):
    outputCharacters = [
      characters.replace(/^"/, '').replace(/"$/, ''),
    ];
  }
  principals.push({
    nconst,
    category,
    job: (job === '\\N') ? null : job,
    characters: outputCharacters,
  });

  prevRow = nextRow;

  // Nothing is returned while we are still collecting rows for the same movie:
  if (toBeReturned !== undefined) {
    return toBeReturned;
  }
});
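Stripped of the stream plumbing, the grouping logic is a "group consecutive rows by key" pass that relies on all rows of a movie being adjacent in the input. Here is that logic as a synchronous generator, which is easier to unit test. The names `groupByMovie`, `PrincipalRow` and `MovieGroup` are made up for this sketch, the rows are reduced to two columns, and the ID of the second movie's cast member is a placeholder:

```typescript
interface PrincipalRow { tconst: string; nconst: string; }
interface MovieGroup { tconst: string; principals: string[]; }

function* groupByMovie(rows: Iterable<PrincipalRow>): Generator<MovieGroup> {
  let current: MovieGroup | undefined;
  for (const row of rows) {
    if (current === undefined || current.tconst !== row.tconst) {
      // The movie changed, so the previous group (if any) is complete:
      if (current !== undefined) {
        yield current;
      }
      current = {tconst: row.tconst, principals: []};
    }
    current.principals.push(row.nconst);
  }
  // The last group is only known to be complete once the input ends:
  if (current !== undefined) {
    yield current;
  }
}

const groups = Array.from(groupByMovie([
  {tconst: 'tt0000001', nconst: 'nm1588970'},
  {tconst: 'tt0000001', nconst: 'nm0005690'},
  // Placeholder person ID, not taken from the data set:
  {tconst: 'tt0000002', nconst: 'nm0000000'},
]));
console.log(JSON.stringify(groups));
// [{"tconst":"tt0000001","principals":["nm1588970","nm0005690"]},{"tconst":"tt0000002","principals":["nm0000000"]}]
```

Only the movie currently being assembled is held in memory, which is what keeps the streaming version's footprint small.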

Because we only know that we’re done with the data for each movie when we see the next one, we wouldn’t know that we have seen the last movie until the input data stream has finished. This code takes care of writing the last movie to the output file:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/advancedConvert.ts
destination.on('finish', async () => {
  if (outputIndex === 0) {
    // In this case, no row has been processed from TSV file so the
    // output should be an empty list:
    await appendFile(outputPath, '[]', 'utf8');
  } else {
    // The last movie would not have been written out to the file so
    // we need to do that here. However, we do need to open a new list (with [)
    // or continue an existing list (with a comma) depending on whether the last
    // movie is also the only movie:
    const lastItemToWrite = (outputIndex === 1) ?
      `[${JSON.stringify(outputObject)}]` :
      `,${JSON.stringify(outputObject)}]`;
    await appendFile(outputPath, lastItemToWrite, 'utf8');
  }
  resolve();
});
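An alternative worth knowing about (not the approach used in the repo) is to let the transform stream itself emit the trailing data. Node's built-in `Transform` class supports a `_flush` method that runs after the last input chunk and before the stream ends, which is a natural place to emit the last movie and the closing bracket. In this sketch the class name `JsonArrayGrouper`, the helper `groupToJson` and the simplified two-column row shape are hypothetical:

```typescript
import {Readable, Transform, type TransformCallback} from 'node:stream';

interface Row { tconst: string; nconst: string; }
interface Movie { tconst: string; principals: string[]; }

class JsonArrayGrouper extends Transform {
  private current: Movie | undefined;
  private written = 0;

  constructor() {
    // Rows come in as objects; JSON text goes out as strings.
    super({writableObjectMode: true, readableObjectMode: false});
  }

  private pushMovie(movie: Movie): void {
    this.push((this.written === 0 ? '[' : ',') + JSON.stringify(movie));
    this.written += 1;
  }

  _transform(row: Row, _encoding: BufferEncoding, callback: TransformCallback): void {
    let movie = this.current;
    if (movie === undefined || movie.tconst !== row.tconst) {
      if (movie !== undefined) {
        this.pushMovie(movie);
      }
      movie = {tconst: row.tconst, principals: []};
      this.current = movie;
    }
    movie.principals.push(row.nconst);
    callback();
  }

  // Called once after the last row: emit the pending movie and close the list.
  _flush(callback: TransformCallback): void {
    if (this.current !== undefined) {
      this.pushMovie(this.current);
    }
    this.push(this.written === 0 ? '[]' : ']');
    callback();
  }
}

async function groupToJson(rows: Row[]): Promise<string> {
  let text = '';
  for await (const chunk of Readable.from(rows).pipe(new JsonArrayGrouper())) {
    text += chunk.toString();
  }
  return text;
}

groupToJson([
  {tconst: 'tt0000001', nconst: 'nm1588970'},
  {tconst: 'tt0000001', nconst: 'nm0005690'},
]).then((json) => console.log(json));
```

With this design the output file is complete as soon as the destination finishes, so no separate append step is needed. The trade-off is that the grouping state lives inside a custom stream class instead of a plain function passed to `transform`.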

Test code

As usual, I use jest for testing. We use the mock-fs library to mock out the file system so that we don’t have to read from or write to real files during testing. Once mock-fs is invoked, the only files that you can read using fs.readFile are the ones that you register with mock-fs, like this:

// https://github.com/huy-nguyen/streaming-conversion-tsv-to-json/blob/58729bd5/src/__tests__/convert.js
test('With many rows of input data', async () => {
  // ...
  mockFs = require('mock-fs');
  mockFs({
    [fakeInputDir]: {
      [`${fakeInputFileName}.tsv`]: testInput,
    },
    [fakeOutputDir]: {
      [`${fakeInputFileName}.json`]: '',
    },
  });
});

In the above mocked file system, the directory fakeInputDir is inside the directory containing the test script (__tests__/convert.js). The fake directory contains the fake input file that will be consumed by the converter. Don’t forget to call mockFs.restore() after each test to restore the real file system because otherwise jest will fail.

I include tests for some corner cases, such as empty input and input that consists of one header row and one data row. If you check out the repo at this point and run npm run test, all the tests should pass.

© 2021 Huy Nguyen