-- Publisher session: broadcast a text message on the "new_order" channel.
NOTIFY new_order, 'Order #12345 created';

-- Subscriber session: register interest in the "new_order" channel.
LISTEN new_order;
-- A session listening on the channel (typically another connection) receives
-- the notification; delivery to application code is handled by the client
-- library.

-- Stop listening: first one channel, then every channel at once.
UNLISTEN new_order;
UNLISTEN *;
-- Publish a structured (JSON) payload on the "user_updates" channel.
-- pg_notify() takes the payload as an expression, so the JSON object is
-- built inline and converted to text.
SELECT pg_notify(
    'user_updates',
    CAST(
        json_build_object(
            'user_id',   123,
            'action',    'updated',
            'timestamp', CURRENT_TIMESTAMP
        ) AS TEXT
    )
);
-- Trigger-based notifications
--
-- notify_new_order(): row-level trigger function. Publishes a JSON object
-- with the new row's id, user_id, total and created_at on the "new_order"
-- channel, then returns NEW unchanged.
CREATE OR REPLACE FUNCTION notify_new_order()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    order_payload TEXT;
BEGIN
    -- Build the payload first so the publish call stays a one-liner.
    order_payload := CAST(
        json_build_object(
            'order_id',   NEW.id,
            'user_id',    NEW.user_id,
            'total',      NEW.total,
            'created_at', NEW.created_at
        ) AS TEXT
    );

    PERFORM pg_notify('new_order', order_payload);

    RETURN NEW;
END;
$$;
-- Run notify_new_order() once for every row inserted into orders.
CREATE TRIGGER order_notification
    AFTER INSERT
    ON orders
    FOR EACH ROW
    EXECUTE FUNCTION notify_new_order();

-- Now every insert sends a notification.
INSERT INTO orders (user_id, total)
VALUES (123, 99.99);
-- Listeners receive: {"order_id": 456, "user_id": 123, ...}
-- Cache invalidation pattern
--
-- notify_cache_invalidation(): row-level trigger function. Publishes a JSON
-- object {"table", "id", "operation"} on the "cache_invalidate" channel so
-- listeners can evict the affected entry.
--
-- The affected row is taken from OLD for DELETE and from NEW otherwise:
-- NEW holds no row in a DELETE trigger, so reading NEW.id there would
-- publish a null id and listeners could not tell which entry to evict.
-- The same row is returned, so the function does not cancel the operation
-- if it is ever attached as a BEFORE trigger (the return value is ignored
-- for AFTER triggers).
CREATE OR REPLACE FUNCTION notify_cache_invalidation()
RETURNS TRIGGER AS $$
DECLARE
    affected_row RECORD;
BEGIN
    IF TG_OP = 'DELETE' THEN
        affected_row := OLD;
    ELSE
        affected_row := NEW;
    END IF;

    PERFORM pg_notify(
        'cache_invalidate',
        json_build_object(
            'table', TG_TABLE_NAME,
            'id', affected_row.id,
            'operation', TG_OP
        )::TEXT
    );

    RETURN affected_row;
END;
$$ LANGUAGE plpgsql;
-- Publish a cache-invalidation message for every row inserted into, updated
-- in, or deleted from users.
CREATE TRIGGER users_cache_invalidation
    AFTER INSERT OR UPDATE OR DELETE
    ON users
    FOR EACH ROW
    EXECUTE FUNCTION notify_cache_invalidation();
-- Application listens and clears cache
/*
// Node.js example
const client = new Client();
await client.connect();
client.on('notification', (msg) => {
const payload = JSON.parse(msg.payload);
console.log('Cache invalidation:', payload);
// Clear cache
cache.del(`user:${payload.id}`);
});
await client.query('LISTEN cache_invalidate');
*/
-- Real-time dashboard updates
--
-- notify_metrics_update(): row-level trigger function. Publishes the new
-- row's metric_name, value and timestamp as JSON on the "metrics_update"
-- channel and returns NEW unchanged.
CREATE OR REPLACE FUNCTION notify_metrics_update()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    metric_payload TEXT;
BEGIN
    metric_payload := CAST(
        json_build_object(
            'metric_name', NEW.metric_name,
            'value',       NEW.value,
            'timestamp',   NEW.timestamp
        ) AS TEXT
    );

    PERFORM pg_notify('metrics_update', metric_payload);

    RETURN NEW;
END;
$$;
-- Run notify_metrics_update() once for every row inserted into metrics.
CREATE TRIGGER metrics_notification
    AFTER INSERT
    ON metrics
    FOR EACH ROW
    EXECUTE FUNCTION notify_metrics_update();
-- Transactional notifications: a notification raised inside a transaction
-- is held back until that transaction commits.
BEGIN;
    INSERT INTO orders (user_id, total)
    VALUES (123, 99.99);
    -- Notification not sent yet
COMMIT;
-- Notification sent after commit

-- If rolled back:
BEGIN;
    INSERT INTO orders (user_id, total)
    VALUES (123, 99.99);
ROLLBACK;
-- Notification NOT sent

-- Multiple channels: one session may listen on several channels at once.
LISTEN orders;
LISTEN inventory;
LISTEN user_activity;
-- Conditional notifications
--
-- notify_high_value_order(): row-level trigger function. Publishes
-- {"order_id", "total"} on the "high_value_order" channel only when the new
-- row's total is greater than 1000; a NULL total never notifies. Always
-- returns NEW unchanged.
-- NOTE(review): no CREATE TRIGGER attaching this function appears in this
-- section.
CREATE OR REPLACE FUNCTION notify_high_value_order()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    -- Guard clause: IS NOT TRUE also covers a NULL total.
    IF (NEW.total > 1000) IS NOT TRUE THEN
        RETURN NEW;
    END IF;

    PERFORM pg_notify(
        'high_value_order',
        CAST(json_build_object('order_id', NEW.id, 'total', NEW.total) AS TEXT)
    );

    RETURN NEW;
END;
$$;
-- WebSocket push notifications via LISTEN/NOTIFY
/*
// Node.js WebSocket server
const WebSocket = require('ws');
const { Client } = require('pg');
const wss = new WebSocket.Server({ port: 8080 });
const dbClient = new Client();
await dbClient.connect();
// Subscribe to database notifications
dbClient.on('notification', (msg) => {
const payload = JSON.parse(msg.payload);
// Broadcast to all connected WebSocket clients
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify({
channel: msg.channel,
data: payload
}));
}
});
});
await dbClient.query('LISTEN new_order');
await dbClient.query('LISTEN user_updates');
*/
-- Job queue pattern
--
-- job_queue: one row per unit of work.
CREATE TABLE job_queue (
    id           SERIAL,                               -- surrogate key
    job_type     VARCHAR(50),                          -- kind of job
    payload      JSONB,                                -- job arguments
    status       VARCHAR(20) DEFAULT 'pending',        -- new rows start as 'pending'
    created_at   TIMESTAMP   DEFAULT CURRENT_TIMESTAMP,
    started_at   TIMESTAMP,
    completed_at TIMESTAMP,
    PRIMARY KEY (id)
);
-- notify_new_job(): row-level trigger function. Publishes the new row's id
-- and job_type as JSON on the "job_queue" channel and returns NEW unchanged.
-- Only the identifiers are sent; the payload column is not included.
CREATE OR REPLACE FUNCTION notify_new_job()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    job_payload TEXT;
BEGIN
    job_payload := CAST(
        json_build_object(
            'job_id',   NEW.id,
            'job_type', NEW.job_type
        ) AS TEXT
    );

    PERFORM pg_notify('job_queue', job_payload);

    RETURN NEW;
END;
$$;
-- Run notify_new_job() once for every row inserted into job_queue.
CREATE TRIGGER job_notification
    AFTER INSERT
    ON job_queue
    FOR EACH ROW
    EXECUTE FUNCTION notify_new_job();
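-- Worker process listens for jobs
-- NOTE: notifications are not stored for sessions that are not listening, so
-- a worker should also scan job_queue for 'pending' rows when it (re)connects.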
/*
client.on('notification', async (msg) => {
const { job_id, job_type } = JSON.parse(msg.payload);
// Claim job
const result = await client.query(`
UPDATE job_queue
SET status = 'processing', started_at = NOW()
WHERE id = $1 AND status = 'pending'
RETURNING *
`, [job_id]);
if (result.rows.length === 0) return; // Already claimed
// Process job
await processJob(job_type, result.rows[0].payload);
// Mark complete
await client.query(`
UPDATE job_queue
SET status = 'completed', completed_at = NOW()
WHERE id = $1
`, [job_id]);
});
await client.query('LISTEN job_queue');
*/
-- Rate limiting notifications (prevent spam)
--
-- notification_throttle: one row per channel, remembering when a
-- notification was last recorded for it.
CREATE TABLE notification_throttle (
    channel           VARCHAR(100),
    last_notification TIMESTAMP,
    PRIMARY KEY (channel)
);
-- throttled_notify(channel_name, message, throttle_seconds DEFAULT 1)
--
-- Sends `message` on `channel_name` unless a notification was already
-- recorded for that channel within the last `throttle_seconds` seconds.
--
-- Parameters:
--   channel_name     - channel to notify; also the key in notification_throttle
--   message          - payload passed to pg_notify()
--   throttle_seconds - minimum gap between notifications on the channel
--
-- The check and the timestamp update are a single conditional upsert.
-- A separate SELECT followed by an INSERT would let two concurrent callers
-- both pass the check and both notify. Here the second caller waits on the
-- row, re-evaluates the WHERE clause against the committed timestamp, and is
-- throttled. FOUND is true only when the row was inserted or updated.
CREATE OR REPLACE FUNCTION throttled_notify(
    channel_name TEXT,
    message TEXT,
    throttle_seconds INT DEFAULT 1
)
RETURNS VOID AS $$
BEGIN
    INSERT INTO notification_throttle AS nt (channel, last_notification)
    VALUES (channel_name, NOW())
    ON CONFLICT (channel)
    DO UPDATE SET last_notification = EXCLUDED.last_notification
    WHERE nt.last_notification IS NULL
       OR NOW() - nt.last_notification > throttle_seconds * INTERVAL '1 second';

    IF FOUND THEN
        PERFORM pg_notify(channel_name, message);
    END IF;
END;
$$ LANGUAGE plpgsql;
-- Usage: send at most one "metrics_update" notification every 5 seconds.
SELECT throttled_notify(
    channel_name     => 'metrics_update',
    message          => 'New metrics available',
    throttle_seconds => 5
);
-- Bi-directional communication
--   Application -> Database : INSERT / UPDATE
--   Database -> Application : NOTIFY
--
-- Command pattern
-- commands: one row per command submitted by the application.
CREATE TABLE commands (
    id           SERIAL,
    command_type VARCHAR(50),
    parameters   JSONB,                              -- command input
    result       JSONB,                              -- empty until a result is written
    status       VARCHAR(20) DEFAULT 'pending',      -- new rows start as 'pending'
    created_at   TIMESTAMP   DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
);
-- Trigger notifies workers
--
-- notify_new_command(): row-level trigger function. Publishes the new
-- command's id and command_type as JSON on the "new_command" channel and
-- returns NEW unchanged. It must exist before the trigger below can be
-- created.
CREATE OR REPLACE FUNCTION notify_new_command()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify(
        'new_command',
        json_build_object(
            'command_id', NEW.id,
            'command_type', NEW.command_type
        )::TEXT
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER command_notification
AFTER INSERT ON commands
FOR EACH ROW
EXECUTE FUNCTION notify_new_command();

-- Application sends command
-- (issued after the trigger exists so that this insert is actually announced)
INSERT INTO commands (command_type, parameters)
VALUES ('send_email', '{"to": "user@example.com", "subject": "Hello"}');
-- Worker finishes a command: record the outcome and mark the row completed.
UPDATE commands
SET
    result = '{"sent": true, "message_id": "abc123"}',
    status = 'completed'
WHERE id = 123;
-- Monitoring active listeners (heuristic)
--
-- Lists sessions whose most recent statement begins with LISTEN. The match
-- is anchored to the start of the text and is case-insensitive, so UNLISTEN
-- statements and queries that merely contain the word "LISTEN" (such as
-- this one) are not reported. The current session is excluded explicitly.
-- Caveat: the filter looks only at the text in the `query` column, so a
-- session is matched only while that column holds its LISTEN statement.
SELECT
    pid,
    usename,
    application_name,
    client_addr,
    backend_start,
    state
FROM pg_stat_activity
WHERE query ~* '^[[:space:]]*LISTEN[[:space:]]'
  AND pid <> pg_backend_pid()
ORDER BY backend_start;