Merge pull request #55 from sgrigson/domain-audit-file

Add a domain blocks audit file to assist with auditing, reviewing, debugging, etc.
This commit is contained in:
Justin Warren 2023-09-07 07:15:31 +10:00 committed by GitHub
commit 9200fc31ae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 165 additions and 7 deletions

View File

@ -363,6 +363,11 @@ The filename is based on the URL or domain used so you can tell where each list
Sets where to save intermediate blocklist files. Defaults to `/tmp`. Sets where to save intermediate blocklist files. Defaults to `/tmp`.
### blocklist_auditfile
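If provided, saves a CSV audit file listing, for each domain, the number of source blocklists that
contain it (count) and that number as a percentage of all source blocklists (percent). Useful for
debugging merge thresholds. Defaults to None.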
### no_push_instance ### no_push_instance
Defaults to False. Defaults to False.

View File

@ -42,6 +42,9 @@ blocklist_instance_destinations = [
## File to save the fully merged blocklist into ## File to save the fully merged blocklist into
# blocklist_savefile = '/tmp/merged_blocklist.csv' # blocklist_savefile = '/tmp/merged_blocklist.csv'
## File to save the audit log of counts across sources
# blocklist_auditfile = '/tmp/domain_counts_list.csv'
## Don't push blocklist to instances, even if they're defined above ## Don't push blocklist to instances, even if they're defined above
# no_push_instance = false # no_push_instance = false

View File

@ -11,8 +11,8 @@ import os.path
import sys import sys
import urllib.request as urlr import urllib.request as urlr
from .blocklists import Blocklist, parse_blocklist from .blocklists import Blocklist, BlockAuditList, parse_blocklist
from .const import DomainBlock, BlockSeverity from .const import DomainBlock, BlockSeverity, BlockAudit
from importlib.metadata import version from importlib.metadata import version
__version__ = version('fediblockhole') __version__ = version('fediblockhole')
@ -71,7 +71,7 @@ def sync_blocklists(conf: argparse.Namespace):
import_fields, conf.save_intermediate, conf.savedir, export_fields)) import_fields, conf.save_intermediate, conf.savedir, export_fields))
# Merge blocklists into an update dict # Merge blocklists into an update dict
merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type) merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type, conf.blocklist_auditfile)
# Remove items listed in allowlists, if any # Remove items listed in allowlists, if any
allowlists = fetch_allowlists(conf) allowlists = fetch_allowlists(conf)
@ -183,7 +183,8 @@ def fetch_from_instances(sources: dict,
def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max', def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
threshold: int=0, threshold: int=0,
threshold_type: str='count') -> Blocklist: threshold_type: str='count',
save_block_audit_file: str=None) -> Blocklist:
"""Merge fetched remote blocklists into a bulk update """Merge fetched remote blocklists into a bulk update
@param blocklists: A dict of lists of DomainBlocks, keyed by source. @param blocklists: A dict of lists of DomainBlocks, keyed by source.
Each value is a list of DomainBlocks Each value is a list of DomainBlocks
@ -201,6 +202,7 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
@param returns: A dict of DomainBlocks keyed by domain @param returns: A dict of DomainBlocks keyed by domain
""" """
merged = Blocklist('fediblockhole.merge_blocklists') merged = Blocklist('fediblockhole.merge_blocklists')
audit = BlockAuditList('fediblockhole.merge_blocklists')
num_blocklists = len(blocklists) num_blocklists = len(blocklists)
@ -219,10 +221,12 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
# Only merge items if `threshold` is met or exceeded # Only merge items if `threshold` is met or exceeded
for domain in domain_blocks: for domain in domain_blocks:
domain_matches_count = len(domain_blocks[domain])
domain_matches_percent = domain_matches_count / num_blocklists * 100
if threshold_type == 'count': if threshold_type == 'count':
domain_threshold_level = len(domain_blocks[domain]) domain_threshold_level = domain_matches_count
elif threshold_type == 'pct': elif threshold_type == 'pct':
domain_threshold_level = len(domain_blocks[domain]) / num_blocklists * 100 domain_threshold_level = domain_matches_percent
# log.debug(f"domain threshold level: {domain_threshold_level}") # log.debug(f"domain threshold level: {domain_threshold_level}")
else: else:
raise ValueError(f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'") raise ValueError(f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'")
@ -238,6 +242,18 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
block = apply_mergeplan(block, newblock, mergeplan) block = apply_mergeplan(block, newblock, mergeplan)
merged.blocks[block.domain] = block merged.blocks[block.domain] = block
if save_block_audit_file:
blockdata:BlockAudit = {
'domain': domain,
'count': domain_matches_count,
'percent': domain_matches_percent,
}
audit.blocks[domain] = blockdata
if save_block_audit_file:
log.info(f"Saving audit file to {save_block_audit_file}")
save_domain_block_audit_to_file(audit, save_block_audit_file)
return merged return merged
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict: def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
@ -672,6 +688,34 @@ def save_blocklist_to_file(
for key, value in sorted_list: for key, value in sorted_list:
writer.writerow(value) writer.writerow(value)
def save_domain_block_audit_to_file(
    blocklist: BlockAuditList,
    filepath: str):
    """Save an audit log of domains blocked

    Writes a CSV file with a header row and the columns 'domain', 'count'
    and 'percent', one row per entry in `blocklist.blocks`, ordered by
    domain.

    @param blocklist: A dictionary of block definitions, keyed by domain
    @param filepath: The path to the file the list should be saved in.
    @raises AttributeError: if `blocklist` does not expose a `blocks` mapping.
    """
    export_fields = ['domain', 'count', 'percent']

    try:
        # Sort on the domain key alone so the audit records themselves
        # never have to be comparable with each other.
        sorted_list = sorted(blocklist.blocks.items(), key=lambda entry: entry[0])
    except AttributeError:
        # Report and re-raise: carrying on without a sorted list would only
        # fail later with a less helpful error, and dropping into a debugger
        # is not acceptable in unattended runs.
        log.error(
            f"Cannot save audit file: object of type '{type(blocklist).__name__}' "
            "does not provide a usable 'blocks' mapping.")
        raise

    log.debug("exporting audit file")

    # newline='' lets the csv module control line endings itself, as the
    # csv documentation requires for file objects passed to a writer.
    with open(filepath, "w", newline='') as fp:
        writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
        writer.writeheader()
        for _domain, record in sorted_list:
            writer.writerow(record)
def augment_args(args, tomldata: str=None): def augment_args(args, tomldata: str=None):
"""Augment commandline arguments with config file parameters """Augment commandline arguments with config file parameters
@ -701,6 +745,9 @@ def augment_args(args, tomldata: str=None):
if not args.savedir: if not args.savedir:
args.savedir = conf.get('savedir', '/tmp') args.savedir = conf.get('savedir', '/tmp')
if not args.blocklist_auditfile:
args.blocklist_auditfile = conf.get('blocklist_auditfile', None)
if not args.export_fields: if not args.export_fields:
args.export_fields = conf.get('export_fields', []) args.export_fields = conf.get('export_fields', [])
@ -737,6 +784,7 @@ def setup_argparse():
ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.") ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.") ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.") ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.")
ap.add_argument('-b', '--block-audit-file', dest="blocklist_auditfile", help="Save blocklist auditfile to this location.")
ap.add_argument('--merge-threshold', type=int, help="Merge threshold value") ap.add_argument('--merge-threshold', type=int, help="Merge threshold value")
ap.add_argument('--merge-threshold-type', choices=['count', 'pct'], help="Type of merge threshold to use.") ap.add_argument('--merge-threshold-type', choices=['count', 'pct'], help="Type of merge threshold to use.")

View File

@ -6,7 +6,7 @@ import json
from typing import Iterable from typing import Iterable
from dataclasses import dataclass, field from dataclasses import dataclass, field
from .const import DomainBlock, BlockSeverity from .const import DomainBlock, BlockSeverity, BlockAudit
import logging import logging
log = logging.getLogger('fediblockhole') log = logging.getLogger('fediblockhole')
@ -38,6 +38,33 @@ class Blocklist:
def values(self): def values(self):
return self.blocks.values() return self.blocks.values()
@dataclass
class BlockAuditList:
    """ A BlockAuditList object

    Holds the BlockAudit records that came from a single origin. The
    records are stored in the `blocks` dict and every container-style
    method below simply delegates to that dict.
    """
    # Label identifying where this list came from
    origin: str = None
    # The audit records, stored as a str -> BlockAudit mapping
    blocks: dict[str, BlockAudit] = field(default_factory=dict)

    def __len__(self):
        """Return how many audit records are stored"""
        record_count = len(self.blocks)
        return record_count

    def __class_getitem__(cls, item):
        """Resolve any subscript of the class to dict[str, BlockAudit]"""
        return dict[str, BlockAudit]

    def __getitem__(self, item):
        """Look up the audit record stored under `item`"""
        record = self.blocks[item]
        return record

    def __iter__(self):
        """Iterate over the keys of the stored records"""
        return iter(self.blocks)

    def items(self):
        """Return the (key, record) view of the stored records"""
        pairs = self.blocks.items()
        return pairs

    def values(self):
        """Return the view of the stored records"""
        records = self.blocks.values()
        return records
class BlocklistParser(object): class BlocklistParser(object):
""" """
Base class for parsing blocklists Base class for parsing blocklists

View File

@ -84,6 +84,81 @@ class BlockSeverity(object):
def __ge__(self, other): def __ge__(self, other):
if self._level >= other._level: if self._level >= other._level:
return True return True
class BlockAudit(object):
    """Audit record for a single domain

    Carries a `count` and a `percent` figure for `domain`, plus an optional
    `id`. The object also behaves like a small mapping: iterating yields
    field names, and `obj[key]` / `obj.get(key)` return the matching
    attribute values.
    """

    # Field names that are always yielded when iterating.
    fields = [
        'domain',
        'count',
        'percent',
    ]

    # Every key accepted by __getitem__ / get.
    all_fields = [
        'domain',
        'count',
        'percent',
        'id'
    ]

    def __init__(self, domain: str,
                 count: int=0,
                 percent: float=0,
                 id: int=None):
        """Initialize the BlockAudit

        @param domain: The domain this record describes
        @param count: The count value for the domain
        @param percent: The percent value for the domain
        @param id: Optional identifier for the record
        """
        self.domain = domain
        self.count = count
        self.percent = percent
        self.id = id

    def _asdict(self):
        """Return a dict version of this object

        'id' is only included when it is truthy.
        """
        dictval = {
            'domain': self.domain,
            'count': self.count,
            'percent': self.percent,
        }
        if self.id:
            dictval['id'] = self.id

        return dictval

    def __repr__(self):
        return f"<BlockAudit {self._asdict()}>"

    def copy(self):
        """Make a copy of this object and return it
        """
        retval = BlockAudit(**self._asdict())
        return retval

    def update(self, dict):
        """Update my kwargs

        Sets one attribute per key of the supplied mapping.
        """
        # NOTE: the parameter name shadows the builtin `dict`; it is kept
        # as-is so existing keyword callers keep working.
        for key in dict:
            setattr(self, key, dict[key])

    def __iter__(self):
        """Be iterable

        Yields the names in `fields`, followed by 'id' when it is set.
        """
        # Work on a copy: appending to `self.fields` directly would mutate
        # the list shared by the whole class and add another 'id' on every
        # iteration.
        keys = list(self.fields)
        if getattr(self, 'id', False):
            keys.append('id')

        for k in keys:
            yield k

    def __getitem__(self, k, default=None):
        "Behave like a dict for getting values"
        if k not in self.all_fields:
            raise KeyError(f"Invalid key '{k}'")

        return getattr(self, k, default)

    def get(self, k, default=None):
        """Return the value stored for `k`

        @raises KeyError: if `k` is not one of `all_fields`
        """
        return self.__getitem__(k, default)
# class _DomainBlock(NamedTuple): # class _DomainBlock(NamedTuple):
# domain: str # FIXME: Use an actual Domain object from somewhere? # domain: str # FIXME: Use an actual Domain object from somewhere?