
Handling CPU-intensive tasks with queues

Marcin Wanago

Handling CPU-intensive operations through a REST API can be tricky. If our endpoint takes too long to respond, the request might time out. In this article, we look into queues to help us resolve this issue.

A queue proves to be a very useful part of backend architecture. With it, we can implement asynchronous and distributed processing. A queue is a data structure modeled on a real-world queue: a publisher posts messages to it, and a consumer picks a message up and processes it. Once a consumer handles a message, no other consumer can process that message.

With NestJS, we have access to the @nestjs/bull package. It wraps the Bull library, which provides queue functionality based on Redis. Redis is a fast and reliable key-value store that keeps its data in memory. Even if we restart our Node.js application, we don’t lose the data saved in Redis.

Setting up Bull and Redis

Since Bull uses Redis to manage queues, we need to set it up. So far, within this series, we’ve used Docker Compose to help us with our architecture. Thankfully, it is straightforward to set up Redis with Docker.

docker-compose.yml
version: "3"
services:
  redis:
    image: "redis:alpine"
    ports:
      - "6379:6379"
# ...
By default, Redis works on port 6379.

Connecting to Redis requires us to define two additional environment variables: the port and the host.

app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import * as Joi from '@hapi/joi';

@Module({
  imports: [
    ConfigModule.forRoot({
      validationSchema: Joi.object({
        REDIS_HOST: Joi.string().required(),
        REDIS_PORT: Joi.number().required(),
        // ...
      })
    }),
    // ...
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
.env
REDIS_HOST=localhost
REDIS_PORT=6379
# ...

We also need to install the necessary dependencies.

npm install @nestjs/bull @types/bull bull

Once we’ve got all of the above configured, we can establish a connection with Redis.

app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRootAsync({
      imports: [ConfigModule],
      useFactory: async (configService: ConfigService) => ({
        redis: {
          host: configService.get('REDIS_HOST'),
          port: Number(configService.get('REDIS_PORT')),
        },
      }),
      inject: [ConfigService],
    }),
    // ...
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Thanks to calling BullModule.forRootAsync, we can use Bull across all of our modules.

We can pass more options besides the redis object when configuring Bull. For the whole list, check out the documentation.
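For example, a shared configuration could also define a key prefix and default job options. The sketch below is illustrative only; the prefix value and the default job options are assumptions, not something the rest of this article relies on.

// Illustrative only: builds on the useFactory shown above.
BullModule.forRootAsync({
  imports: [ConfigModule],
  inject: [ConfigService],
  useFactory: async (configService: ConfigService) => ({
    redis: {
      host: configService.get('REDIS_HOST'),
      port: Number(configService.get('REDIS_PORT')),
    },
    // prefix added to every queue key that Bull stores in Redis
    prefix: 'optimization',
    // options applied to every job unless overridden when adding it
    defaultJobOptions: {
      attempts: 3,
      removeOnComplete: false,
    },
  }),
})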

Managing queues with Bull

Let’s create a queue that can help optimize multiple PNG images for us. We will start with defining a module.

optimize.module.ts
import { Module } from '@nestjs/common';
import { OptimizeController } from './optimize.controller';
import { BullModule } from '@nestjs/bull';
import { ImageProcessor } from './image.processor';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'image',
    })
  ],
  providers: [ImageProcessor],
  exports: [],
  controllers: [OptimizeController]
})
export class OptimizeModule {}

Above, we register our queue using BullModule.registerQueue. Thanks to doing so, we can use it in our OptimizeController.

optimize.controller.ts
import {
  Controller,
  Post,
  UploadedFiles,
  UseInterceptors,
} from '@nestjs/common';
import { AnyFilesInterceptor } from '@nestjs/platform-express';
import { Express } from 'express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Controller('optimize')
export class OptimizeController {
  constructor(
    @InjectQueue('image') private readonly imageQueue: Queue,
  ) {}

  @Post('image')
  @UseInterceptors(AnyFilesInterceptor())
  async processImage(@UploadedFiles() files: Express.Multer.File[]) {
    const job = await this.imageQueue.add('optimize', {
      files
    });

    return {
      jobId: job.id
    }
  }
}

Above, we follow the NestJS documentation on how to upload multiple files with Multer. To do that, we need the AnyFilesInterceptor and the @UploadedFiles() decorator.

Once we have the files, we need to add a job to our queue using the add() method. We pass two arguments to it: the name of the job that we later refer to and the data it needs.

In the above endpoint, we respond with the id of the job. This will allow the user to ask for the return value of the job later.
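The add() method also accepts an optional third argument with job options. The values below are a hedged sketch only and are not part of the original endpoint; they could replace the add() call in processImage above.

// Illustrative job options; every property is optional.
const job = await this.imageQueue.add(
  'optimize',
  { files },
  {
    attempts: 3, // retry the job up to three times if it fails
    backoff: 5000, // wait five seconds between the retries
    removeOnFail: false, // keep failed jobs in Redis so we can inspect them
  },
);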

Consuming the queue

Now we need to define a consumer. With it, we can process jobs added to the queue.

To optimize images, we use the imagemin library. Since we expect the user to upload multiple images, we compress the result to a .zip file using the adm-zip package.

npm install imagemin @types/imagemin imagemin-pngquant adm-zip @types/adm-zip
image.processor.ts
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import * as AdmZip from 'adm-zip';
import { buffer } from 'imagemin';
import imageminPngquant from 'imagemin-pngquant';
import { Express } from 'express';

@Processor('image')
export class ImageProcessor {
  @Process('optimize')
  async handleOptimization(job: Job) {
    const files: Express.Multer.File[] = job.data.files;

    const optimizationPromises: Promise<Buffer>[] = files.map(file => {
      const fileBuffer = Buffer.from(file.buffer);
      return buffer(fileBuffer, {
        plugins: [
          imageminPngquant({
            quality: [0.6, 0.8]
          })
        ]
      })
    })

    const optimizedImages = await Promise.all(optimizationPromises);

    const zip = new AdmZip();

    optimizedImages.forEach((image, index) => {
      const fileData = files[index];
      zip.addFile(fileData.originalname, image);
    })

    return zip.toBuffer();
  }
}
To make the process more transparent, we could update the progress of the job by calling the job.progress(number) method as we finish optimizing each of the images.
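As an illustration of that idea, here is a minimal sketch of a helper that optimizes the images one by one and reports the progress after each file; the helper name and the percentage-based progress value are assumptions, not part of the original processor.

import { Job } from 'bull';
import { buffer } from 'imagemin';
import imageminPngquant from 'imagemin-pngquant';
import { Express } from 'express';

// A sketch only: processes the files sequentially so the reported progress stays meaningful.
async function optimizeWithProgress(job: Job, files: Express.Multer.File[]): Promise<Buffer[]> {
  const optimizedImages: Buffer[] = [];
  for (const [index, file] of files.entries()) {
    const optimizedImage = await buffer(Buffer.from(file.buffer), {
      plugins: [
        imageminPngquant({
          quality: [0.6, 0.8]
        })
      ]
    });
    optimizedImages.push(optimizedImage);
    // job.progress(value) stores the current progress of the job
    await job.progress(Math.round(((index + 1) / files.length) * 100));
  }
  return optimizedImages;
}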

Above, we manipulate buffers. A Node.js buffer represents a fixed-length sequence of bytes. If you want to know more about buffers, check out Node.js TypeScript #3. Explaining the Buffer.

We call the Buffer.from(file.buffer) function because file.buffer stopped being an instance of the Buffer class when it was serialized and put into the Redis store.
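A quick illustration of what happens, as a sketch only: once a Buffer goes through JSON serialization, it becomes a plain object, and Buffer.from rebuilds the original bytes from it.

const original = Buffer.from('image bytes');

// Roughly what happens to file.buffer when the job data is stored in Redis
const serialized = JSON.parse(JSON.stringify(original));
console.log(serialized); // { type: 'Buffer', data: [ 105, 109, 97, ... ] }

// Buffer.from recognizes this plain-object form and restores a real Buffer
const restored = Buffer.from(serialized);
console.log(original.equals(restored)); // true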

Returning the result of the job

The crucial part of our handleOptimization method is the fact that it returns a buffer. Thanks to that, Bull saves our optimized images to Redis, and we can retrieve the result later.

To do that, let’s create a new endpoint that takes the job’s id as a parameter.

optimize.controller.ts
import {
  Controller,
  Get,
  Param,
  Res,
} from '@nestjs/common';
import { Response } from 'express';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';
import { Readable } from 'stream';

@Controller('optimize')
export class OptimizeController {
  constructor(
    @InjectQueue('image') private readonly imageQueue: Queue,
  ) {}

  // ...

  @Get('image/:id')
  async getJobResult(@Res() response: Response, @Param('id') id: string) {
    const job = await this.imageQueue.getJob(id);

    if (!job) {
      return response.sendStatus(404);
    }

    const isCompleted = await job.isCompleted();

    if (!isCompleted) {
      return response.sendStatus(202);
    }

    const result = Buffer.from(job.returnvalue);

    const stream = Readable.from(result);

    stream.pipe(response);
  }
}
If we updated the progress of the job in the consumer, we could respond with it when the job is not yet complete.

Above, we use the imageQueue.getJob() method to get the job with a given id. Since we’ve used the @Res() decorator, we put NestJS into the library-specific mode for the getJobResult handler. Because of that, we are responsible for managing the response manually, for example, with the response.sendStatus method.

If the job with the specified id exists but hasn’t yet been completed, we respond with the 202 Accepted status. It indicates that we’ve accepted the request and are processing it, but we haven’t yet completed it.
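If we did report the progress in the processor, a minimal sketch of such a response could look like the following; it assumes the processor calls job.progress(number) as described earlier.

if (!isCompleted) {
  // job.progress() called without arguments returns the last reported value
  const progress = job.progress();
  return response.status(202).json({ progress });
}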

If the job is completed, we create a readable stream from the buffer and send it to the user.

If you want to know more about streams, check out Node.js TypeScript #4. Paused and flowing modes of a readable stream

If you want to use Postman to download the result, use the “Send and download” button.

Running jobs in separate processes

Our job processors can run in separate processes for better performance.

optimize.module.ts
import { Module } from '@nestjs/common';
import { OptimizeController } from './optimize.controller';
import { BullModule } from '@nestjs/bull';
import { join } from 'path';

@Module({
  imports: [
    BullModule.registerQueue({
      name: 'image',
      processors: [{
        name: 'optimize',
        path: join(__dirname, 'image.processor.js')
      }],
    })
  ],
  providers: [],
  exports: [],
  controllers: [OptimizeController]
})
export class OptimizeModule {}

Since we execute our image processor in a forked process, dependency injection isn’t available. If we needed some dependencies, we would have to initialize them ourselves.

image.processor.ts
import * as AdmZip from 'adm-zip';
import { buffer } from 'imagemin';
import imageminPngquant from 'imagemin-pngquant';
import { Express } from 'express';
import { Job, DoneCallback } from 'bull';

async function imageProcessor(job: Job, doneCallback: DoneCallback) {
  const files: Express.Multer.File[] = job.data.files;

  const optimizationPromises: Promise<Buffer>[] = files.map(file => {
    const fileBuffer = Buffer.from(file.buffer);
    return buffer(fileBuffer, {
      plugins: [
        imageminPngquant({
          quality: [0.6, 0.8]
        })
      ]
    })
  })

  const optimizedImages = await Promise.all(optimizationPromises);

  const zip = new AdmZip();

  optimizedImages.forEach((image, index) => {
    const fileData = files[index];
    zip.addFile(fileData.originalname, image);
  })

  doneCallback(null, zip.toBuffer());
}

export default imageProcessor;
If you want to know more about child processes, read Node.js TypeScript #10. Is Node.js single-threaded? Creating child processes
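If the forked processor did need providers from our application, one possible approach, shown here only as a sketch, is to bootstrap a standalone Nest application context inside the file and pull the dependencies out of it. The AppModule path and SomeService are hypothetical placeholders.

import { NestFactory } from '@nestjs/core';
import { Job, DoneCallback } from 'bull';
import { AppModule } from '../app.module'; // the path is an assumption
import { SomeService } from '../some/some.service'; // a hypothetical provider

async function imageProcessor(job: Job, doneCallback: DoneCallback) {
  // Creates a Nest application context without starting an HTTP listener.
  // Doing this for every job is costly, so a real implementation might cache the context.
  const app = await NestFactory.createApplicationContext(AppModule);
  const someService = app.get(SomeService);

  // ... use someService to process job.data ...

  await app.close();
  doneCallback(null, null);
}

export default imageProcessor;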

Summary

In this article, we’ve covered the basics of managing queues with NestJS and Bull. To do that, we’ve implemented an example in which we optimize multiple images at once. By routing this work through a queue, we can better manage our resources, avoid timeouts on CPU-intensive tasks, and even run them in separate processes.