Send email using notify and listen in PostgreSQL

How to send mail from a PostgreSQL database

First of all, Hubert Lubaczewski has a wonderful blog post about how to send mail from the database and interact with the “world outside of the database.” Thanks to his explanation, I was able to implement it easily.

There are three main options to send emails directly from a PostgreSQL database:

Using an Extension: Some PostgreSQL extensions, like pgMail (uses PL/TclU – Procedural Untrusted Tool Command Language) or pgSmtp (uses plpython3u – Procedural Untrusted Python Language), are designed specifically for sending emails. These extensions simplify the process by providing built-in functionality for sending messages directly from within the database.

Custom Stored Procedure with Untrusted Languages: By using untrusted languages like PL/PerlU or PL/TclU, you can create a custom stored procedure that interacts with external systems. These untrusted languages have access to external resources such as files, sockets and network systems, making it possible to send emails or perform other tasks that require communication outside the database.

Notify and Listen Methodology: This approach involves using PostgreSQL’s NOTIFY and LISTEN commands. The database sends notifications, which external services or daemons (written in languages like Python, Node.js, etc.) listen for. When a relevant event occurs, the external service sends the email based on the notification payload. This method offloads the email-sending task to a service outside the database while keeping the logic and triggers inside PostgreSQL.
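
The core of the third option is just two commands; here is a minimal illustration (the channel name and payload value are arbitrary examples):

-- In the session (or daemon) that waits for events:
LISTEN new_email;

-- In the session that produces the event:
NOTIFY new_email, '42';
-- or, equivalently, from SQL or PL/pgSQL:
SELECT pg_notify('new_email', '42');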

Each method has its advantages depending on your needs for control, security and ease of implementation. Using an extension or a custom stored procedure with an untrusted language is easy to implement, but it is not a secure approach.

Instead of using untrusted languages, we opted for the LISTEN/NOTIFY approach. This method monitors a queue stored in a database table and triggers the sending of emails via the sendmail command at the OS level. This allowed us to handle the email-sending logic outside the database while adding flexibility within PostgreSQL.

Since we’re using Oracle Linux 8, we will use Postfix to send emails at the OS level.

To ensure your server can send anonymous emails using Postfix, you’ll need to coordinate with your organization’s mail server administrators and make sure your server IP is whitelisted on the mail server for outgoing anonymous mail.

Create a dedicated user and a schema for mailing purposes, and create a table in the postgres database that will store the mail records. We will use it as a queue.
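
A minimal sketch of these objects; the column names follow what the listener script below expects, while the exact data types and the extra audit column (added_time) are assumptions you can adapt:

CREATE USER mailer WITH PASSWORD 'XXXXX';

CREATE SCHEMA mailer AUTHORIZATION mailer;

CREATE TABLE mailer.t_email_notifications (
    id             bigserial PRIMARY KEY,
    email_from     text NOT NULL,
    email_to       text NOT NULL,
    email_subject  text,
    email_body     text,
    sent           boolean NOT NULL DEFAULT false,
    added_time     timestamptz NOT NULL DEFAULT now(),
    sent_time      timestamptz
);

ALTER TABLE mailer.t_email_notifications OWNER TO mailer;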

Create a trigger and a related function that notifies a background process (which is responsible for sending emails at the OS layer) when a new record is added to the t_email_notifications table.
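
A sketch of such a function and trigger (the function and trigger names are arbitrary); the new row’s id is sent as the payload on the new_email channel:

CREATE OR REPLACE FUNCTION mailer.fn_notify_new_email()
RETURNS trigger AS $$
BEGIN
    -- Send the id of the new row as the notification payload
    PERFORM pg_notify('new_email', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_notify_new_email
    AFTER INSERT ON mailer.t_email_notifications
    FOR EACH ROW
    EXECUTE FUNCTION mailer.fn_notify_new_email();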

With this setup, every time a new row is added to the t_email_notifications table, PostgreSQL will send a notification to the Python script responsible for sending emails. The Python script will be listening for these notifications and will handle the email-sending process as soon as it receives the notification, ensuring immediate email dispatch.

Now we will create a Python script (email_listener.py) that listens for notifications on the new_email channel and sends emails using Postfix sendmail. We could have also used Node.js, C, Perl or another language that handles asynchronous events and callbacks efficiently, making it suitable for this type of task. I chose Python because it doesn’t require additional package installations. All the necessary modules (select, datetime, subprocess, logging) are part of the Python standard library. The only extra module needed is psycopg2, which is already installed as a prerequisite for Patroni. If you deploy this script to a server other than the database server, you will only need to install the psycopg2 pip package.

import psycopg2
import select
from datetime import datetime
import subprocess
import logging
import time

# Configure logging
logging.basicConfig(
    filename='/var/log/postgresql_email_listener/postgresql_email_listener.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def send_mail(email_data):
    # The block below is a docstring. Not a mandatory section, but a best practice for code quality, maintainability and usability.
    """
    Sends an email using the `sendmail` command.

    Args:
        email_data (str): Concatenated email data using '€€' as delimiter.

    Returns:
        bool: True if the email is sent successfully, False otherwise.
    """
    # Split the email data using the '€€' delimiter
    email_from, email_to, subject, body = email_data.split('€€')

    # Construct the email message in the `sendmail` format
    # (the blank line separates the headers from the body)
    message = f"""From: {email_from}
To: {email_to}
Subject: {subject}
Content-Type: text/plain; charset="UTF-8"

{body}
"""
    try:
        # Use the sendmail command with the -t option (recipients are taken from the headers)
        process = subprocess.Popen(["/usr/sbin/sendmail", "-t"], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
        stdout, stderr = process.communicate(message.encode("utf-8"))
        if process.returncode == 0:
            logging.info(f"Email sent successfully to {email_to}")
            return True
        else:
            logging.error(f"Failed to send email to {email_to}. Error: {stderr.decode('utf-8').strip()}")
            return False
    except Exception as e:
        logging.error(f"An error occurred while sending email: {e}")
        return False


def process_email(cur, email_id):
    """Fetch and process a single email by ID."""
    cur.execute("""
        SELECT email_from || '€€' || email_to || '€€' || email_subject || '€€' || email_body
        FROM mailer.t_email_notifications WHERE id = %s;
    """, (email_id,))
    email = cur.fetchone()
    if email:
        # Email data will be a '€€'-concatenated string
        email_data = email[0]
        # Send the email using sendmail
        success = send_mail(email_data)
        if success:
            # Update the record to mark it as sent and set the sent_time
            cur.execute("UPDATE mailer.t_email_notifications SET sent = TRUE, sent_time = %s WHERE id = %s;",
                        (datetime.now(), email_id))
            logging.info(f"Email ID {email_id} marked as sent.")


def process_unsent_emails(cur):
    """Fetch and process all unsent emails."""
    cur.execute("SELECT id FROM mailer.t_email_notifications WHERE sent = FALSE;")
    unsent_emails = cur.fetchall()
    for email in unsent_emails:
        process_email(cur, email[0])


def listen_notifications(conn_params):
    """Main loop to listen for PostgreSQL notifications."""
    while True:
        try:
            # Establish a connection to the PostgreSQL database
            logging.info("Connecting to the database...")
            conn = psycopg2.connect(**conn_params)
            conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
            cur = conn.cursor()
            # Listen on the new_email channel
            cur.execute("LISTEN new_email;")
            logging.info("Waiting for notifications on channel 'new_email'...")
            while True:
                # Use select to wait up to 60 seconds (like sleep) for any input on the connection
                if select.select([conn], [], [], 60) == ([], [], []):
                    continue  # No notification received, loop back and wait again
                conn.poll()
                while conn.notifies:
                    notify = conn.notifies.pop(0)
                    logging.info(f"Received notification: {notify.payload}")
                    # Process the email corresponding to the received ID
                    process_email(cur, notify.payload)
                # After processing the notified emails, process any remaining unsent emails
                process_unsent_emails(cur)
        except (psycopg2.OperationalError, psycopg2.DatabaseError) as e:
            logging.error(f"Database connection error: {e}. Reconnecting in 60 seconds...")
            time.sleep(60)  # Wait before trying to reconnect
        except KeyboardInterrupt:
            logging.info("Listener stopped.")
            break
        finally:
            # Ensure the cursor and connection are closed if open
            try:
                cur.close()
                conn.close()
                logging.info("Connection closed.")
            except NameError:
                pass  # If cur/conn are not defined, skip closing


if __name__ == "__main__":
    conn_params = {
        'dbname': 'postgres',
        'host': 'pgcluster.localdomain',
        'port': 5000,
        'user': 'mailer',
        'password': 'XXXXX'
    }
    listen_notifications(conn_params)

Now we will create a service file under /etc/systemd/system/ called email_listener.service. Creating a service allows the script to run seamlessly in the background, restart if it crashes, and start automatically after a reboot.

/var/lib/pgsql/scripts/email_listener.py is the script we prepared in STEP 4. The service will run as the postgres user, so we will ensure that this user has write permissions to the log file.
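
A sketch of email_listener.service; the Python interpreter path and the unit ordering are assumptions to adapt to your environment:

[Unit]
Description=PostgreSQL email listener (NOTIFY/LISTEN)
After=network.target

[Service]
Type=simple
User=postgres
ExecStart=/usr/bin/python3 /var/lib/pgsql/scripts/email_listener.py
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target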

Reload the systemd daemon to recognize the new service, then start the service and enable it so it starts automatically on reboot.
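
For example:

systemctl daemon-reload
systemctl start email_listener.service
systemctl enable email_listener.service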

The listener writes all log records to the /var/log/postgresql_email_listener/postgresql_email_listener.log file.

Since all logs are recorded in the /var/log/postgresql_email_listener/postgresql_email_listener.log file, we will configure log rotation for it. The log file will be rotated when it reaches a size of 100 MB, but you can adjust this based on your needs.
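
A possible /etc/logrotate.d/postgresql_email_listener configuration; copytruncate keeps the listener’s open file handle valid, and the rotation count is an arbitrary example:

/var/log/postgresql_email_listener/postgresql_email_listener.log {
    size 100M
    rotate 5
    compress
    missingok
    notifempty
    copytruncate
}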

During the testing phase, messages with Turkish characters (such as ‘ç, ş, ı, ğ, ö, ü’) in the subject line failed. To diagnose the issue, we examined the log records in /var/spool/mail/root.

...
Action: failed
Status: 5.6.7
Diagnostic-Code: X-Postfix; SMTPUTF8 is required, but was not offered by host
amx03.localdomain[192.168.40.32]
...

Non-ASCII characters (those not part of the standard ASCII character set) cause an issue because Internet specifications for mail servers only allow ASCII text in message headers. Only mail servers that support a special feature called SMTPUTF8 can accept headers with non-ASCII characters. Although SMTPUTF8 support is enabled by default in Postfix, we will disable it, as the vast majority of email software is capable of handling such emails without it.
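
The relevant parameter is smtputf8_enable in /etc/postfix/main.cf (available in Postfix 3.0 and later); after changing it, reload Postfix:

postconf -e 'smtputf8_enable = no'
systemctl reload postfix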

We will create foreign tables (mailer.ft_mailer_notifications) in all other databases that reference the central table (mailer.t_email_notifications) in the postgres database using the Postgres Foreign Data Wrapper (FDW). A mail_sender function will be created with SECURITY DEFINER rights, allowing the function to execute with the privileges of its owner rather than the user invoking it. This contrasts with the default behavior (SECURITY INVOKER), where the function runs with the invoking user’s privileges. After completing the setup, a standard database user, granted only schema usage and execute privileges on the mail_sender function, will be able to send mail.

Enable the PostgreSQL foreign data wrapper (postgres_fdw) in the database that needs to send mail.
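
In the database that will send mail:

CREATE EXTENSION IF NOT EXISTS postgres_fdw;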

Create a staging schema and table in the source database for the FDW.
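
A sketch, assuming the remote side is the central postgres database behind the same cluster endpoint; the server name is an arbitrary choice, and the foreign table itself is created in the next step:

CREATE SCHEMA IF NOT EXISTS mailer;

CREATE SERVER srv_mailer
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'pgcluster.localdomain', port '5000', dbname 'postgres');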

Create a user mapping for the mail_sender proxy user, and create a foreign table with only the necessary columns, omitting others such as id, sent, added_time, and processed_time.
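
For example (the server name and remote credentials are assumptions from the earlier sketches):

CREATE USER MAPPING FOR mail_sender
    SERVER srv_mailer
    OPTIONS (user 'mailer', password 'XXXXX');

CREATE FOREIGN TABLE mailer.ft_mailer_notifications (
    email_from     text,
    email_to       text,
    email_subject  text,
    email_body     text
)
SERVER srv_mailer
OPTIONS (schema_name 'mailer', table_name 't_email_notifications');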

Create a function for sending emails.
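
A sketch of the function, assuming it is owned by the mapped mail_sender user; the parameter names are placeholders:

CREATE OR REPLACE FUNCTION mailer.mail_sender(
    p_from     text,
    p_to       text,
    p_subject  text,
    p_body     text
) RETURNS void AS $$
BEGIN
    INSERT INTO mailer.ft_mailer_notifications (email_from, email_to, email_subject, email_body)
    VALUES (p_from, p_to, p_subject, p_body);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

ALTER FUNCTION mailer.mail_sender(text, text, text, text) OWNER TO mail_sender;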

Grant privileges to the database user who needs to send emails.
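
For example, for a hypothetical app_user:

GRANT USAGE ON SCHEMA mailer TO app_user;
GRANT EXECUTE ON FUNCTION mailer.mail_sender(text, text, text, text) TO app_user;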

Send mail.
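
For example (the addresses are placeholders):

SELECT mailer.mail_sender(
    'noreply@localdomain',
    'dba-team@localdomain',
    'Test from PostgreSQL',
    'Hello from the database.'
);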

We have completed the setup for sending emails from PostgreSQL: the listener waits for email notifications, processes them, and updates their status in the database. Using a Python script built on the psycopg2 library, we query the t_email_notifications table and send the emails via the sendmail command. The implementation ensures that every notification triggers an email send, and each processed email updates the corresponding record’s status. With the integration of logging and error handling, this solution provides a reliable method for managing emails directly from the database.

While the email notification system effectively manages task-related communications, it is essential to consider data policies in light of GDPR (General Data Protection Regulation) and local Personal Data Protection laws. The stored emails typically should not (we hope) contain private data. However, implementing a routine job using pg_cron to delete records that have been sent and are older than X days can be a prudent measure.
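
A possible pg_cron job for such a cleanup, assuming a 30-day retention as an example value for X and a daily schedule:

SELECT cron.schedule(
    'purge_sent_emails',
    '0 3 * * *',   -- every day at 03:00
    $$DELETE FROM mailer.t_email_notifications
      WHERE sent AND sent_time < now() - interval '30 days'$$
);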

Hope it helps.

