Parsing DMARC aggregate XML: a tutorial with Node, Python, and Go
Walk through parsing DMARC aggregate (RUA) XML reports in three languages. Schema handling, gzip/zip unpacking, and aggregating records across multiple reports.
DMARC aggregate (RUA) reports arrive as XML files, often gzipped. The format is stable, well-specified, and — for some reason — still produces heap-dumps of "how do I parse this" questions on Stack Overflow every year. This tutorial walks through parsing them in Node, Python, and Go, with the same data model at the end.
If you just need a one-off parse, use our browser-only DMARC parser — paste XML, see structured output. The tutorial below is for when you want to build it into a pipeline.
The data model
Regardless of language, the target shape is:
```text
Report {
  org_name:  string        // who sent the report (google.com, yahoo.com, ...)
  report_id: string
  begin:     timestamp
  end:       timestamp
  domain:    string        // your domain
  policy:    { p, sp, pct }
  records: [{
    source_ip:   string
    count:       int
    dkim:        pass | fail | neutral
    spf:         pass | fail | neutral
    disposition: none | quarantine | reject
  }]
}
```
Every language below produces something structurally identical. The differences are in idioms, not behavior.
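To make the shape concrete, here is a minimal hand-written RUA report, embedded as a Python string. It is abridged: the element names are the real ones from the aggregate report schema, but the values are made up, and real reports carry more metadata (error ranges, auth_results, sometimes an XML namespace).

```python
# Abridged, hand-written sample. Element names match the RUA schema;
# the values are illustrative only.
SAMPLE_RUA = """\
<feedback>
  <report_metadata>
    <org_name>google.com</org_name>
    <report_id>12345.example</report_id>
    <date_range><begin>1700000000</begin><end>1700086400</end></date_range>
  </report_metadata>
  <policy_published>
    <domain>example.com</domain>
    <p>none</p><sp>none</sp><pct>100</pct>
  </policy_published>
  <record>
    <row>
      <source_ip>203.0.113.7</source_ip>
      <count>42</count>
      <policy_evaluated>
        <disposition>none</disposition>
        <dkim>pass</dkim>
        <spf>fail</spf>
      </policy_evaluated>
    </row>
  </record>
</feedback>"""
```

Keep a sample like this around as a fixture; all three parsers below should produce the same structure from it.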
Node.js
Using fast-xml-parser (lightweight, tree-shaking friendly) and the built-in zlib:
```js
import { readFileSync } from 'fs';
import { gunzipSync } from 'zlib';
import { XMLParser } from 'fast-xml-parser';

function parseReport(filePath) {
  let xml = readFileSync(filePath);
  if (filePath.endsWith('.gz')) {
    xml = gunzipSync(xml);
  }

  const parser = new XMLParser({ ignoreAttributes: false });
  const doc = parser.parse(xml.toString('utf8'));
  const feedback = doc.feedback;

  // record may be a single object or an array; normalize to an array.
  const records = []
    .concat(feedback.record || [])
    .map((r) => ({
      source_ip: r.row.source_ip,
      count: Number(r.row.count),
      dkim: r.row.policy_evaluated.dkim,
      spf: r.row.policy_evaluated.spf,
      disposition: r.row.policy_evaluated.disposition,
    }));

  return {
    org_name: feedback.report_metadata.org_name,
    report_id: feedback.report_metadata.report_id,
    begin: Number(feedback.report_metadata.date_range.begin),
    end: Number(feedback.report_metadata.date_range.end),
    domain: feedback.policy_published.domain,
    policy: {
      p: feedback.policy_published.p,
      sp: feedback.policy_published.sp,
      pct: Number(feedback.policy_published.pct ?? 100), // pct defaults to 100 when omitted
    },
    records,
  };
}
```
Gotchas:
- `record` can be a single object or an array depending on whether the report has 1 or N records; the `[].concat(feedback.record || [])` normalizes it.
- `count` and `begin`/`end` come out as strings; coerce to numbers explicitly.
- Some reports omit fields (`sp`, `pct`). Default them or your downstream code will crash.
Python
Using the stdlib xml.etree.ElementTree and gzip:
```python
import gzip
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import List

@dataclass
class DmarcRecord:
    source_ip: str
    count: int
    dkim: str
    spf: str
    disposition: str

@dataclass
class DmarcReport:
    org_name: str
    report_id: str
    begin: int
    end: int
    domain: str
    policy: dict
    records: List[DmarcRecord] = field(default_factory=list)

def parse_report(path: str) -> DmarcReport:
    if path.endswith('.gz'):
        with gzip.open(path, 'rb') as f:
            xml_bytes = f.read()
    else:
        with open(path, 'rb') as f:
            xml_bytes = f.read()

    root = ET.fromstring(xml_bytes)
    meta = root.find('report_metadata')
    policy = root.find('policy_published')
    date_range = meta.find('date_range')

    records = []
    for r in root.findall('record'):
        row = r.find('row')
        pe = row.find('policy_evaluated')
        records.append(DmarcRecord(
            source_ip=row.findtext('source_ip'),
            count=int(row.findtext('count', default='0')),
            dkim=pe.findtext('dkim', default=''),
            spf=pe.findtext('spf', default=''),
            disposition=pe.findtext('disposition', default=''),
        ))

    return DmarcReport(
        org_name=meta.findtext('org_name', default=''),
        report_id=meta.findtext('report_id', default=''),
        begin=int(date_range.findtext('begin', default='0')),
        end=int(date_range.findtext('end', default='0')),
        domain=policy.findtext('domain', default=''),
        policy={
            'p': policy.findtext('p'),
            'sp': policy.findtext('sp'),
            'pct': policy.findtext('pct'),
        },
        records=records,
    )
```
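One thing `parse_report` glosses over: receivers don't all compress the same way. Most gzip the XML, but some ship `.zip` archives, and a few send bare `.xml`. A sketch of an input helper that handles all three (the function name `read_report_bytes` is my own, not from any library):

```python
import gzip
import zipfile

def read_report_bytes(path: str) -> bytes:
    """Return the raw XML bytes of a RUA report, whatever the wrapper."""
    if path.endswith('.gz'):
        with gzip.open(path, 'rb') as f:
            return f.read()
    if path.endswith('.zip'):
        with zipfile.ZipFile(path) as z:
            # RUA zips conventionally hold a single XML member.
            return z.read(z.namelist()[0])
    with open(path, 'rb') as f:
        return f.read()
```

Swap this in for the first branch of `parse_report` and the rest of the function stays unchanged.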
Gotchas:
- `ET.fromstring` raises `ParseError` on malformed XML; wrap it in try/except in production.
- Real-world reports occasionally carry XML namespaces; if so, either strip them or use the `{namespace}tag` pattern in `find()`.
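If you do hit a namespaced report, stripping the namespace up front is usually simpler than threading `{namespace}tag` through every `find()`. A sketch (the helper name `parse_namespaced` and the namespace URI in the example are illustrative):

```python
import xml.etree.ElementTree as ET

def parse_namespaced(xml_bytes: bytes) -> ET.Element:
    """Parse XML and drop namespace prefixes so plain tag names work."""
    root = ET.fromstring(xml_bytes)
    for el in root.iter():
        if '}' in el.tag:
            # ElementTree stores namespaced tags as '{uri}tag'.
            el.tag = el.tag.split('}', 1)[1]
    return root
```

After this, `parse_report`'s `find('report_metadata')` calls work on namespaced and plain reports alike.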
Go
Using the stdlib encoding/xml and compress/gzip:
```go
package dmarc

import (
	"compress/gzip"
	"encoding/xml"
	"io"
	"os"
	"strings"
	"time"
)

type Report struct {
	OrgName  string    `json:"org_name"`
	ReportID string    `json:"report_id"`
	Begin    time.Time `json:"begin"`
	End      time.Time `json:"end"`
	Domain   string    `json:"domain"`
	Policy   Policy    `json:"policy"`
	Records  []Record  `json:"records"`
}

type Policy struct {
	P   string `json:"p"`
	SP  string `json:"sp,omitempty"`
	Pct int    `json:"pct"`
}

type Record struct {
	SourceIP    string `json:"source_ip"`
	Count       int    `json:"count"`
	DKIM        string `json:"dkim"`
	SPF         string `json:"spf"`
	Disposition string `json:"disposition"`
}

type feedback struct {
	XMLName  xml.Name `xml:"feedback"`
	Metadata struct {
		OrgName  string `xml:"org_name"`
		ReportID string `xml:"report_id"`
		Date     struct {
			Begin int64 `xml:"begin"`
			End   int64 `xml:"end"`
		} `xml:"date_range"`
	} `xml:"report_metadata"`
	Policy struct {
		Domain string `xml:"domain"`
		P      string `xml:"p"`
		SP     string `xml:"sp"`
		Pct    int    `xml:"pct"`
	} `xml:"policy_published"`
	Records []struct {
		Row struct {
			SourceIP string `xml:"source_ip"`
			Count    int    `xml:"count"`
			Eval     struct {
				DKIM        string `xml:"dkim"`
				SPF         string `xml:"spf"`
				Disposition string `xml:"disposition"`
			} `xml:"policy_evaluated"`
		} `xml:"row"`
	} `xml:"record"`
}

func ParseFile(path string) (*Report, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var r io.Reader = f
	if strings.HasSuffix(path, ".gz") {
		gr, err := gzip.NewReader(f)
		if err != nil {
			return nil, err
		}
		defer gr.Close()
		r = gr
	}

	var fb feedback
	if err := xml.NewDecoder(r).Decode(&fb); err != nil {
		return nil, err
	}

	records := make([]Record, len(fb.Records))
	for i, rec := range fb.Records {
		records[i] = Record{
			SourceIP:    rec.Row.SourceIP,
			Count:       rec.Row.Count,
			DKIM:        rec.Row.Eval.DKIM,
			SPF:         rec.Row.Eval.SPF,
			Disposition: rec.Row.Eval.Disposition,
		}
	}

	return &Report{
		OrgName:  fb.Metadata.OrgName,
		ReportID: fb.Metadata.ReportID,
		Begin:    time.Unix(fb.Metadata.Date.Begin, 0),
		End:      time.Unix(fb.Metadata.Date.End, 0),
		Domain:   fb.Policy.Domain,
		Policy: Policy{
			P:   fb.Policy.P,
			SP:  fb.Policy.SP,
			Pct: fb.Policy.Pct,
		},
		Records: records,
	}, nil
}
```
Gotchas:
- Go's `encoding/xml` is strict about struct tags matching element names; mis-cased tags silently produce empty fields.
- Report dates arrive as Unix epoch seconds; `time.Unix` converts them to `time.Time`.
Aggregating across reports
Real pipelines don't process one report at a time. A typical pattern:
- Fetch all reports for a domain for the last N days.
- Union all records; group by `source_ip`.
- Compute aggregated metrics: total messages, DKIM pass rate, SPF pass rate.
- Reverse-DNS each IP to see whether it's known (your own ESP, a legitimate CRM, a suspicious source).
```python
from collections import defaultdict

def aggregate_by_source(reports):
    by_ip = defaultdict(lambda: {'count': 0, 'dkim_pass': 0, 'spf_pass': 0})
    for r in reports:
        for rec in r.records:
            b = by_ip[rec.source_ip]
            b['count'] += rec.count
            if rec.dkim == 'pass':
                b['dkim_pass'] += rec.count
            if rec.spf == 'pass':
                b['spf_pass'] += rec.count
    return by_ip
```
The result is the actionable view: per-IP messages/day with pass rates. Any IP in the low-pass, high-count quadrant is your emergency.
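To surface that quadrant directly, a small helper over the aggregate can do the triage. This is a sketch: `flag_suspicious` is a name I made up, and the thresholds are placeholders to tune for your volume.

```python
def flag_suspicious(by_ip, min_count=100, max_pass_rate=0.5):
    """Return high-volume sources whose best pass rate is still low."""
    flagged = []
    for ip, b in by_ip.items():
        if b['count'] < min_count:
            continue
        # Best of the two checks; a true aligned-DMARC rate would need
        # per-record pass tracking, so treat this as an upper-ish bound.
        best_rate = max(b['dkim_pass'], b['spf_pass']) / b['count']
        if best_rate < max_pass_rate:
            flagged.append((ip, b['count'], best_rate))
    # Biggest offenders first.
    return sorted(flagged, key=lambda t: t[1], reverse=True)
```

Anything this returns is either a misconfigured legitimate sender or someone spoofing you; reverse-DNS tells you which.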
What to do with the output
Once you have structured records, two common use cases:
- Dashboard: show pass rate per source over time; alert when a new IP appears sending more than a threshold of messages.
- Policy ratcheting: once all authorized sources pass, tighten your DMARC policy from `p=none` to `quarantine` to `reject`. The parsed data is your safety net.
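A crude readiness check for that ratchet, assuming you maintain a set of authorized source IPs. The helper name `ready_to_tighten` and the 98% threshold are my assumptions, not a standard:

```python
def ready_to_tighten(by_ip, authorized_ips, min_pass_rate=0.98):
    """True when every authorized source passes at least one aligned
    check nearly all the time, so quarantine/reject is low-risk."""
    for ip in authorized_ips:
        b = by_ip.get(ip)
        if b is None or b['count'] == 0:
            continue  # no traffic observed from this source
        best = max(b['dkim_pass'], b['spf_pass']) / b['count']
        if best < min_pass_rate:
            return False
    return True
```

Run it over a few weeks of aggregated reports before each tightening step, not a single day's.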
For more on what the pass rate and alignment mean, see parsing DMARC aggregate reports.
Don't want to build this yourself?
Fair. Our DMARC Reporting API ingests RUA reports (via mailbox or API POST), parses them, aggregates across receivers, and surfaces structured JSON. Free tier covers single-domain monitoring up to 50k messages/day.