TIL: Use Postgres listen/notify for asynchronous communication

2022-04-12 00:00:00 +0000 UTC

Postgres supports a simple pub/sub system through the NOTIFY and LISTEN commands. Once a client has issued a command like LISTEN channel1 on its connection, it receives a notification whenever any client issues a command like NOTIFY channel1, '{"payload":"this"}'.

I am using this in a small test project to explore development of a microservice backend. Whenever my auth service (written in Go) processes a new user signup, it calls this function:

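import (
	"context"
	"encoding/json"
	"fmt"
)

// NotificationPayload is the JSON message sent on the user_player channel.
type NotificationPayload struct {
	DisplayName string `json:"display_name"`
	UserId      int    `json:"user_id"`
}

// NotifyPlayerService publishes a new signup to listeners on user_player.
func (ur UserRepo) NotifyPlayerService(ctx context.Context, display string, id int) error {
	payload := NotificationPayload{DisplayName: display, UserId: id}
	payloadJSON, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("json.Marshal: %w", err)
	}

	// pg_notify is the function form of NOTIFY, so the payload can be
	// passed as a bound parameter instead of a string literal.
	_, err = ur.pool.Exec(ctx,
		"SELECT pg_notify('user_player', $1)",
		string(payloadJSON),
	)
	if err != nil {
		return fmt.Errorf("pool.Exec: %w", err)
	}

	return nil
}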

Then my Node.js player management service can create a player record using this always-running listener:

export const listenForUserCreation = async () => {
	// LISTEN is tied to one connection, so this client must stay open.
	const client = await db.getClientListener();

	// Attach the handler before subscribing so no notification is missed.
	client.on('notification', async (data) => {
		let JSONpayload;
		try {
			JSONpayload = JSON.parse(data.payload);
		} catch (err) {
			console.error(`invalid JSON on user_player channel: ${err}`);
			return;
		}
		const { display_name = '', user_id } = JSONpayload ?? {};

		console.log(`user_id: ${user_id}; typeof user_id ${typeof user_id}`);

		if (typeof user_id !== 'number') {
			console.error('message to user_player channel with bad user_id');
			console.error(user_id);
			return;
		}
		try {
			const res = await client.query(
				'INSERT INTO players (display_name, user_id) VALUES($1, $2) RETURNING *',
				[display_name, user_id]
			);
			if (Array.isArray(res.rows) && res.rows.length > 0) {
				// Serialize the row; interpolating an object prints "[object Object]".
				console.log(`new player created: ${JSON.stringify(res.rows[0])}`);
			} else {
				console.log(res.rows);
				console.error('new player creation failed');
			}
		} catch (err) {
			console.error(`database error: ${err}`);
		}
	});

	// Await the subscription so a failed LISTEN surfaces as an error here.
	await client.query('LISTEN user_player');
};
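
The payload checks in the listener could also be pulled out into a pure function, which makes them easy to test without a database connection. This is only a sketch: the helper name `parseUserPlayerPayload` and its return shape are my own assumptions, not something the service currently has. It is also a bit stricter than the `typeof` check, since it requires an integer `user_id`.

```javascript
// Hypothetical helper (not part of the original service).
// Takes the raw NOTIFY payload string and returns either
// { ok: true, value: { display_name, user_id } } or { ok: false, error }.
function parseUserPlayerPayload(raw) {
  let parsed;
  try {
    parsed = JSON.parse(raw);
  } catch (err) {
    return { ok: false, error: `invalid JSON: ${err.message}` };
  }

  // JSON.parse can legally return null, numbers, strings, or arrays.
  if (parsed === null || typeof parsed !== 'object' || Array.isArray(parsed)) {
    return { ok: false, error: 'payload is not a JSON object' };
  }

  const { display_name = '', user_id } = parsed;

  // The Go side sends an int, so anything else is treated as malformed.
  if (!Number.isInteger(user_id)) {
    return { ok: false, error: 'bad user_id' };
  }
  if (typeof display_name !== 'string') {
    return { ok: false, error: 'bad display_name' };
  }

  return { ok: true, value: { display_name, user_id } };
}
```

The notification handler would then call this first and return early when `ok` is false, leaving only the INSERT inside the handler itself.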

With the full payload being sent through the listen/notify system, there isn’t a direct way for the auth service to confirm that a player has been created. I could instead create a shared table in the Postgres backend for player creation jobs. The auth service would then insert a new job record with a status field and send only the primary key of that record via listen/notify. The auth service could then wait for a corresponding notification back from the player service, which closes the player creation loop.
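
Here is a rough sketch of what the player-service side of that job-based flow might look like. Everything named here is an assumption for illustration: a hypothetical `player_jobs` table with `id`, `display_name`, `user_id`, and `status` columns, and a hypothetical reply channel `player_job_done`. The function only builds the statements, in the `{ text, values }` form that node-postgres accepts in `client.query`, so nothing in the sketch touches a database.

```javascript
// All table, column, and channel names below are hypothetical.
const JOB_DONE_CHANNEL = 'player_job_done';

// In this design the notification payload is only the job's primary key.
// Returns a positive integer id, or null if the payload is not one.
function parseJobId(raw) {
  const id = Number(raw);
  return Number.isInteger(id) && id > 0 ? id : null;
}

// Build the statements the player service would run, in order,
// on a single client so they share one transaction.
function planPlayerJob(jobId) {
  return [
    { text: 'BEGIN' },
    {
      // Copy the signup data from the pending job row into players.
      text:
        'INSERT INTO players (display_name, user_id) ' +
        'SELECT display_name, user_id FROM player_jobs ' +
        "WHERE id = $1 AND status = 'pending'",
      values: [jobId],
    },
    {
      // Mark the job as finished so it is not processed twice.
      text: "UPDATE player_jobs SET status = 'done' WHERE id = $1 AND status = 'pending'",
      values: [jobId],
    },
    {
      // A NOTIFY issued inside a transaction is delivered only on commit.
      text: 'SELECT pg_notify($1, $2)',
      values: [JOB_DONE_CHANNEL, String(jobId)],
    },
    { text: 'COMMIT' },
  ];
}
```

A caller would run these in order with something like `for (const q of planPlayerJob(id)) await client.query(q);` and issue a ROLLBACK on any error. A real version would probably also check the INSERT's row count and roll back when no pending job matched. Since Postgres holds a notification sent inside a transaction until commit, the auth service would only see the confirmation once the player row is actually stored.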

I will likely implement this more complete solution in the future, but for now this is working!

I was initially inspired to look into using Postgres for asynchronous communication by this blog post.

Tags: til postgres