diff --git a/src/fediblockhole/__init__.py b/src/fediblockhole/__init__.py index 1b0ea53..945e29c 100755 --- a/src/fediblockhole/__init__.py +++ b/src/fediblockhole/__init__.py @@ -41,7 +41,7 @@ ALLOWLIST_IMPORT_FIELDS = ['domain', 'severity', 'public_comment', 'private_comm # We always export the domain and the severity EXPORT_FIELDS = ['domain', 'severity'] -def sync_blocklists(conf: dict): +def sync_blocklists(conf: argparse.Namespace): """Sync instance blocklists from remote sources. @param conf: A configuration dictionary @@ -73,24 +73,11 @@ def sync_blocklists(conf: dict): # Merge blocklists into an update dict merged = merge_blocklists(blocklists, conf.mergeplan) - # Apply allows specified on the commandline - for domain in conf.allow_domains: - log.info(f"Allowing domain '{domain}' specified on commandline.") - merged[domain] = DomainBlock(domain, 'noop') - - # Apply allows from URLs lists - if conf.allowlist_url_sources: - log.info("Adding allows from URL lists...") - allowlists = fetch_from_urls({}, conf.allowlist_url_sources, ALLOWLIST_IMPORT_FIELDS) - for key, alist in allowlists.items(): - log.debug(f"Processing allows from '{key}'...") - for allowed in alist: - # Ensure the severity is always 'noop' - # This is to prevent accidentally blocking something you wanted to allow. - allowed.severity = 'noop' - merged[allowed.domain] = allowed - log.debug(f"Allowed domain '{allowed.domain}' from allowlist: {allowed}") + # Remove items listed in allowlists, if any + allowlists = fetch_allowlists(conf) + merged = apply_allowlists(merged, conf, allowlists) + # Save the final mergelist, if requested if conf.blocklist_savefile: log.info(f"Saving merged blocklist to {conf.blocklist_savefile}") save_blocklist_to_file(merged.values(), conf.blocklist_savefile, export_fields) @@ -104,6 +91,35 @@ def sync_blocklists(conf: dict): max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence')) push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity) +def apply_allowlists(merged: dict, conf: argparse.Namespace, allowlists: dict): + """Apply allowlists + """ + # Apply allows specified on the commandline + for domain in conf.allow_domains: + log.info(f"'{domain}' allowed by commandline, removing any blocks...") + if domain in merged: + del merged[domain] + + # Apply allows from URLs lists + log.info("Removing domains from URL allowlists...") + for key, alist in allowlists.items(): + log.debug(f"Processing allows from '{key}'...") + for allowed in alist: + domain = allowed.domain + log.debug(f"Removing allowlisted domain '{domain}' from merged list.") + if domain in merged: + del merged[domain] + + return merged + +def fetch_allowlists(conf: argparse.Namespace) -> dict: + """ + """ + if conf.allowlist_url_sources: + allowlists = fetch_from_urls({}, conf.allowlist_url_sources, ALLOWLIST_IMPORT_FIELDS) + return allowlists + return {} + def fetch_from_urls(blocklists: dict, url_sources: dict, import_fields: list=IMPORT_FIELDS, save_intermediate: bool=False, @@ -655,10 +671,10 @@ def augment_args(args, tomldata: str=None): if not args.mergeplan: args.mergeplan = conf.get('mergeplan', 'max') - args.blocklist_url_sources = conf.get('blocklist_url_sources', None) - args.blocklist_instance_sources = conf.get('blocklist_instance_sources', None) - args.allowlist_url_sources = conf.get('allowlist_url_sources', None) - args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations', None) + args.blocklist_url_sources = conf.get('blocklist_url_sources', []) + args.blocklist_instance_sources = conf.get('blocklist_instance_sources', []) + args.allowlist_url_sources = conf.get('allowlist_url_sources', []) + args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations', []) return args diff --git a/tests/test_allowlist.py b/tests/test_allowlist.py new file mode 100644 index 0000000..e632361 --- /dev/null +++ b/tests/test_allowlist.py @@ -0,0 +1,49 @@ +""" Test allowlists +""" +import pytest + +from util import shim_argparse +from fediblockhole.const import DomainBlock +from fediblockhole import fetch_allowlists, apply_allowlists + +def test_cmdline_allow_removes_domain(): + """Test that -A removes entries from merged + """ + conf = shim_argparse(['-A', 'removeme.org']) + + merged = { + 'example.org': DomainBlock('example.org'), + 'example2.org': DomainBlock('example.org'), + 'removeme.org': DomainBlock('removeme.org'), + 'keepblockingme.org': DomainBlock('keepblockingme.org'), + } + + # allowlists = { + # 'testlist': [ DomainBlock('removeme.org', 'noop'), ] + # } + + merged = apply_allowlists(merged, conf, {}) + + with pytest.raises(KeyError): + merged['removeme.org'] + +def test_allowlist_removes_domain(): + """Test that an item in an allowlist removes entries from merged + """ + conf = shim_argparse() + + merged = { + 'example.org': DomainBlock('example.org'), + 'example2.org': DomainBlock('example.org'), + 'removeme.org': DomainBlock('removeme.org'), + 'keepblockingme.org': DomainBlock('keepblockingme.org'), + } + + allowlists = { + 'testlist': [ DomainBlock('removeme.org', 'noop'), ] + } + + merged = apply_allowlists(merged, conf, allowlists) + + with pytest.raises(KeyError): + merged['removeme.org']