One of the most important things to care about as a web developer is the integrity of the data. In this article, we learn what a transaction is and how it can help us ensure that our data is correct.
The idea behind transactions
A transaction is a set of instructions that either happens entirely or doesn’t happen at all. To understand why we might need transactions, let’s use the most common example.
When transferring money from one bank account to another, two steps happen:
- we withdraw a certain amount of money from the first account,
- we add the same amount to the second account.
If the whole operation fails completely, that’s something relatively harmless. The worst scenario would be to perform just a part of the above steps. For example, if we withdraw the money from the first account but fail to add it to the second one, we break the integrity of our data. To prevent that, we can bundle multiple steps into a single unit of work, referred to as a transaction.
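The all-or-nothing behavior described above can be sketched without a database. The snippet below is only an illustration: the `Balances` type, the `transfer` function, and the account names are hypothetical and not part of this series. Both steps are applied to a copy of the data, and the copy replaces the original only if no step failed.

```typescript
// Hypothetical in-memory "database": account id -> balance.
type Balances = Record<string, number>;

// Moves `amount` between two accounts as a single unit of work.
// Both steps modify a draft copy; the caller receives the draft
// only when every step succeeded.
function transfer(
  balances: Balances,
  from: string,
  to: string,
  amount: number,
): Balances {
  const draft: Balances = { ...balances };

  // step 1: withdraw from the first account
  const fromBalance = draft[from];
  if (fromBalance === undefined || fromBalance < amount) {
    throw new Error('Cannot withdraw from the source account');
  }
  draft[from] = fromBalance - amount;

  // step 2: deposit to the second account; if this fails,
  // the draft (including the withdrawal) is simply discarded
  const toBalance = draft[to];
  if (toBalance === undefined) {
    throw new Error('Unknown target account');
  }
  draft[to] = toBalance + amount;

  return draft;
}

let accounts: Balances = { alice: 100, bob: 20 };

// Both steps succeed, so the new state replaces the old one.
accounts = transfer(accounts, 'alice', 'bob', 30);

try {
  // The deposit fails because "carol" does not exist,
  // so the withdrawal from "alice" is not kept either.
  accounts = transfer(accounts, 'alice', 'carol', 10);
} catch {
  // `accounts` still holds the state from before the failed transfer
}
```

A real database achieves the same guarantee differently, but the observable result is the same: a failed transfer leaves no partial changes behind.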
ACID properties
A valid transaction can be described using a few properties:
Atomicity
All of the operations in a transaction form a single unit. Therefore, the transaction either succeeds entirely or fails entirely.
Consistency
The transaction transitions the database from one valid state to another.
Isolation
Multiple transactions could occur concurrently without the risk of having an invalid state of the database. In our case, another transaction should see the funds in one bank account or the other, but not in both.
Durability
As soon as we commit the changes from the transaction, they should survive permanently.
Transactions in PostgreSQL
Fortunately, PostgreSQL gives us the tools to ensure all ACID properties. To create a transaction, we need to group a set of statements with BEGIN and COMMIT.
In the previous part of this series, we’ve defined a many-to-many relationship between categories and posts. First, let’s create a transaction that deletes a category and all of the posts within it.
BEGIN;

--Deleting posts that belong to a given category
DELETE FROM post_entity
  WHERE id IN (
    SELECT post_entity_id FROM post_entity_categories WHERE category_id = 1
  );

--Disconnecting posts from categories
DELETE FROM post_entity_categories
  WHERE category_id=1;

--Deleting the category
DELETE FROM category
  WHERE id=1;

COMMIT;
Thanks to using a transaction, if something goes wrong when deleting the category, PostgreSQL rolls back the whole transaction, and the posts stay intact.
We can also perform a rollback manually and abort the current transaction.
BEGIN;

DROP TABLE "post_entity_categories";

ROLLBACK;
Because of ROLLBACK, the post_entity_categories table is not dropped by the above transaction.
Transactions with MikroORM
MikroORM implements the unit of work pattern. Thanks to that, it batches queries out of the box.
In API with NestJS #62. Introduction to MikroORM with PostgreSQL, we’ve learned that we need to flush all of the changes we’ve made to our entities if we want the changes to be reflected in the database.
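To make the unit of work idea more tangible, here is a toy sketch of the pattern. It is not how MikroORM is implemented: `ToyUnitOfWork`, `Row`, and all the method bodies are hypothetical and only show that changes are queued in memory and applied together when we flush.

```typescript
// A toy unit of work: changes are queued and applied together on flush().
// This is an illustration of the pattern, not MikroORM's implementation.
type Row = { id: number; title: string };

class ToyUnitOfWork {
  private rows: Row[];
  private pendingInserts: Row[] = [];
  private pendingRemovals: number[] = [];

  constructor(initialRows: Row[]) {
    this.rows = initialRows.slice();
  }

  // Marks a row for insertion without touching the stored rows.
  persist(row: Row): void {
    this.pendingInserts.push(row);
  }

  // Marks a row for deletion without touching the stored rows.
  remove(id: number): void {
    this.pendingRemovals.push(id);
  }

  // Applies every queued change to a draft and swaps the draft in
  // only if none of the changes failed.
  flush(): void {
    let draft = this.rows.slice();
    for (const id of this.pendingRemovals) {
      if (!draft.some((row) => row.id === id)) {
        throw new Error(`Row ${id} does not exist`);
      }
      draft = draft.filter((row) => row.id !== id);
    }
    for (const row of this.pendingInserts) {
      if (draft.some((existing) => existing.id === row.id)) {
        throw new Error(`Row ${row.id} already exists`);
      }
      draft.push(row);
    }
    this.rows = draft;
    this.pendingInserts = [];
    this.pendingRemovals = [];
  }

  findAll(): Row[] {
    return this.rows.slice();
  }
}

const unitOfWork = new ToyUnitOfWork([{ id: 1, title: 'First post' }]);
unitOfWork.remove(1);
unitOfWork.persist({ id: 2, title: 'Second post' });

// Nothing has been applied yet: the stored rows are unchanged.
const beforeFlush = unitOfWork.findAll();

// Both queued changes are applied together.
unitOfWork.flush();
const afterFlush = unitOfWork.findAll();
```

Because the changes are collected first and applied in one go, the flush is a natural place to wrap everything in a single database transaction.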
Flush Modes
A crucial thing to notice is that MikroORM supports a few flushing strategies.
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { MikroOrmModule } from '@mikro-orm/nestjs';
import { FlushMode } from '@mikro-orm/core';

@Module({
  imports: [
    MikroOrmModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        flushMode: FlushMode.ALWAYS,
        // ...
      }),
    }),
  ],
})
export class DatabaseModule {}
With FlushMode.ALWAYS, MikroORM flushes before every query. Because of that, we could not implement transactions by delaying the flush.
With FlushMode.AUTO, MikroORM sometimes flushes implicitly, which might be a little surprising.
async createPost(post: CreatePostDto, user: User) {
  const postData = {
    ...post,
    author: user,
  };
  const newPost = this.postRepository.create(postData);
  // marking the new post to be saved, but not flushing it yet
  this.postRepository.persist(newPost);

  // querying all of the current posts
  const allCurrentPosts = await this.postRepository.findAll();

  const isNewPostPersisted = allCurrentPosts.some((currentPost) => {
    return currentPost.id === newPost.id;
  });
  console.log(isNewPostPersisted); // true

  return newPost;
}
Since we queried the posts while the newly created entity was still waiting to be flushed, MikroORM flushed our changes for us automatically.
The above behavior can sometimes get in the way of implementing transactions. Because of that, in this article, we use the FlushMode.COMMIT option that aims to delay the flush until the current transaction is committed.
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { MikroOrmModule } from '@mikro-orm/nestjs';
import { FlushMode } from '@mikro-orm/core';

@Module({
  imports: [
    MikroOrmModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        flushMode: FlushMode.COMMIT,
        debug: configService.get('SHOULD_DEBUG_SQL'),
        // ...
      }),
    }),
  ],
})
export class DatabaseModule {}
We also use the debug option to investigate what queries MikroORM performs.
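The difference between the three flush modes can be sketched with a toy model. Everything below is hypothetical (`ToyEntityManager`, `ToyFlushMode`, `flushedByQuery`) and deliberately simplified. In particular, it assumes that the automatic mode flushes only when the pending changes touch the queried table, which is a rough approximation of the real behavior.

```typescript
// A toy model of flush modes. It only mimics when a query triggers a flush
// and is not based on MikroORM's source code.
type ToyFlushMode = 'always' | 'auto' | 'commit';
type Table = 'post' | 'category';

class ToyEntityManager {
  private stored: Record<Table, number[]> = { post: [], category: [] };
  private pending: { table: Table; id: number }[] = [];
  private mode: ToyFlushMode;

  constructor(mode: ToyFlushMode) {
    this.mode = mode;
  }

  // Queues an insert without writing it.
  persist(table: Table, id: number): void {
    this.pending.push({ table, id });
  }

  // Writes all queued inserts.
  flush(): void {
    for (const change of this.pending) {
      this.stored[change.table].push(change.id);
    }
    this.pending = [];
  }

  // Queries a table; depending on the mode, flushes first.
  find(table: Table): number[] {
    const touchesTable = this.pending.some((change) => change.table === table);
    if (this.mode === 'always' || (this.mode === 'auto' && touchesTable)) {
      this.flush();
    }
    return this.stored[table].slice();
  }

  // Reads what has been written so far, without triggering a flush.
  peekStored(table: Table): number[] {
    return this.stored[table].slice();
  }
}

// Persists a post, runs a query, and reports whether the post got written.
function flushedByQuery(mode: ToyFlushMode, queriedTable: Table): boolean {
  const entityManager = new ToyEntityManager(mode);
  entityManager.persist('post', 1);
  entityManager.find(queriedTable);
  return entityManager.peekStored('post').indexOf(1) !== -1;
}

const alwaysMode = flushedByQuery('always', 'category'); // flushes on any query
const autoSameTable = flushedByQuery('auto', 'post'); // flushes, the query touches pending data
const autoOtherTable = flushedByQuery('auto', 'category'); // no flush in this toy model
const commitMode = flushedByQuery('commit', 'post'); // never flushes on a query
```

In this model, only the last mode leaves the decision about when to write entirely to us, which is what we want when grouping changes into one transaction.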
Delaying flushing to implement transactions
Let’s start by making some adjustments to our PostsService:
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@mikro-orm/nestjs';
import { EntityRepository } from '@mikro-orm/core';
import PostEntity from './post.entity';
import PostNotFoundException from './exceptions/postNotFound.exception';

@Injectable()
export class PostsService {
  constructor(
    @InjectRepository(PostEntity)
    private readonly postRepository: EntityRepository<PostEntity>,
  ) {}

  async getPostById(id: number) {
    const post = await this.postRepository.findOne({
      id,
    });
    if (!post) {
      throw new PostNotFoundException(id);
    }
    return post;
  }

  async getPostsFromCategory(categoryId: number) {
    return this.postRepository.find({
      categories: {
        id: categoryId,
      },
    });
  }

  async deletePost(id: number, withFlush = true) {
    const post = await this.getPostById(id);
    this.postRepository.remove(post);
    if (withFlush) {
      return this.postRepository.flush();
    }
  }

  // ...
}
Now, our deletePost accepts an additional withFlush argument. Thanks to doing that, we can avoid flushing if we need to.
Let’s also make changes to our CategoriesService to use the above functionality.
import { Injectable } from '@nestjs/common';
import CategoryNotFoundException from './exceptions/categoryNotFound.exception';
import { InjectRepository } from '@mikro-orm/nestjs';
import { EntityRepository } from '@mikro-orm/core';
import Category from './category.entity';
import { PostsService } from '../posts/posts.service';
import { EntityManager } from '@mikro-orm/postgresql';

@Injectable()
export default class CategoriesService {
  constructor(
    @InjectRepository(Category)
    private readonly categoryRepository: EntityRepository<Category>,
    private readonly postsService: PostsService,
    private readonly entityManager: EntityManager,
  ) {}

  async getCategoryById(id: number) {
    const category = await this.categoryRepository.findOne({
      id,
    });
    if (!category) {
      throw new CategoryNotFoundException(id);
    }
    return category;
  }

  async deleteCategory(id: number, withFlush = true) {
    const category = await this.getCategoryById(id);
    this.categoryRepository.remove(category);
    if (withFlush) {
      return this.categoryRepository.flush();
    }
  }

  async deleteCategoryWithPosts(categoryId: number) {
    const allPosts = await this.postsService.getPostsFromCategory(categoryId);
    for (const post of allPosts) {
      // marking each post for deletion without flushing
      await this.postsService.deletePost(post.id, false);
    }
    // marking the category for deletion without flushing
    await this.deleteCategory(categoryId, false);
    // removing the posts and the category in one go
    return this.entityManager.flush();
  }

  // ...
}
The crucial part above is the deleteCategoryWithPosts function. It calls postsService.deletePost(post.id, false) for every post from the category, marking it for deletion without flushing.
Then, we also mark the category for deletion with the deleteCategory method.
At the end of the deleteCategoryWithPosts function, we call entityManager.flush. Thanks to doing that, we remove all the posts and the category that we marked for deletion. If an error occurs at any point of the transaction, MikroORM automatically rolls back all of the changes.
Calling categoryRepository.flush() would have the same effect as entityManager.flush() and would delete both the posts and the category. We use entityManager.flush() to emphasize that the changes are not limited to categories.
Thanks to the fact that we’ve used the debug mode in MikroORM, we can take a look at the logs:
[query] select "p0".* from "post_entity" as "p0" left join "post_entity_categories" as "p1" on "p0"."id" = "p1"."post_entity_id" where "p1"."category_id" = 6 [took 2 ms]
[query] select "c0".* from "category" as "c0" where "c0"."id" = 6 limit 1 [took 1 ms]
[query] begin
[query] delete from "post_entity" where "id" in (36, 37, 38, 39, 40) [took 1 ms]
[query] delete from "category" where "id" in (6) [took 0 ms]
[query] commit
Above, we can see that removing posts and the category was performed in a single transaction.
Creating transactions explicitly
So far, we’ve been defining transactions implicitly. If we want to be more explicit, we can create them ourselves.
async deleteCategoryWithPosts(categoryId: number) {
  const allPosts = await this.postsService.getPostsFromCategory(categoryId);
  return this.entityManager.transactional(async () => {
    for (const post of allPosts) {
      await this.postsService.deletePost(post.id, false);
    }
    // no need to flush here, it happens at the end of the callback
    await this.deleteCategory(categoryId, false);
  });
}
When we use entityManager.transactional, MikroORM runs our callback inside a database transaction and flushes the changes at the end.
If we want to be even more explicit, we can manually begin, commit, and roll back a transaction.
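async deleteCategoryWithPosts(categoryId: number) {
  // a fork gives us a fresh entity manager with its own identity map
  const forkedEntityManager = this.entityManager.fork();
  await forkedEntityManager.begin();

  try {
    // all work goes through the fork so that it is a part of its transaction
    const allPosts = await forkedEntityManager.find(PostEntity, {
      categories: {
        id: categoryId,
      },
    });
    const category = await forkedEntityManager.findOne(Category, {
      id: categoryId,
    });
    if (!category) {
      throw new CategoryNotFoundException(categoryId);
    }
    for (const post of allPosts) {
      forkedEntityManager.remove(post);
    }
    forkedEntityManager.remove(category);
    // committing flushes the pending changes first
    await forkedEntityManager.commit();
  } catch (error) {
    await forkedEntityManager.rollback();
    throw error;
  }
}
The above mirrors what the entityManager.transactional function does for us. We fork the entity manager to get a fresh one with a new identity map and perform all of the queries and removals through that fork. Changes registered through a different entity manager would not be a part of this transaction. Note that this version requires importing PostEntity into the CategoriesService.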
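The begin, commit, and rollback flow is the core of any helper that runs a callback in a transaction. Below is a minimal sketch of such a wrapper. The `ToyTransaction` interface, `runInTransaction`, and the recording stub are hypothetical and synchronous for brevity, while the real MikroORM methods are asynchronous and do much more.

```typescript
// Hypothetical minimal interface used only for this illustration.
interface ToyTransaction {
  begin(): void;
  commit(): void;
  rollback(): void;
}

// Runs `work` between begin() and commit(); rolls back and rethrows on error.
function runInTransaction<T>(transaction: ToyTransaction, work: () => T): T {
  transaction.begin();
  try {
    const result = work();
    transaction.commit();
    return result;
  } catch (error) {
    transaction.rollback();
    throw error;
  }
}

// A stub that records which transaction steps were called.
function createRecordingTransaction(): {
  transaction: ToyTransaction;
  log: string[];
} {
  const log: string[] = [];
  return {
    log,
    transaction: {
      begin: () => {
        log.push('begin');
      },
      commit: () => {
        log.push('commit');
      },
      rollback: () => {
        log.push('rollback');
      },
    },
  };
}

// The callback succeeds, so the transaction is committed.
const succeeding = createRecordingTransaction();
runInTransaction(succeeding.transaction, () => 'category deleted');

// The callback throws, so the transaction is rolled back
// and the error reaches the caller.
const failing = createRecordingTransaction();
try {
  runInTransaction(failing.transaction, () => {
    throw new Error('deleting a post failed');
  });
} catch {
  // the caller decides how to handle the error
}
```

Rethrowing the error after the rollback matters: it lets the caller know that nothing was saved, instead of silently swallowing the failure.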
Summary
In this article, we’ve gone through the idea of transactions and implemented them both through SQL and MikroORM. We’ve deleted a category and its posts within a transaction. This prevents the posts from being deleted if removing the category fails. Thanks to doing that, we’ve dealt with the danger of losing the integrity of our database.