Merge pull request #2 from sgrigson/domain-audit-file

Create Domain Audit File Option
This commit is contained in:
Shawn Grigson 2023-08-30 01:15:41 -05:00 committed by GitHub
commit df74af1e6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 160 additions and 7 deletions

View File

@ -42,6 +42,9 @@ blocklist_instance_destinations = [
## File to save the fully merged blocklist into
# blocklist_savefile = '/tmp/merged_blocklist.csv'
## File to save the audit log of counts across sources
# blocklist_auditfile = '/tmp/domain_counts_list.csv'
## Don't push blocklist to instances, even if they're defined above
# no_push_instance = false

View File

@ -11,8 +11,8 @@ import os.path
import sys
import urllib.request as urlr
from .blocklists import Blocklist, parse_blocklist
from .const import DomainBlock, BlockSeverity
from .blocklists import Blocklist, BlockAuditList, parse_blocklist
from .const import DomainBlock, BlockSeverity, BlockAudit
from importlib.metadata import version
__version__ = version('fediblockhole')
@ -71,7 +71,7 @@ def sync_blocklists(conf: argparse.Namespace):
import_fields, conf.save_intermediate, conf.savedir, export_fields))
# Merge blocklists into an update dict
merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type)
merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type, conf.blocklist_auditfile)
# Remove items listed in allowlists, if any
allowlists = fetch_allowlists(conf)
@ -183,7 +183,8 @@ def fetch_from_instances(sources: dict,
def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
threshold: int=0,
threshold_type: str='count') -> Blocklist:
threshold_type: str='count',
save_block_audit_file: str='') -> Blocklist:
"""Merge fetched remote blocklists into a bulk update
@param blocklists: A dict of lists of DomainBlocks, keyed by source.
Each value is a list of DomainBlocks
@ -201,6 +202,7 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
@param returns: A dict of DomainBlocks keyed by domain
"""
merged = Blocklist('fediblockhole.merge_blocklists')
audit = BlockAuditList('fediblockhole.merge_blocklists')
num_blocklists = len(blocklists)
@ -219,10 +221,12 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
# Only merge items if `threshold` is met or exceeded
for domain in domain_blocks:
domain_matches_count = len(domain_blocks[domain])
domain_matches_percent = domain_matches_count / num_blocklists * 100
if threshold_type == 'count':
domain_threshold_level = len(domain_blocks[domain])
domain_threshold_level = domain_matches_count
elif threshold_type == 'pct':
domain_threshold_level = len(domain_blocks[domain]) / num_blocklists * 100
domain_threshold_level = domain_matches_percent
# log.debug(f"domain threshold level: {domain_threshold_level}")
else:
raise ValueError(f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'")
@ -238,6 +242,18 @@ def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
block = apply_mergeplan(block, newblock, mergeplan)
merged.blocks[block.domain] = block
if len(save_block_audit_file) > 0:
blockdata:BlockAudit = {
'domain': domain,
'count': domain_matches_count,
'percent': domain_matches_percent,
}
audit.blocks[domain] = blockdata
if len(save_block_audit_file) > 0:
log.info(f"Saving audit file to {save_block_audit_file}")
save_domain_block_audit_to_file(audit, save_block_audit_file)
return merged
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
@ -672,6 +688,34 @@ def save_blocklist_to_file(
for key, value in sorted_list:
writer.writerow(value)
def save_domain_block_audit_to_file(
blocklist: BlockAuditList,
filepath: str):
"""Save an audit log of domains blocked
@param blocklist: A dictionary of block definitions, keyed by domain
@param filepath: The path to the file the list should be saved in.
"""
export_fields = ['domain', 'count', 'percent']
try:
sorted_list = sorted(blocklist.blocks.items())
except KeyError:
log.error("Field 'domain' not found in blocklist.")
log.debug(f"blocklist is: {sorted_list}")
except AttributeError:
log.error("Attribute error!")
import pdb
pdb.set_trace()
log.debug("exporting audit file")
with open(filepath, "w") as fp:
writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
writer.writeheader()
for key, value in sorted_list:
writer.writerow(value)
def augment_args(args, tomldata: str=None):
"""Augment commandline arguments with config file parameters
@ -701,6 +745,9 @@ def augment_args(args, tomldata: str=None):
if not args.savedir:
args.savedir = conf.get('savedir', '/tmp')
if not args.blocklist_auditfile:
args.blocklist_auditfile = conf.get('blocklist_auditfile', '')
if not args.export_fields:
args.export_fields = conf.get('export_fields', [])
@ -737,6 +784,7 @@ def setup_argparse():
ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.")
ap.add_argument('-b', '--block-audit-file', dest="blocklist_auditfile", help="Save blocklist auditfile to this location.")
ap.add_argument('--merge-threshold', type=int, help="Merge threshold value")
ap.add_argument('--merge-threshold-type', choices=['count', 'pct'], help="Type of merge threshold to use.")

View File

@ -6,7 +6,7 @@ import json
from typing import Iterable
from dataclasses import dataclass, field
from .const import DomainBlock, BlockSeverity
from .const import DomainBlock, BlockSeverity, BlockAudit
import logging
log = logging.getLogger('fediblockhole')
@ -38,6 +38,33 @@ class Blocklist:
def values(self):
return self.blocks.values()
@dataclass
class BlockAuditList:
""" A BlockAuditlist object
A BlockAuditlist is a list of BlockAudits from an origin
"""
origin: str = None
blocks: dict[str, BlockAudit] = field(default_factory=dict)
def __len__(self):
return len(self.blocks)
def __class_getitem__(cls, item):
return dict[str, BlockAudit]
def __getitem__(self, item):
return self.blocks[item]
def __iter__(self):
return self.blocks.__iter__()
def items(self):
return self.blocks.items()
def values(self):
return self.blocks.values()
class BlocklistParser(object):
"""
Base class for parsing blocklists

View File

@ -85,6 +85,81 @@ class BlockSeverity(object):
if self._level >= other._level:
return True
class BlockAudit(object):
fields = [
'domain',
'count',
'percent',
]
all_fields = [
'domain',
'count',
'percent',
'id'
]
def __init__(self, domain:str,
count: int=0,
percent: int=0,
id: int=None):
"""Initialize the BlockAudit
"""
self.domain = domain
self.count = count
self.percent = percent
self.id = id
def _asdict(self):
"""Return a dict version of this object
"""
dictval = {
'domain': self.domain,
'count': self.count,
'percent': self.percent,
}
if self.id:
dictval['id'] = self.id
return dictval
def __repr__(self):
return f"<BlockAudit {self._asdict()}>"
def copy(self):
"""Make a copy of this object and return it
"""
retval = BlockAudit(**self._asdict())
return retval
def update(self, dict):
"""Update my kwargs
"""
for key in dict:
setattr(self, key, dict[key])
def __iter__(self):
"""Be iterable"""
keys = self.fields
if getattr(self, 'id', False):
keys.append('id')
for k in keys:
yield k
def __getitem__(self, k, default=None):
"Behave like a dict for getting values"
if k not in self.all_fields:
raise KeyError(f"Invalid key '{k}'")
return getattr(self, k, default)
def get(self, k, default=None):
return self.__getitem__(k, default)
# class _DomainBlock(NamedTuple):
# domain: str # FIXME: Use an actual Domain object from somewhere?
# severity: BlockSeverity = BlockSeverity.SUSPEND