Webhooks let you build custom integrations with any system. When standard integrations don't meet your needs, webhooks provide the flexibility to create exactly what you want.
What Are Webhooks?
Webhooks are HTTP callbacks that notify your system when events occur. Instead of polling for updates, your system receives real-time notifications.
Monitor detects downtime
↓
Webhook sent to your endpoint
↓
Your system processes and acts
Webhook Payload Structure
Typical monitoring webhook payload:
{
  "event_type": "monitor.down",
  "timestamp": "2026-01-31T14:30:00Z",
  "monitor": {
    "id": "mon_abc123",
    "name": "Production API",
    "url": "https://api.example.com",
    "type": "http"
  },
  "check": {
    "status": "down",
    "response_code": 500,
    "response_time_ms": 2500,
    "location": "us-east"
  },
  "incident": {
    "id": "inc_xyz789",
    "started_at": "2026-01-31T14:28:00Z",
    "duration_seconds": 120
  }
}
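Handlers are easier to write against a typed object than against nested dictionaries. The following is a minimal sketch that parses the sample payload above using only the standard library. The `MonitorEvent` class and `parse_event` function are illustrative names, not part of any monitoring service's SDK, and the sketch assumes every event carries the same fields as the sample.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class MonitorEvent:
    """Subset of the sample payload that handlers typically need (hypothetical type)."""
    event_type: str
    timestamp: datetime
    monitor_name: str
    status: str
    response_code: int
    incident_id: str


def parse_timestamp(value: str) -> datetime:
    # Older Python versions reject a trailing "Z" in fromisoformat(),
    # so rewrite it as an explicit UTC offset first.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def parse_event(raw: bytes) -> MonitorEvent:
    """Parse a raw webhook body; raises KeyError if an expected field is missing."""
    data = json.loads(raw)
    return MonitorEvent(
        event_type=data["event_type"],
        timestamp=parse_timestamp(data["timestamp"]),
        monitor_name=data["monitor"]["name"],
        status=data["check"]["status"],
        response_code=data["check"]["response_code"],
        incident_id=data["incident"]["id"],
    )
```

Failing fast on a missing field keeps malformed payloads from reaching business logic. If your provider omits sections for some event types, those fields would need defaults.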
Building a Webhook Receiver
Basic Python Receiver
from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)
WEBHOOK_SECRET = "your-secret-key"  # load from configuration in production

def verify_signature(payload, signature):
    # A missing header must fail verification rather than raise a TypeError
    if not signature:
        return False
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Constant-time comparison on bytes
    return hmac.compare_digest(signature.encode(), expected.encode())

@app.route('/webhook', methods=['POST'])
def handle_webhook():
    # Verify authenticity against the raw request body
    signature = request.headers.get('X-Signature')
    if not verify_signature(request.data, signature):
        return jsonify({"error": "Invalid signature"}), 401
    data = request.json
    event_type = data.get('event_type')
    if event_type == 'monitor.down':
        handle_downtime(data)
    elif event_type == 'monitor.up':
        handle_recovery(data)
    return jsonify({"status": "ok"})

def handle_downtime(data):
    # Custom logic here
    print(f"Alert: {data['monitor']['name']} is down!")

def handle_recovery(data):
    print(f"Recovered: {data['monitor']['name']} is back up")

if __name__ == '__main__':
    app.run(port=5000)
Node.js Receiver
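const express = require('express');
const crypto = require('crypto');

const app = express();
// Keep the raw bytes: the signature covers the body exactly as it was sent,
// and re-serializing the parsed JSON may not reproduce it.
app.use(express.json({
  verify: (req, res, buf) => { req.rawBody = buf; }
}));

const WEBHOOK_SECRET = 'your-secret-key';

function verifySignature(rawBody, signature) {
  if (!rawBody || !signature) return false;
  const expected = crypto
    .createHmac('sha256', WEBHOOK_SECRET)
    .update(rawBody)
    .digest('hex');
  const received = Buffer.from(signature);
  const computed = Buffer.from(expected);
  // timingSafeEqual throws when lengths differ, so compare lengths first
  return received.length === computed.length &&
    crypto.timingSafeEqual(received, computed);
}

function handleDowntime(data) {
  console.log(`Alert: ${data.monitor.name} is down!`);
}

function handleRecovery(data) {
  console.log(`Recovered: ${data.monitor.name} is back up`);
}

app.post('/webhook', (req, res) => {
  const signature = req.headers['x-signature'];
  if (!verifySignature(req.rawBody, signature)) {
    return res.status(401).json({ error: 'Invalid signature' });
  }
  const { event_type } = req.body;
  switch (event_type) {
    case 'monitor.down':
      handleDowntime(req.body);
      break;
    case 'monitor.up':
      handleRecovery(req.body);
      break;
  }
  res.json({ status: 'ok' });
});

app.listen(3000);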
Common Integration Patterns
Auto-Scaling on Alert
import boto3

def handle_downtime(data):
    if 'api' in data['monitor']['name'].lower():
        # Scale up API instances
        client = boto3.client('autoscaling')
        client.set_desired_capacity(
            AutoScalingGroupName='api-asg',
            DesiredCapacity=10
        )
Create Support Ticket
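import requests

def handle_downtime(data):
    # Illustrative endpoint and fields; take the exact URL, payload schema
    # and auth scheme from your ticketing system's API reference.
    requests.post(
        "https://api.zendesk.com/tickets",
        json={
            "ticket": {
                "subject": f"Alert: {data['monitor']['name']} is down",
                "description": f"Service down since {data['incident']['started_at']}",
                "priority": "high"
            }
        },
        headers={"Authorization": "Bearer ..."},
        timeout=10  # avoid blocking the webhook handler indefinitely
    )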
Update Status Page
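import requests

def handle_downtime(data):
    # Illustrative endpoint; a real status page API also needs
    # credentials and usually a page identifier.
    requests.post(
        "https://api.statuspage.io/incidents",
        json={
            "incident": {
                "name": f"{data['monitor']['name']} Outage",
                "status": "investigating",
                "impact_override": "major"
            }
        },
        timeout=10
    )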
Trigger Runbook
import subprocess

def handle_downtime(data):
    monitor_name = data['monitor']['name']
    # Map monitors to runbooks; only scripts listed here can ever run
    runbooks = {
        'Production API': 'api-restart.sh',
        'Database': 'db-failover.sh'
    }
    if monitor_name in runbooks:
        subprocess.run([f"/runbooks/{runbooks[monitor_name]}"])
Log to Analytics
from google.cloud import bigquery

def handle_webhook(data):
    client = bigquery.Client()
    table_id = "project.dataset.incidents"
    row = {
        "timestamp": data['timestamp'],
        "monitor": data['monitor']['name'],
        "status": data['check']['status'],
        "response_time": data['check']['response_time_ms']
    }
    # insert_rows_json returns a list of per-row errors (empty on success)
    errors = client.insert_rows_json(table_id, [row])
    if errors:
        print(f"BigQuery insert failed: {errors}")
Securing Webhooks
Signature Verification
Always verify webhook authenticity:
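import hmac
import hashlib

def verify_webhook(payload, signature, secret):
    expected = hmac.new(
        secret.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # This variant expects the header value to carry a "sha256=" prefix;
    # match whatever format your monitoring service actually sends.
    return hmac.compare_digest(f"sha256={expected}", signature)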
IP Allowlisting
Restrict requests to the IP addresses your monitoring service sends from:
from flask import abort, request

ALLOWED_IPS = ['1.2.3.4', '5.6.7.8']

@app.before_request
def check_ip():
    if request.remote_addr not in ALLOWED_IPS:
        abort(403)
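Senders often publish address ranges rather than single addresses. The sketch below, using the standard `ipaddress` module, checks a client address against a list of CIDR blocks. The ranges shown are placeholders, and `is_allowed` is a hypothetical helper name.

```python
import ipaddress

# Placeholder ranges; substitute the ranges your monitoring service publishes.
ALLOWED_NETWORKS = [
    ipaddress.ip_network("1.2.3.4/32"),
    ipaddress.ip_network("5.6.7.0/24"),
]


def is_allowed(remote_addr: str) -> bool:
    """Return True if remote_addr falls inside any allowed network."""
    try:
        addr = ipaddress.ip_address(remote_addr)
    except ValueError:
        # Not a valid IP address at all: reject.
        return False
    return any(addr in network for network in ALLOWED_NETWORKS)
```

Behind a load balancer or reverse proxy, `request.remote_addr` is usually the proxy's address, so the client address has to come from a forwarded header that only your own proxy can set.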
HTTPS Only
Never accept webhooks over HTTP in production.
Secret Rotation
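Rotate webhook secrets periodically, in an order that never leaves the receiver unable to verify a delivery:
- Generate new secret
- Update receiver to accept both old and new secrets
- Update monitoring service to sign with the new secret
- Remove old secret from the receiver once no deliveries use it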
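During the overlap window the receiver has to accept signatures made with either secret. A minimal sketch, assuming hex-encoded HMAC-SHA256 signatures as in the receivers above; `ACTIVE_SECRETS`, `sign` and `verify_with_rotation` are illustrative names and the secret values are placeholders.

```python
import hashlib
import hmac
from typing import Optional, Sequence

# Placeholder values; load real secrets from configuration.
ACTIVE_SECRETS = ["new-secret", "old-secret"]


def sign(payload: bytes, secret: str) -> str:
    """Hex-encoded HMAC-SHA256 of the raw payload."""
    return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()


def verify_with_rotation(
    payload: bytes,
    signature: Optional[str],
    secrets: Sequence[str] = ACTIVE_SECRETS,
) -> bool:
    """Accept the request if any currently active secret produced the signature."""
    if not signature:
        return False
    matched = False
    for secret in secrets:
        expected = sign(payload, secret)
        # Constant-time comparison; every secret is checked before returning.
        if hmac.compare_digest(expected.encode(), signature.encode()):
            matched = True
    return matched
```

Once the monitoring service signs with the new secret only, dropping the old entry from the list completes the rotation.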
Error Handling
Retry Logic
Senders retry failed deliveries, so your endpoint can receive the same event more than once. Make processing idempotent:
from flask import request, jsonify

# app is the Flask application defined earlier. is_duplicate(),
# mark_processed() and process_webhook() are application-specific helpers.

@app.route('/webhook', methods=['POST'])
def webhook():
    # Idempotency key
    request_id = request.headers.get('X-Request-ID')
    if is_duplicate(request_id):
        return jsonify({"status": "already_processed"})
    try:
        process_webhook(request.json)
        mark_processed(request_id)
        return jsonify({"status": "ok"})
    except Exception as e:
        # Don't mark as processed - allow retry
        return jsonify({"error": str(e)}), 500
Response Codes
| Code | Meaning | Retry? |
|---|---|---|
| 200-299 | Success | No |
| 400 | Bad request | No |
| 401/403 | Auth failure | No |
| 500-599 | Server error | Yes |
| Timeout | Network issue | Yes |
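The table implies a design rule for the receiver: return a 4xx status when a retry cannot help and a 5xx status when it might. A small sketch of that mapping follows. The exception classes and function names are hypothetical, and `sender_retries` mirrors the table rather than any specific provider's retry policy.

```python
from typing import Optional


class PermanentError(Exception):
    """The payload can never be processed, e.g. a required field is missing."""


class TransientError(Exception):
    """A dependency failed; a later delivery attempt may succeed."""


def status_for(error: Optional[Exception]) -> int:
    """Choose the HTTP status the receiver should return for an outcome."""
    if error is None:
        return 200
    if isinstance(error, PermanentError):
        return 400  # tells the sender not to retry
    return 500  # transient or unexpected failure: ask for a retry


def sender_retries(status: Optional[int]) -> bool:
    """Mirror of the table above; None stands for a timeout."""
    return status is None or 500 <= status <= 599
```

Returning 500 for unexpected exceptions errs on the side of redelivery, which is safe only when the handler is idempotent.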
Testing Webhooks
Local Development
Use ngrok to expose your local server to the internet:
ngrok http 5000
# Get URL like https://abc123.ngrok.io
Then configure your monitoring service to send webhooks to the ngrok URL.
Webhook Testing Tools
- Webhook.site - Inspect incoming webhooks
- RequestBin - Capture and analyze requests
- Postman - Replay captured webhooks
Test Payload
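Save test payloads for development and replay them with curl. Replace test-sig with a valid HMAC of the file, otherwise a receiver that verifies signatures responds with 401. Use --data-binary so curl sends the file byte for byte; -d strips newlines, which changes the signed content:
curl -X POST http://localhost:5000/webhook \
  -H "Content-Type: application/json" \
  -H "X-Signature: test-sig" \
  --data-binary @test-payload.json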
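A receiver that verifies signatures rejects a request whose `X-Signature` header is a dummy value, so a replayed payload needs a real signature. The sketch below builds a signed request with the standard library, assuming the hex-encoded HMAC-SHA256 scheme and the `your-secret-key` placeholder used in the receivers above. `build_signed_request` and `send_test_webhook` are illustrative names.

```python
import hashlib
import hmac
import urllib.request


def build_signed_request(url: str, body: bytes, secret: str) -> urllib.request.Request:
    """Build a POST request whose X-Signature header matches the body."""
    signature = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-Signature": signature,
        },
    )


def send_test_webhook(path: str = "test-payload.json") -> int:
    """Send a saved payload to the local receiver and return the HTTP status."""
    with open(path, "rb") as handle:
        body = handle.read()  # read bytes so the signature covers the exact file
    request = build_signed_request(
        "http://localhost:5000/webhook", body, "your-secret-key"
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.status
```

Calling `send_test_webhook()` while the receiver is running locally should exercise the full verification path.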
Deployment Considerations
High Availability
- Deploy across multiple zones
- Use load balancer
- Implement health checks
Queue Processing
For reliability, queue incoming webhooks:
from redis import Redis
from rq import Queue

queue = Queue(connection=Redis())

@app.route('/webhook', methods=['POST'])
def webhook():
    queue.enqueue(process_webhook, request.json)
    return jsonify({"status": "queued"})
Monitoring the Webhook Handler
Alert if your webhook handler fails:
- Track processing time
- Monitor error rates
- Alert on queue backlog
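Processing time and error rate can be tracked with a small wrapper around the processing function. This is an in-process sketch over a sliding window of recent calls. `HandlerStats` and `tracked` are hypothetical names, and a production setup would more likely export these numbers to a metrics system.

```python
import time
from collections import deque
from functools import wraps


class HandlerStats:
    """Sliding window over the most recent handler invocations."""

    def __init__(self, window: int = 100):
        self.durations = deque(maxlen=window)  # seconds per call
        self.outcomes = deque(maxlen=window)   # True for success

    def record(self, seconds: float, ok: bool) -> None:
        self.durations.append(seconds)
        self.outcomes.append(ok)

    @property
    def error_rate(self) -> float:
        if not self.outcomes:
            return 0.0
        return 1 - sum(self.outcomes) / len(self.outcomes)

    @property
    def average_seconds(self) -> float:
        if not self.durations:
            return 0.0
        return sum(self.durations) / len(self.durations)


def tracked(stats: HandlerStats):
    """Decorator that records duration and outcome of each call."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                stats.record(time.perf_counter() - start, ok=False)
                raise  # the caller still sees the failure
            stats.record(time.perf_counter() - start, ok=True)
            return result
        return wrapper
    return decorator
```

Decorating the processing function with `@tracked(stats)` and alerting when `stats.error_rate` crosses a threshold covers the first two items in the list; queue backlog has to be read from the queue itself.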
Webhook Integration Checklist
- Endpoint deployed and accessible
- HTTPS enabled
- Signature verification implemented
- Error handling configured
- Retry logic in place
- Logging enabled
- Monitoring for the handler
- Test webhooks verified
- Documentation updated