Create Domain Audit File Option

Allows for the creation of an audit file that will explain for each domain the number of matches across available sources
This commit is contained in:
Shawn Grigson 2023-08-20 02:50:05 -05:00
parent 525dc876e0
commit 6f960aefec
4 changed files with 160 additions and 7 deletions

View File

@ -42,6 +42,9 @@ blocklist_instance_destinations = [
## File to save the fully merged blocklist into
# blocklist_savefile = '/tmp/merged_blocklist.csv'
## File to save the audit log of counts across sources
# blocklist_auditfile = '/tmp/domain_counts_list.csv'
## Don't push blocklist to instances, even if they're defined above
# no_push_instance = false

View File

@ -11,8 +11,8 @@ import os.path
import sys
import urllib.request as urlr
from .blocklists import Blocklist, parse_blocklist
from .const import DomainBlock, BlockSeverity
from .blocklists import Blocklist, BlockAuditList, parse_blocklist
from .const import DomainBlock, BlockSeverity, BlockAudit
from importlib.metadata import version
__version__ = version('fediblockhole')
@ -71,7 +71,7 @@ def sync_blocklists(conf: argparse.Namespace):
import_fields, conf.save_intermediate, conf.savedir, export_fields))
# Merge blocklists into an update dict
merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type)
merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type, conf.blocklist_auditfile)
# Remove items listed in allowlists, if any
allowlists = fetch_allowlists(conf)
@ -183,7 +183,8 @@ def fetch_from_instances(sources: dict,
def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
threshold: int=0,
threshold_type: str='count') -> Blocklist:
threshold_type: str='count',
save_block_audit_file: str='') -> Blocklist:
"""Merge fetched remote blocklists into a bulk update
@param blocklists: A dict of lists of DomainBlocks, keyed by source.
Each value is a list of DomainBlocks
@ -201,6 +202,7 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
@param returns: A dict of DomainBlocks keyed by domain
"""
merged = Blocklist('fediblockhole.merge_blocklists')
audit = BlockAuditList('fediblockhole.merge_blocklists')
num_blocklists = len(blocklists)
@ -219,10 +221,12 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
# Only merge items if `threshold` is met or exceeded
for domain in domain_blocks:
domain_matches_count = len(domain_blocks[domain])
domain_matches_percent = domain_matches_count / num_blocklists * 100
if threshold_type == 'count':
domain_threshold_level = len(domain_blocks[domain])
domain_threshold_level = domain_matches_count
elif threshold_type == 'pct':
domain_threshold_level = len(domain_blocks[domain]) / num_blocklists * 100
domain_threshold_level = domain_matches_percent
# log.debug(f"domain threshold level: {domain_threshold_level}")
else:
raise ValueError(f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'")
@ -238,6 +242,18 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
block = apply_mergeplan(block, newblock, mergeplan)
merged.blocks[block.domain] = block
if len(save_block_audit_file) > 0:
blockdata:BlockAudit = {
'domain': domain,
'count': domain_matches_count,
'percent': domain_matches_percent,
}
audit.blocks[domain] = blockdata
if len(save_block_audit_file) > 0:
log.info(f"Saving audit file to {save_block_audit_file}")
save_domain_block_audit_to_file(audit, save_block_audit_file)
return merged
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
@ -672,6 +688,34 @@ def save_blocklist_to_file(
for key, value in sorted_list:
writer.writerow(value)
def save_domain_block_audit_to_file(
blocklist: BlockAuditList,
filepath: str):
"""Save an audit log of domains blocked
@param blocklist: A dictionary of block definitions, keyed by domain
@param filepath: The path to the file the list should be saved in.
"""
export_fields = ['domain', 'count', 'percent']
try:
sorted_list = sorted(blocklist.blocks.items())
except KeyError:
log.error("Field 'domain' not found in blocklist.")
log.debug(f"blocklist is: {sorted_list}")
except AttributeError:
log.error("Attribute error!")
import pdb
pdb.set_trace()
log.debug("exporting audit file")
with open(filepath, "w") as fp:
writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
writer.writeheader()
for key, value in sorted_list:
writer.writerow(value)
def augment_args(args, tomldata: str=None):
"""Augment commandline arguments with config file parameters
@ -701,6 +745,9 @@ def augment_args(args, tomldata: str=None):
if not args.savedir:
args.savedir = conf.get('savedir', '/tmp')
if not args.blocklist_auditfile:
args.blocklist_auditfile = conf.get('blocklist_auditfile', '')
if not args.export_fields:
args.export_fields = conf.get('export_fields', [])
@ -737,6 +784,7 @@ def setup_argparse():
ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.")
ap.add_argument('-b', '--block-audit-file', dest="blocklist_auditfile", help="Save blocklist auditfile to this location.")
ap.add_argument('--merge-threshold', type=int, help="Merge threshold value")
ap.add_argument('--merge-threshold-type', choices=['count', 'pct'], help="Type of merge threshold to use.")

View File

@ -6,7 +6,7 @@ import json
from typing import Iterable
from dataclasses import dataclass, field
from .const import DomainBlock, BlockSeverity
from .const import DomainBlock, BlockSeverity, BlockAudit
import logging
log = logging.getLogger('fediblockhole')
@ -38,6 +38,33 @@ class Blocklist:
def values(self):
return self.blocks.values()
@dataclass
class BlockAuditList:
""" A BlockAuditlist object
A BlockAuditlist is a list of BlockAudits from an origin
"""
origin: str = None
blocks: dict[str, BlockAudit] = field(default_factory=dict)
def __len__(self):
return len(self.blocks)
def __class_getitem__(cls, item):
return dict[str, BlockAudit]
def __getitem__(self, item):
return self.blocks[item]
def __iter__(self):
return self.blocks.__iter__()
def items(self):
return self.blocks.items()
def values(self):
return self.blocks.values()
class BlocklistParser(object):
"""
Base class for parsing blocklists

View File

@ -85,6 +85,81 @@ class BlockSeverity(object):
if self._level >= other._level:
return True
class BlockAudit(object):
fields = [
'domain',
'count',
'percent',
]
all_fields = [
'domain',
'count',
'percent',
'id'
]
def __init__(self, domain:str,
count: int=0,
percent: int=0,
id: int=None):
"""Initialize the BlockAudit
"""
self.domain = domain
self.count = count
self.percent = percent
self.id = id
def _asdict(self):
"""Return a dict version of this object
"""
dictval = {
'domain': self.domain,
'count': self.count,
'percent': self.percent,
}
if self.id:
dictval['id'] = self.id
return dictval
def __repr__(self):
return f"<BlockAudit {self._asdict()}>"
def copy(self):
"""Make a copy of this object and return it
"""
retval = BlockAudit(**self._asdict())
return retval
def update(self, dict):
"""Update my kwargs
"""
for key in dict:
setattr(self, key, dict[key])
def __iter__(self):
"""Be iterable"""
keys = self.fields
if getattr(self, 'id', False):
keys.append('id')
for k in keys:
yield k
def __getitem__(self, k, default=None):
"Behave like a dict for getting values"
if k not in self.all_fields:
raise KeyError(f"Invalid key '{k}'")
return getattr(self, k, default)
def get(self, k, default=None):
return self.__getitem__(k, default)
# class _DomainBlock(NamedTuple):
# domain: str # FIXME: Use an actual Domain object from somewhere?
# severity: BlockSeverity = BlockSeverity.SUSPEND