"""A tool for managing federated Mastodon blocklists """ import argparse import toml import csv import requests import json import time import os.path import sys import urllib.request as urlr from .blocklist_parser import parse_blocklist from .const import DomainBlock, BlockSeverity from importlib.metadata import version __version__ = version('fediblockhole') import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s') log = logging.getLogger('fediblockhole') # Max size of a URL-fetched blocklist URL_BLOCKLIST_MAXSIZE = 1024 ** 3 # Wait at most this long for a remote server to respond REQUEST_TIMEOUT = 30 # Time to wait between instance API calls to we don't melt them API_CALL_DELAY = 3600 / 300 # 300 API calls per hour # We always import the domain and the severity IMPORT_FIELDS = ['domain', 'severity'] # We always export the domain and the severity EXPORT_FIELDS = ['domain', 'severity'] def sync_blocklists(conf: dict): """Sync instance blocklists from remote sources. @param conf: A configuration dictionary """ # Build a dict of blocklists we retrieve from remote sources. # We will merge these later using a merge algorithm we choose. # Always import these fields import_fields = IMPORT_FIELDS # Add extra import fields if defined in config import_fields.extend(conf.import_fields) # Always export these fields export_fields = EXPORT_FIELDS # Add extra export fields if defined in config export_fields.extend(conf.export_fields) blocklists = {} # Fetch blocklists from URLs if not conf.no_fetch_url: blocklists = fetch_from_urls(blocklists, conf.blocklist_url_sources, import_fields, conf.save_intermediate, conf.savedir, export_fields) # Fetch blocklists from remote instances if not conf.no_fetch_instance: blocklists = fetch_from_instances(blocklists, conf.blocklist_instance_sources, import_fields, conf.save_intermediate, conf.savedir, export_fields) # Merge blocklists into an update dict merged = merge_blocklists(blocklists, conf.mergeplan) if conf.blocklist_savefile: log.info(f"Saving merged blocklist to {conf.blocklist_savefile}") save_blocklist_to_file(merged.values(), conf.blocklist_savefile, export_fields) # Push the blocklist to destination instances if not conf.no_push_instance: log.info("Pushing domain blocks to instances...") for dest in conf.blocklist_instance_destinations: domain = dest['domain'] token = dest['token'] max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence')) push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity) def fetch_from_urls(blocklists: dict, url_sources: dict, import_fields: list=IMPORT_FIELDS, save_intermediate: bool=False, savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict: """Fetch blocklists from URL sources @param blocklists: A dict of existing blocklists, keyed by source @param url_sources: A dict of configuration info for url sources @returns: A dict of blocklists, same as input, but (possibly) modified """ log.info("Fetching domain blocks from URLs...") for item in url_sources: url = item['url'] # If import fields are provided, they override the global ones passed in source_import_fields = item.get('import_fields', None) if source_import_fields: # Ensure we always use the default fields import_fields = IMPORT_FIELDS.extend(source_import_fields) max_severity = item.get('max_severity', 'suspend') listformat = item.get('format', 'csv') with urlr.urlopen(url) as fp: rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8') blocklists[url] = parse_blocklist(rawdata, listformat, import_fields, max_severity) if save_intermediate: save_intermediate_blocklist(blocklists[url], url, savedir, export_fields) return blocklists def fetch_from_instances(blocklists: dict, sources: dict, import_fields: list=IMPORT_FIELDS, save_intermediate: bool=False, savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict: """Fetch blocklists from other instances @param blocklists: A dict of existing blocklists, keyed by source @param url_sources: A dict of configuration info for url sources @returns: A dict of blocklists, same as input, but (possibly) modified """ log.info("Fetching domain blocks from instances...") for item in sources: domain = item['domain'] admin = item.get('admin', False) token = item.get('token', None) # If import fields are provided, they override the global ones passed in source_import_fields = item.get('import_fields', None) if source_import_fields: # Ensure we always use the default fields import_fields = IMPORT_FIELDS.extend(source_import_fields) # Add the blocklist with the domain as the source key blocklists[domain] = fetch_instance_blocklist(domain, token, admin, import_fields) if save_intermediate: save_intermediate_blocklist(blocklists[domain], domain, savedir, export_fields) return blocklists def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict: """Merge fetched remote blocklists into a bulk update @param mergeplan: An optional method of merging overlapping block definitions 'max' (the default) uses the highest severity block found 'min' uses the lowest severity block found """ merged = {} for key, blist in blocklists.items(): log.debug(f"processing blocklist from: {key} ...") for newblock in blist: domain = newblock.domain # If the domain has two asterisks in it, it's obfuscated # and we can't really use it, so skip it and do the next one if '*' in domain: log.debug(f"Domain '{domain}' is obfuscated. Skipping it.") continue elif domain in merged: log.debug(f"Overlapping block for domain {domain}. Merging...") blockdata = apply_mergeplan(merged[domain], newblock, mergeplan) else: # New block blockdata = newblock # end if log.debug(f"blockdata is: {blockdata}") merged[domain] = blockdata # end for return merged def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict: """Use a mergeplan to decide how to merge two overlapping block definitions @param oldblock: The existing block definition. @param newblock: The new block definition we want to merge in. @param mergeplan: How to merge. Choices are 'max', the default, and 'min'. """ # Default to the existing block definition blockdata = oldblock._asdict() # Merge comments keylist = ['public_comment', 'private_comment'] for key in keylist: try: oldcomment = getattr(oldblock, key) newcomment = getattr(newblock, key) blockdata[key] = merge_comments(oldcomment, newcomment) except KeyError: log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...") continue # How do we override an earlier block definition? if mergeplan in ['max', None]: # Use the highest block level found (the default) log.debug(f"Using 'max' mergeplan.") if newblock.severity > oldblock.severity: log.debug(f"New block severity is higher. Using that.") blockdata['severity'] = newblock.severity # For 'reject_media', 'reject_reports', and 'obfuscate' if # the value is set and is True for the domain in # any blocklist then the value is set to True. for key in ['reject_media', 'reject_reports', 'obfuscate']: newval = getattr(newblock, key) if newval == True: blockdata[key] = True elif mergeplan in ['min']: # Use the lowest block level found log.debug(f"Using 'min' mergeplan.") if newblock.severity < oldblock.severity: blockdata['severity'] = newblock.severity # For 'reject_media', 'reject_reports', and 'obfuscate' if # the value is set and is False for the domain in # any blocklist then the value is set to False. for key in ['reject_media', 'reject_reports', 'obfuscate']: newval = getattr(newblock, key) if newval == False: blockdata[key] = False else: raise NotImplementedError(f"Mergeplan '{mergeplan}' not implemented.") log.debug(f"Block severity set to {blockdata['severity']}") return DomainBlock(**blockdata) def merge_comments(oldcomment:str, newcomment:str) -> str: """ Merge two comments @param oldcomment: The original comment we're merging into @param newcomment: The new commment we want to merge in @returns: a new str of the merged comment """ # Don't merge if both comments are None or '' if oldcomment in ['', None] and newcomment in ['', None]: return '' # If both comments are the same, don't merge if oldcomment == newcomment: return oldcomment # We want to skip duplicate fragments so we don't end up # re-concatenating the same strings every time there's an # update, causing the comment to grow without bound. # We tokenize the comments, splitting them on ', ', and comparing # the tokens, skipping duplicates. # This means "boring, lack of moderation, nazis, scrapers" merging # with "lack of moderation, scrapers" should result in # "boring, lack of moderation, nazis, scrapers" old_tokens = oldcomment.split(', ') new_tokens = newcomment.split(', ') # Remove any empty string tokens that we get while '' in old_tokens: old_tokens.remove('') while '' in new_tokens: new_tokens.remove('') # Remove duplicate tokens for token in old_tokens: if token in new_tokens: new_tokens.remove(token) # Combine whatever tokens are left into one set tokenset = old_tokens tokenset.extend(new_tokens) # Return the merged string return ', '.join(tokenset) def requests_headers(token: str=None): """Set common headers for requests""" headers = { 'User-Agent': f"FediBlockHole/{__version__}" } if token: headers['Authorization'] = f"Bearer {token}" return headers def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False, import_fields: list=['domain', 'severity']) -> list[DomainBlock]: """Fetch existing block list from server @param host: The remote host to connect to. @param token: The (optional) OAuth Bearer token to authenticate with. @param admin: Boolean flag to use the admin API if True. @param import_fields: A list of fields to import from the remote instance. @returns: A list of the domain blocks from the instance. """ log.info(f"Fetching instance blocklist from {host} ...") if admin: api_path = "/api/v1/admin/domain_blocks" else: api_path = "/api/v1/instance/domain_blocks" headers = requests_headers(token) url = f"https://{host}{api_path}" blocklist = [] link = True while link: response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT) if response.status_code != 200: log.error(f"Cannot fetch remote blocklist: {response.content}") raise ValueError("Unable to fetch domain block list: %s", response) blocklist.extend( parse_blocklist(response.content, 'json', import_fields) ) # Parse the link header to find the next url to fetch # This is a weird and janky way of doing pagination but # hey nothing we can do about it we just have to deal link = response.headers.get('Link', None) if link is None: break pagination = link.split(', ') if len(pagination) != 2: link = None break else: next = pagination[0] # prev = pagination[1] urlstring, rel = next.split('; ') url = urlstring.strip('<').rstrip('>') return blocklist def delete_block(token: str, host: str, id: int): """Remove a domain block""" log.debug(f"Removing domain block {id} at {host}...") api_path = "/api/v1/admin/domain_blocks/" url = f"https://{host}{api_path}{id}" response = requests.delete(url, headers=requests_headers(token), timeout=REQUEST_TIMEOUT ) if response.status_code != 200: if response.status_code == 404: log.warning(f"No such domain block: {id}") return raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") def fetch_instance_follows(token: str, host: str, domain: str) -> int: """Fetch the followers of the target domain at the instance @param token: the Bearer authentication token for OAuth access @param host: the instance API hostname/IP address @param domain: the domain to search for followers of @returns: int, number of local followers of remote instance accounts """ api_path = "/api/v1/admin/measures" url = f"https://{host}{api_path}" key = 'instance_follows' # This data structure only allows us to request a single domain # at a time, which limits the load on the remote instance of each call data = { 'keys': [ key ], key: { 'domain': domain }, } # The Mastodon API only accepts JSON formatted POST data for measures response = requests.post(url, headers=requests_headers(token), json=data, timeout=REQUEST_TIMEOUT ) if response.status_code != 200: if response.status_code == 403: log.error(f"Cannot fetch follow information for {domain} from {host}: {response.content}") raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") # Get the total returned follows = int(response.json()[0]['total']) return follows def check_followed_severity(host: str, token: str, domain: str, severity: BlockSeverity, max_followed_severity: BlockSeverity=BlockSeverity('silence')): """Check an instance to see if it has followers of a to-be-blocked instance""" log.debug("Checking followed severity...") # Return straight away if we're not increasing the severity if severity <= max_followed_severity: return severity # If the instance has accounts that follow people on the to-be-blocked domain, # limit the maximum severity to the configured `max_followed_severity`. log.debug("checking for instance follows...") follows = fetch_instance_follows(token, host, domain) time.sleep(API_CALL_DELAY) if follows > 0: log.debug(f"Instance {host} has {follows} followers of accounts at {domain}.") if severity > max_followed_severity: log.warning(f"Instance {host} has {follows} followers of accounts at {domain}. Limiting block severity to {max_followed_severity}.") return max_followed_severity return severity def is_change_needed(oldblock: dict, newblock: dict, import_fields: list): change_needed = oldblock.compare_fields(newblock, import_fields) return change_needed def update_known_block(token: str, host: str, blockdict: dict): """Update an existing domain block with information in blockdict""" api_path = "/api/v1/admin/domain_blocks/" try: id = blockdict['id'] blockdata = blockdict.copy() del blockdata['id'] except KeyError: import pdb pdb.set_trace() url = f"https://{host}{api_path}{id}" response = requests.put(url, headers=requests_headers(token), data=blockdata, timeout=REQUEST_TIMEOUT ) if response.status_code != 200: raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") def add_block(token: str, host: str, blockdata: DomainBlock): """Block a domain on Mastodon host """ log.debug(f"Blocking domain {blockdata.domain} at {host}...") api_path = "/api/v1/admin/domain_blocks" url = f"https://{host}{api_path}" response = requests.post(url, headers=requests_headers(token), data=blockdata._asdict(), timeout=REQUEST_TIMEOUT ) if response.status_code == 422: # A stricter block already exists. Probably for the base domain. err = json.loads(response.content) log.warning(err['error']) elif response.status_code != 200: raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") def push_blocklist(token: str, host: str, blocklist: list[dict], dryrun: bool=False, import_fields: list=['domain', 'severity'], max_followed_severity:BlockSeverity=BlockSeverity('silence'), ): """Push a blocklist to a remote instance. Merging the blocklist with the existing list the instance has, updating existing entries if they exist. @param token: The Bearer token for OAUTH API authentication @param host: The instance host, FQDN or IP @param blocklist: A list of block definitions. They must include the domain. @param import_fields: A list of fields to import to the instances. """ log.info(f"Pushing blocklist to host {host} ...") # Fetch the existing blocklist from the instance # Force use of the admin API, and add 'id' to the list of fields if 'id' not in import_fields: import_fields.append('id') serverblocks = fetch_instance_blocklist(host, token, True, import_fields) # # Convert serverblocks to a dictionary keyed by domain name knownblocks = {row.domain: row for row in serverblocks} for newblock in blocklist: log.debug(f"Processing block: {newblock}") oldblock = knownblocks.get(newblock.domain, None) if oldblock: log.debug(f"Block already exists for {newblock.domain}, checking for differences...") change_needed = is_change_needed(oldblock, newblock, import_fields) # Is the severity changing? if 'severity' in change_needed: log.debug("Severity change requested, checking...") if newseverity > oldblock.severity: # Confirm if we really want to change the severity # If we still have followers of the remote domain, we may not # want to go all the way to full suspend, depending on the configuration newseverity = check_followed_severity(host, token, oldblock.domain, newblock.severity, max_followed_severity) if newseverity != oldblock.severity: newblock.severity = newseverity else: log.info("Keeping severity of block the same to avoid disrupting followers.") change_needed.remove('severity') if change_needed: log.info(f"Change detected. Need to update {change_needed} for domain block for {oldblock.domain}") log.info(f"Old block definition: {oldblock}") log.info(f"Pushing new block definition: {newblock}") blockdata = oldblock.copy() blockdata.update(newblock) if not dryrun: update_known_block(token, host, blockdata) # add a pause here so we don't melt the instance time.sleep(API_CALL_DELAY) else: log.info("Dry run selected. Not applying changes.") else: log.debug("No differences detected. Not updating.") pass else: # This is a new block for the target instance, so we # need to add a block rather than update an existing one log.info(f"Adding new block: {newblock}...") # Make sure the new block doesn't clobber a domain with followers newblock.severity = check_followed_severity(host, token, newblock.domain, newblock.severity, max_followed_severity) if not dryrun: add_block(token, host, newblock) # add a pause here so we don't melt the instance time.sleep(API_CALL_DELAY) else: log.info("Dry run selected. Not adding block.") def load_config(configfile: str): """Augment commandline arguments with config file parameters Config file is expected to be in TOML format """ conf = toml.load(configfile) return conf def save_intermediate_blocklist( blocklist: list[dict], source: str, filedir: str, export_fields: list=['domain','severity']): """Save a local copy of a blocklist we've downloaded """ # Invent a filename based on the remote source # If the source was a URL, convert it to something less messy # If the source was a remote domain, just use the name of the domain log.debug(f"Saving intermediate blocklist from {source}") source = source.replace('/','-') filename = f"{source}.csv" filepath = os.path.join(filedir, filename) save_blocklist_to_file(blocklist, filepath, export_fields) def save_blocklist_to_file( blocklist: list[DomainBlock], filepath: str, export_fields: list=['domain','severity']): """Save a blocklist we've downloaded from a remote source @param blocklist: A dictionary of block definitions, keyed by domain @param filepath: The path to the file the list should be saved in. @param export_fields: Which fields to include in the export. """ try: blocklist = sorted(blocklist, key=lambda x: x.domain) except KeyError: log.error("Field 'domain' not found in blocklist.") log.debug(f"blocklist is: {blocklist}") log.debug(f"export fields: {export_fields}") with open(filepath, "w") as fp: writer = csv.DictWriter(fp, export_fields, extrasaction='ignore') writer.writeheader() for item in blocklist: writer.writerow(item._asdict()) def augment_args(args): """Augment commandline arguments with config file parameters""" conf = toml.load(args.config) if not args.no_fetch_url: args.no_fetch_url = conf.get('no_fetch_url', False) if not args.no_fetch_instance: args.no_fetch_instance = conf.get('no_fetch_instance', False) if not args.no_push_instance: args.no_push_instance = conf.get('no_push_instance', False) if not args.blocklist_savefile: args.blocklist_savefile = conf.get('blocklist_savefile', None) if not args.save_intermediate: args.save_intermediate = conf.get('save_intermediate', False) if not args.savedir: args.savedir = conf.get('savedir', '/tmp') if not args.export_fields: args.export_fields = conf.get('export_fields', []) if not args.import_fields: args.import_fields = conf.get('import_fields', []) args.blocklist_url_sources = conf.get('blocklist_url_sources') args.blocklist_instance_sources = conf.get('blocklist_instance_sources') args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations') return args def main(): ap = argparse.ArgumentParser( description="Bulk blocklist tool", epilog=f"Part of FediBlockHole v{__version__}", formatter_class=argparse.ArgumentDefaultsHelpFormatter) ap.add_argument('-c', '--config', default='/etc/default/fediblockhole.conf.toml', help="Config file") ap.add_argument('-V', '--version', action='store_true', help="Show version and exit.") ap.add_argument('-o', '--outfile', dest="blocklist_savefile", help="Save merged blocklist to a local file.") ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.") ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.") ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], default='max', help="Set mergeplan.") ap.add_argument('-I', '--import-field', dest='import_fields', action='append', help="Extra blocklist fields to import.") ap.add_argument('-E', '--export-field', dest='export_fields', action='append', help="Extra blocklist fields to export.") ap.add_argument('--no-fetch-url', dest='no_fetch_url', action='store_true', help="Don't fetch from URLs, even if configured.") ap.add_argument('--no-fetch-instance', dest='no_fetch_instance', action='store_true', help="Don't fetch from instances, even if configured.") ap.add_argument('--no-push-instance', dest='no_push_instance', action='store_true', help="Don't push to instances, even if configured.") ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.") ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.") args = ap.parse_args() if args.loglevel is not None: levelname = args.loglevel.upper() log.setLevel(getattr(logging, levelname)) if args.version: print(f"v{__version__}") sys.exit(0) # Load the configuration file args = augment_args(args) # Do the work of syncing sync_blocklists(args)