Parsing DMARC aggregate XML: a tutorial with Node, Python, and Go

Published on May 05, 2026

Walk through parsing DMARC aggregate (RUA) XML reports in three languages. Schema handling, gzip/zip unpacking, and aggregating records across multiple reports.

DMARC aggregate (RUA) reports arrive as XML files, often gzipped. The format is stable, well-specified, and — for some reason — still produces heap-dumps of "how do I parse this" questions on Stack Overflow every year. This tutorial walks through parsing them in Node, Python, and Go, with the same data model at the end.

If you just need a one-off parse, use our browser-only DMARC parser — paste XML, see structured output. The tutorial below is for when you want to build it into a pipeline.

The data model

Regardless of language, the target shape is:

Report {
  org_name: string        // who sent the report (google.com, yahoo.com, ...)
  report_id: string
  begin: timestamp
  end: timestamp
  domain: string          // your domain
  policy: { p, sp, pct }
  records: [{
    source_ip: string
    count: int
    dkim: pass | fail | neutral
    spf: pass | fail | neutral
    disposition: none | quarantine | reject
  }]
}

Every language below produces something structurally identical. The differences are in idioms, not behavior.

Node.js

Using fast-xml-parser (lightweight, tree-shaking friendly) and the built-in zlib:

import { readFileSync } from 'fs';
import { gunzipSync } from 'zlib';
import { XMLParser } from 'fast-xml-parser';

function parseReport(filePath) {
  let xml = readFileSync(filePath);
  if (filePath.endsWith('.gz')) {
    xml = gunzipSync(xml);
  }

  const parser = new XMLParser({ ignoreAttributes: false });
  const doc = parser.parse(xml.toString('utf8'));
  const feedback = doc.feedback;

  const records = []
    .concat(feedback.record || [])
    .map((r) => ({
      source_ip: r.row.source_ip,
      count: Number(r.row.count),
      dkim: r.row.policy_evaluated.dkim,
      spf: r.row.policy_evaluated.spf,
      disposition: r.row.policy_evaluated.disposition,
    }));

  return {
    org_name: feedback.report_metadata.org_name,
    report_id: feedback.report_metadata.report_id,
    begin: Number(feedback.report_metadata.date_range.begin),
    end: Number(feedback.report_metadata.date_range.end),
    domain: feedback.policy_published.domain,
    policy: {
      p: feedback.policy_published.p,
      sp: feedback.policy_published.sp,
      pct: Number(feedback.policy_published.pct),
    },
    records,
  };
}

Gotchas:

  • record can be a single object or an array depending on whether the report has 1 or N records. The .concat([], feedback.record) normalizes.
  • count and begin/end come out as strings; coerce to numbers explicitly.
  • Some reports have missing fields (sp, pct). Default them or your downstream code will crash.

Python

Using the stdlib xml.etree.ElementTree and gzip:

import gzip
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import List

@dataclass
class DmarcRecord:
    source_ip: str
    count: int
    dkim: str
    spf: str
    disposition: str

@dataclass
class DmarcReport:
    org_name: str
    report_id: str
    begin: int
    end: int
    domain: str
    policy: dict
    records: List[DmarcRecord] = field(default_factory=list)

def parse_report(path: str) -> DmarcReport:
    if path.endswith('.gz'):
        with gzip.open(path, 'rb') as f:
            xml_bytes = f.read()
    else:
        with open(path, 'rb') as f:
            xml_bytes = f.read()

    root = ET.fromstring(xml_bytes)

    meta = root.find('report_metadata')
    policy = root.find('policy_published')
    date_range = meta.find('date_range')

    records = []
    for r in root.findall('record'):
        row = r.find('row')
        pe = row.find('policy_evaluated')
        records.append(DmarcRecord(
            source_ip=row.findtext('source_ip'),
            count=int(row.findtext('count', default='0')),
            dkim=pe.findtext('dkim', default=''),
            spf=pe.findtext('spf', default=''),
            disposition=pe.findtext('disposition', default=''),
        ))

    return DmarcReport(
        org_name=meta.findtext('org_name', default=''),
        report_id=meta.findtext('report_id', default=''),
        begin=int(date_range.findtext('begin', default='0')),
        end=int(date_range.findtext('end', default='0')),
        domain=policy.findtext('domain', default=''),
        policy={
            'p': policy.findtext('p'),
            'sp': policy.findtext('sp'),
            'pct': policy.findtext('pct'),
        },
        records=records,
    )

Gotchas:

  • ET.fromstring raises on malformed XML; wrap in try/except in production.
  • Real-world reports occasionally contain namespaces — if so, either strip them or use the {namespace}tag pattern in find().

Go

Using the stdlib encoding/xml and compress/gzip:

package dmarc

