Merge branch 'main' into allowlists
This commit is contained in:
commit
04c19c62df
|
@ -148,6 +148,8 @@ def fetch_from_instances(blocklists: dict, sources: dict,
|
||||||
domain = item['domain']
|
domain = item['domain']
|
||||||
admin = item.get('admin', False)
|
admin = item.get('admin', False)
|
||||||
token = item.get('token', None)
|
token = item.get('token', None)
|
||||||
|
itemsrc = f"https://{domain}/api"
|
||||||
|
|
||||||
# If import fields are provided, they override the global ones passed in
|
# If import fields are provided, they override the global ones passed in
|
||||||
source_import_fields = item.get('import_fields', None)
|
source_import_fields = item.get('import_fields', None)
|
||||||
if source_import_fields:
|
if source_import_fields:
|
||||||
|
@ -155,9 +157,9 @@ def fetch_from_instances(blocklists: dict, sources: dict,
|
||||||
import_fields = IMPORT_FIELDS.extend(source_import_fields)
|
import_fields = IMPORT_FIELDS.extend(source_import_fields)
|
||||||
|
|
||||||
# Add the blocklist with the domain as the source key
|
# Add the blocklist with the domain as the source key
|
||||||
blocklists[domain] = fetch_instance_blocklist(domain, token, admin, import_fields)
|
blocklists[itemsrc] = fetch_instance_blocklist(domain, token, admin, import_fields)
|
||||||
if save_intermediate:
|
if save_intermediate:
|
||||||
save_intermediate_blocklist(blocklists[domain], domain, savedir, export_fields)
|
save_intermediate_blocklist(blocklists[itemsrc], domain, savedir, export_fields)
|
||||||
return blocklists
|
return blocklists
|
||||||
|
|
||||||
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
||||||
|
@ -615,8 +617,15 @@ def save_blocklist_to_file(
|
||||||
for item in blocklist:
|
for item in blocklist:
|
||||||
writer.writerow(item._asdict())
|
writer.writerow(item._asdict())
|
||||||
|
|
||||||
def augment_args(args):
|
def augment_args(args, tomldata: str=None):
|
||||||
"""Augment commandline arguments with config file parameters"""
|
"""Augment commandline arguments with config file parameters
|
||||||
|
|
||||||
|
If tomldata is provided, uses that data instead of loading
|
||||||
|
from a config file.
|
||||||
|
"""
|
||||||
|
if tomldata:
|
||||||
|
conf = toml.loads(tomldata)
|
||||||
|
else:
|
||||||
conf = toml.load(args.config)
|
conf = toml.load(args.config)
|
||||||
|
|
||||||
if not args.no_fetch_url:
|
if not args.no_fetch_url:
|
||||||
|
@ -643,6 +652,9 @@ def augment_args(args):
|
||||||
if not args.import_fields:
|
if not args.import_fields:
|
||||||
args.import_fields = conf.get('import_fields', [])
|
args.import_fields = conf.get('import_fields', [])
|
||||||
|
|
||||||
|
if not args.mergeplan:
|
||||||
|
args.mergeplan = conf.get('mergeplan', 'max')
|
||||||
|
|
||||||
args.blocklist_url_sources = conf.get('blocklist_url_sources', None)
|
args.blocklist_url_sources = conf.get('blocklist_url_sources', None)
|
||||||
args.blocklist_instance_sources = conf.get('blocklist_instance_sources', None)
|
args.blocklist_instance_sources = conf.get('blocklist_instance_sources', None)
|
||||||
args.allowlist_url_sources = conf.get('allowlist_url_sources', None)
|
args.allowlist_url_sources = conf.get('allowlist_url_sources', None)
|
||||||
|
@ -650,8 +662,9 @@ def augment_args(args):
|
||||||
|
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def main():
|
def setup_argparse():
|
||||||
|
"""Setup the commandline arguments
|
||||||
|
"""
|
||||||
ap = argparse.ArgumentParser(
|
ap = argparse.ArgumentParser(
|
||||||
description="Bulk blocklist tool",
|
description="Bulk blocklist tool",
|
||||||
epilog=f"Part of FediBlockHole v{__version__}",
|
epilog=f"Part of FediBlockHole v{__version__}",
|
||||||
|
@ -662,7 +675,7 @@ def main():
|
||||||
ap.add_argument('-o', '--outfile', dest="blocklist_savefile", help="Save merged blocklist to a local file.")
|
ap.add_argument('-o', '--outfile', dest="blocklist_savefile", help="Save merged blocklist to a local file.")
|
||||||
ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
|
ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
|
||||||
ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
|
ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
|
||||||
ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], default='max', help="Set mergeplan.")
|
ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.")
|
||||||
|
|
||||||
ap.add_argument('-I', '--import-field', dest='import_fields', action='append', help="Extra blocklist fields to import.")
|
ap.add_argument('-I', '--import-field', dest='import_fields', action='append', help="Extra blocklist fields to import.")
|
||||||
ap.add_argument('-E', '--export-field', dest='export_fields', action='append', help="Extra blocklist fields to export.")
|
ap.add_argument('-E', '--export-field', dest='export_fields', action='append', help="Extra blocklist fields to export.")
|
||||||
|
@ -675,7 +688,13 @@ def main():
|
||||||
ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.")
|
ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.")
|
||||||
ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.")
|
ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.")
|
||||||
|
|
||||||
|
return ap
|
||||||
|
|
||||||
|
def main():
|
||||||
|
|
||||||
|
ap = setup_argparse()
|
||||||
args = ap.parse_args()
|
args = ap.parse_args()
|
||||||
|
|
||||||
if args.loglevel is not None:
|
if args.loglevel is not None:
|
||||||
levelname = args.loglevel.upper()
|
levelname = args.loglevel.upper()
|
||||||
log.setLevel(getattr(logging, levelname))
|
log.setLevel(getattr(logging, levelname))
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
"""Test the commandline defined parameters correctly
|
||||||
|
"""
|
||||||
|
from fediblockhole import setup_argparse, augment_args
|
||||||
|
|
||||||
|
def shim_argparse(testargv: list=[], tomldata: str=None):
|
||||||
|
"""Helper function to parse test args
|
||||||
|
"""
|
||||||
|
ap = setup_argparse()
|
||||||
|
args = ap.parse_args(testargv)
|
||||||
|
args = augment_args(args, tomldata)
|
||||||
|
return args
|
||||||
|
|
||||||
|
def test_cmdline_no_configfile():
|
||||||
|
""" Test bare command with no configfile
|
||||||
|
"""
|
||||||
|
ap = setup_argparse()
|
||||||
|
args = ap.parse_args([])
|
||||||
|
|
||||||
|
assert args.config == '/etc/default/fediblockhole.conf.toml'
|
||||||
|
assert args.mergeplan == None
|
||||||
|
assert args.blocklist_savefile == None
|
||||||
|
assert args.save_intermediate == False
|
||||||
|
assert args.savedir == None
|
||||||
|
assert args.import_fields == None
|
||||||
|
assert args.export_fields == None
|
||||||
|
|
||||||
|
assert args.no_fetch_url == False
|
||||||
|
assert args.no_fetch_instance == False
|
||||||
|
assert args.no_push_instance == False
|
||||||
|
assert args.dryrun == False
|
||||||
|
|
||||||
|
assert args.loglevel == None
|
||||||
|
|
||||||
|
def test_cmdline_mergeplan_min():
|
||||||
|
""" Test setting mergeplan min
|
||||||
|
"""
|
||||||
|
ap = setup_argparse()
|
||||||
|
args = ap.parse_args(['-m', 'min'])
|
||||||
|
|
||||||
|
assert args.mergeplan == 'min'
|
|
@ -0,0 +1,47 @@
|
||||||
|
"""Test the config file is loading parameters correctly
|
||||||
|
"""
|
||||||
|
from fediblockhole import setup_argparse, augment_args
|
||||||
|
|
||||||
|
def shim_argparse(testargv: list=[], tomldata: str=None):
|
||||||
|
"""Helper function to parse test args
|
||||||
|
"""
|
||||||
|
ap = setup_argparse()
|
||||||
|
args = ap.parse_args(testargv)
|
||||||
|
args = augment_args(args, tomldata)
|
||||||
|
return args
|
||||||
|
|
||||||
|
def test_parse_tomldata():
|
||||||
|
tomldata = """
|
||||||
|
# Test TOML config for FediBlockHole
|
||||||
|
|
||||||
|
blocklist_instance_sources = []
|
||||||
|
|
||||||
|
blocklist_url_sources = []
|
||||||
|
|
||||||
|
save_intermediate = true
|
||||||
|
|
||||||
|
import_fields = ['public_comment']
|
||||||
|
"""
|
||||||
|
ap = setup_argparse()
|
||||||
|
args = ap.parse_args([])
|
||||||
|
args = augment_args(args, tomldata)
|
||||||
|
|
||||||
|
assert args.blocklist_instance_sources == []
|
||||||
|
assert args.blocklist_url_sources == []
|
||||||
|
assert args.save_intermediate == True
|
||||||
|
assert args.import_fields == ['public_comment']
|
||||||
|
|
||||||
|
def test_set_mergeplan_max():
|
||||||
|
tomldata = """mergeplan = 'max'
|
||||||
|
"""
|
||||||
|
args = shim_argparse([], tomldata)
|
||||||
|
|
||||||
|
assert args.mergeplan == 'max'
|
||||||
|
|
||||||
|
def test_set_mergeplan_min():
|
||||||
|
tomldata = """mergeplan = 'min'
|
||||||
|
"""
|
||||||
|
args = shim_argparse([], tomldata)
|
||||||
|
|
||||||
|
assert args.mergeplan == 'min'
|
||||||
|
|
Loading…
Reference in New Issue