import (
    "compress/gzip"
    "encoding/xml"
    "io"
    "os"
    "strings"
    "time"
)

type Report struct {
    OrgName   string    `json:"org_name"`
    ReportID  string    `json:"report_id"`
    Begin     time.Time `json:"begin"`
    End       time.Time `json:"end"`
    Domain    string    `json:"domain"`
    Policy    Policy    `json:"policy"`
    Records   []Record  `json:"records"`
}

type Policy struct {
    P   string `json:"p"`
    SP  string `json:"sp,omitempty"`
    Pct int    `json:"pct"`
}

type Record struct {
    SourceIP    string `json:"source_ip"`
    Count       int    `json:"count"`
    DKIM        string `json:"dkim"`
    SPF         string `json:"spf"`
    Disposition string `json:"disposition"`
}

type feedback struct {
    XMLName  xml.Name `xml:"feedback"`
    Metadata struct {
        OrgName  string `xml:"org_name"`
        ReportID string `xml:"report_id"`
        Date     struct {
            Begin int64 `xml:"begin"`
            End   int64 `xml:"end"`
        } `xml:"date_range"`
    } `xml:"report_metadata"`
    Policy struct {
        Domain string `xml:"domain"`
        P      string `xml:"p"`
        SP     string `xml:"sp"`
        Pct    int    `xml:"pct"`
    } `xml:"policy_published"`
    Records []struct {
        Row struct {
            SourceIP string `xml:"source_ip"`
            Count    int    `xml:"count"`
            Eval     struct {
                DKIM        string `xml:"dkim"`
                SPF         string `xml:"spf"`
                Disposition string `xml:"disposition"`
            } `xml:"policy_evaluated"`
        } `xml:"row"`
    } `xml:"record"`
}

func ParseFile(path string) (*Report, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    var r io.Reader = f
    if strings.HasSuffix(path, ".gz") {
        gr, err := gzip.NewReader(f)
        if err != nil {
            return nil, err
        }
        defer gr.Close()
        r = gr
    }

    var fb feedback
    if err := xml.NewDecoder(r).Decode(&fb); err != nil {
        return nil, err
    }

    records := make([]Record, len(fb.Records))
    for i, rec := range fb.Records {
        records[i] = Record{
            SourceIP:    rec.Row.SourceIP,
            Count:       rec.Row.Count,
            DKIM:        rec.Row.Eval.DKIM,
            SPF:         rec.Row.Eval.SPF,
            Disposition: rec.Row.Eval.Disposition,
        }
    }

    return &Report{
        OrgName:  fb.Metadata.OrgName,
        ReportID: fb.Metadata.ReportID,
        Begin:    time.Unix(fb.Metadata.Date.Begin, 0),
        End:      time.Unix(fb.Metadata.Date.End, 0),
        Domain:   fb.Policy.Domain,
        Policy: Policy{
            P:   fb.Policy.P,
            SP:  fb.Policy.SP,
            Pct: fb.Policy.Pct,
        },
        Records: records,
    }, nil
}

Gotchas:

  • Go's encoding/xml is strict about struct tags matching element names; mis-cased tags silently produce empty fields.
  • time.Unix converts Unix seconds to time.Time — report dates come as epoch seconds.

Aggregating across reports

Real pipelines don't process one report at a time. A typical pattern:

  1. Fetch all reports for a domain for the last N days.
  2. Union all records; group by source_ip.
  3. Compute aggregated metrics: total messages, DKIM pass rate, SPF pass rate.
  4. Reverse-DNS each IP to see if it's known (your own ESP, a legitimate CRM, a suspicious source).
from collections import defaultdict

def aggregate_by_source(reports):
    by_ip = defaultdict(lambda: {'count': 0, 'dkim_pass': 0, 'spf_pass': 0})
    for r in reports:
        for rec in r.records:
            b = by_ip[rec.source_ip]
            b['count'] += rec.count
            if rec.dkim == 'pass':
                b['dkim_pass'] += rec.count
            if rec.spf == 'pass':
                b['spf_pass'] += rec.count
    return by_ip

The result is the actionable view: per-IP messages/day with pass rates. Any IP in the low-pass, high-count quadrant is your emergency.

What to do with the output

Once you have structured records, two common use cases:

  • Dashboard — show pass rate per source over time; alert when a new IP appears sending > threshold messages.
  • Policy ratcheting — once all authorized sources pass, tighten DMARC policy from p=none to quarantine to reject. The parsed data is your safety net.

For more on what the pass rate and alignment mean, see parsing DMARC aggregate reports.

Don't want to build this yourself?

Fair. Our DMARC Reporting API ingests RUA reports (via mailbox or API POST), parses them, aggregates across receivers, and surfaces structured JSON. Free tier covers single-domain monitoring up to 50k messages/day.