Merge branch 'eigenmagic:main' into main

This commit is contained in:
cunningpike 2023-01-15 20:10:43 -05:00 committed by GitHub
commit 7989627c09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 306 additions and 55 deletions

View File

@ -81,20 +81,28 @@ token.
The application needs the `admin:read:domain_blocks` OAuth scope, but The application needs the `admin:read:domain_blocks` OAuth scope, but
unfortunately this scope isn't available in the current application screen unfortunately this scope isn't available in the current application screen
(v4.0.2 of Mastodon at time of writing). There is a way to do it with scopes, (v4.0.2 of Mastodon at time of writing, but this has been fixed in the main
but it's really dangerous, so I'm not going to tell you what it is here. branch).
A better way is to ask the instance admin to connect to the PostgreSQL database You can allow full `admin:read` access, but be aware that this authorizes
and add the scope there, like this: someone to read all the data in the instance. That's asking a lot of a remote
instance admin who just wants to share domain_blocks with you.
For now, you can ask the instance admin to update the scope in the database
directly like this:
``` ```
UPDATE oauth_access_tokens UPDATE oauth_applications as app
SET scopes='admin:read:domain_blocks' SET scopes = 'admin:read:domain_blocks'
WHERE token='<your_app_token>'; FROM oauth_access_tokens as tok
WHERE app.id = tok.application_id
AND app.name = '<the_app_name>'
;
``` ```
When that's done, FediBlockHole should be able to use its token to read domain When that's done, regenerate the token (so it has the new scopes) in the
blocks via the API. application screen in the instance GUI. FediBlockHole should then able to use
the app token to read domain blocks via the API, but nothing else.
Alternately, you could ask the remote instance admin to set up FediBlockHole and Alternately, you could ask the remote instance admin to set up FediBlockHole and
use it to dump out a CSV blocklist from their instance and then put it somewhere use it to dump out a CSV blocklist from their instance and then put it somewhere
@ -104,12 +112,17 @@ as explained below.
### Writing instance blocklists ### Writing instance blocklists
To write domain blocks into an instance requires both the `admin:read` and To write domain blocks into an instance requires both the `admin:read` and
`admin:write:domain_blocks` OAuth scopes. The `read` scope is used to read the `admin:write:domain_blocks` OAuth scopes.
current list of domain blocks so we update ones that already exist, rather than
trying to add all new ones and clutter up the instance. It's also used to check The tool needs `admin:read:domain_blocks` scope to read the current list of
if the instance has any accounts that follow accounts on a domain that is about domain blocks so we update ones that already exist, rather than trying to add
to get `suspend`ed and automatically drop the block severity to `silence` level all new ones and clutter up the instance.
so people have time to migrate accounts before a full defederation takes effect.
`admin:read` access is needed to check if the instance has any accounts that
follow accounts on a domain that is about to get `suspend`ed and automatically
drop the block severity to `silence` level so people have time to migrate
accounts before a full defederation takes effect. Unfortunately, the statistics
measure used to learn this information requires `admin:read` scope.
You can add `admin:read` scope in the application admin screen. Please be aware You can add `admin:read` scope in the application admin screen. Please be aware
that this grants full read access to all information in the instance to the that this grants full read access to all information in the instance to the
@ -122,12 +135,15 @@ chmod o-r <configfile>
You can also grant full `admin:write` scope to the application, but if you'd You can also grant full `admin:write` scope to the application, but if you'd
prefer to keep things more tightly secured you'll need to use SQL to set the prefer to keep things more tightly secured you'll need to use SQL to set the
scopes in the database: scopes in the database and then regenerate the token:
``` ```
UPDATE oauth_access_tokens UPDATE oauth_applications as app
SET scopes='admin:read admin:write:domain_blocks' SET scopes = 'admin:read admin:write:domain_blocks'
WHERE token='<your_app_token>'; FROM oauth_access_tokens as tok
WHERE app.id = tok.application_id
AND app.name = '<the_app_name>'
;
``` ```
When that's done, FediBlockHole should be able to use its token to authorise When that's done, FediBlockHole should be able to use its token to authorise
@ -159,11 +175,12 @@ Or you can use the default location of `/etc/default/fediblockhole.conf.toml`.
As the filename suggests, FediBlockHole uses TOML syntax. As the filename suggests, FediBlockHole uses TOML syntax.
There are 3 key sections: There are 4 key sections:
1. `blocklist_urls_sources`: A list of URLs to read blocklists from 1. `blocklist_urls_sources`: A list of URLs to read blocklists from
1. `blocklist_instance_sources`: A list of Mastodon instances to read blocklists from via API 1. `blocklist_instance_sources`: A list of Mastodon instances to read blocklists from via API
1. `blocklist_instance_destinations`: A list of Mastodon instances to write blocklists to via API 1. `blocklist_instance_destinations`: A list of Mastodon instances to write blocklists to via API
1. `allowlist_url_sources`: A list of URLs to read allowlists from
More detail on configuring the tool is provided below. More detail on configuring the tool is provided below.
@ -286,6 +303,24 @@ mergeplan.
Once the follow count drops to 0 on your instance, the tool will automatically Once the follow count drops to 0 on your instance, the tool will automatically
use the highest severity it finds again (if you're using the `max` mergeplan). use the highest severity it finds again (if you're using the `max` mergeplan).
### Allowlists
Sometimes you might want to completely ignore the blocklist definitions for
certain domains. That's what allowlists are for.
Allowlists remove any domain in the list from the merged list of blocks before
the merged list is saved out to a file or pushed to any instance.
Allowlists can be in any format supported by `blocklist_urls_sources` but ignore
all fields that aren't `domain`.
You can also allow domains on the commandline by using the `-A` or `--allow`
flag and providing the domain name to allow. You can use the flag multiple
times to allow multiple domains.
It is probably wise to include your own instance domain in an allowlist so you
don't accidentally defederate from yourself.
## More advanced configuration ## More advanced configuration
For a list of possible configuration options, check the `--help` and read the For a list of possible configuration options, check the `--help` and read the

View File

@ -16,7 +16,7 @@ blocklist_instance_sources = [
# max_severity tells the parser to override any severities that are higher than this value # max_severity tells the parser to override any severities that are higher than this value
# import_fields tells the parser to only import that set of fields from a specific source # import_fields tells the parser to only import that set of fields from a specific source
blocklist_url_sources = [ blocklist_url_sources = [
# { url = 'file:///home/daedalus/src/fediblockhole/samples/demo-blocklist-01.csv', format = 'csv' }, # { url = 'file:///path/to/fediblockhole/samples/demo-blocklist-01.csv', format = 'csv' },
{ url = 'https://raw.githubusercontent.com/eigenmagic/fediblockhole/main/samples/demo-blocklist-01.csv', format = 'csv' }, { url = 'https://raw.githubusercontent.com/eigenmagic/fediblockhole/main/samples/demo-blocklist-01.csv', format = 'csv' },
] ]

View File

@ -1,6 +1,6 @@
[project] [project]
name = "fediblockhole" name = "fediblockhole"
version = "0.4.0" version = "0.4.1"
description = "Federated blocklist management for Mastodon" description = "Federated blocklist management for Mastodon"
readme = "README.md" readme = "README.md"
license = {file = "LICENSE"} license = {file = "LICENSE"}

2
setup.cfg Normal file
View File

@ -0,0 +1,2 @@
[pytest]
norecursedirs=tests/helpers

View File

@ -35,10 +35,13 @@ API_CALL_DELAY = 5 * 60 / 300 # 300 calls per 5 minutes
# We always import the domain and the severity # We always import the domain and the severity
IMPORT_FIELDS = ['domain', 'severity'] IMPORT_FIELDS = ['domain', 'severity']
# Allowlists always import these fields
ALLOWLIST_IMPORT_FIELDS = ['domain', 'severity', 'public_comment', 'private_comment', 'reject_media', 'reject_reports', 'obfuscate']
# We always export the domain and the severity # We always export the domain and the severity
EXPORT_FIELDS = ['domain', 'severity'] EXPORT_FIELDS = ['domain', 'severity']
def sync_blocklists(conf: dict): def sync_blocklists(conf: argparse.Namespace):
"""Sync instance blocklists from remote sources. """Sync instance blocklists from remote sources.
@param conf: A configuration dictionary @param conf: A configuration dictionary
@ -69,6 +72,12 @@ def sync_blocklists(conf: dict):
# Merge blocklists into an update dict # Merge blocklists into an update dict
merged = merge_blocklists(blocklists, conf.mergeplan) merged = merge_blocklists(blocklists, conf.mergeplan)
# Remove items listed in allowlists, if any
allowlists = fetch_allowlists(conf)
merged = apply_allowlists(merged, conf, allowlists)
# Save the final mergelist, if requested
if conf.blocklist_savefile: if conf.blocklist_savefile:
log.info(f"Saving merged blocklist to {conf.blocklist_savefile}") log.info(f"Saving merged blocklist to {conf.blocklist_savefile}")
save_blocklist_to_file(merged.values(), conf.blocklist_savefile, export_fields) save_blocklist_to_file(merged.values(), conf.blocklist_savefile, export_fields)
@ -82,6 +91,35 @@ def sync_blocklists(conf: dict):
max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence')) max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence'))
push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity) push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity)
def apply_allowlists(merged: dict, conf: argparse.Namespace, allowlists: dict):
"""Apply allowlists
"""
# Apply allows specified on the commandline
for domain in conf.allow_domains:
log.info(f"'{domain}' allowed by commandline, removing any blocks...")
if domain in merged:
del merged[domain]
# Apply allows from URLs lists
log.info("Removing domains from URL allowlists...")
for key, alist in allowlists.items():
log.debug(f"Processing allows from '{key}'...")
for allowed in alist:
domain = allowed.domain
log.debug(f"Removing allowlisted domain '{domain}' from merged list.")
if domain in merged:
del merged[domain]
return merged
def fetch_allowlists(conf: argparse.Namespace) -> dict:
"""
"""
if conf.allowlist_url_sources:
allowlists = fetch_from_urls({}, conf.allowlist_url_sources, ALLOWLIST_IMPORT_FIELDS)
return allowlists
return {}
def fetch_from_urls(blocklists: dict, url_sources: dict, def fetch_from_urls(blocklists: dict, url_sources: dict,
import_fields: list=IMPORT_FIELDS, import_fields: list=IMPORT_FIELDS,
save_intermediate: bool=False, save_intermediate: bool=False,
@ -126,6 +164,8 @@ def fetch_from_instances(blocklists: dict, sources: dict,
domain = item['domain'] domain = item['domain']
admin = item.get('admin', False) admin = item.get('admin', False)
token = item.get('token', None) token = item.get('token', None)
itemsrc = f"https://{domain}/api"
# If import fields are provided, they override the global ones passed in # If import fields are provided, they override the global ones passed in
source_import_fields = item.get('import_fields', None) source_import_fields = item.get('import_fields', None)
if source_import_fields: if source_import_fields:
@ -133,16 +173,19 @@ def fetch_from_instances(blocklists: dict, sources: dict,
import_fields = IMPORT_FIELDS.extend(source_import_fields) import_fields = IMPORT_FIELDS.extend(source_import_fields)
# Add the blocklist with the domain as the source key # Add the blocklist with the domain as the source key
blocklists[domain] = fetch_instance_blocklist(domain, token, admin, import_fields) blocklists[itemsrc] = fetch_instance_blocklist(domain, token, admin, import_fields)
if save_intermediate: if save_intermediate:
save_intermediate_blocklist(blocklists[domain], domain, savedir, export_fields) save_intermediate_blocklist(blocklists[itemsrc], domain, savedir, export_fields)
return blocklists return blocklists
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict: def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
"""Merge fetched remote blocklists into a bulk update """Merge fetched remote blocklists into a bulk update
@param blocklists: A dict of lists of DomainBlocks, keyed by source.
Each value is a list of DomainBlocks
@param mergeplan: An optional method of merging overlapping block definitions @param mergeplan: An optional method of merging overlapping block definitions
'max' (the default) uses the highest severity block found 'max' (the default) uses the highest severity block found
'min' uses the lowest severity block found 'min' uses the lowest severity block found
@param returns: A dict of DomainBlocks keyed by domain
""" """
merged = {} merged = {}
@ -433,7 +476,7 @@ def update_known_block(token: str, host: str, block: DomainBlock):
response = requests.put(url, response = requests.put(url,
headers=requests_headers(token), headers=requests_headers(token),
data=blockdata, json=blockdata._asdict(),
timeout=REQUEST_TIMEOUT timeout=REQUEST_TIMEOUT
) )
if response.status_code != 200: if response.status_code != 200:
@ -442,14 +485,14 @@ def update_known_block(token: str, host: str, block: DomainBlock):
def add_block(token: str, host: str, blockdata: DomainBlock): def add_block(token: str, host: str, blockdata: DomainBlock):
"""Block a domain on Mastodon host """Block a domain on Mastodon host
""" """
log.debug(f"Blocking domain {blockdata.domain} at {host}...") log.debug(f"Adding block entry for {blockdata.domain} at {host}...")
api_path = "/api/v1/admin/domain_blocks" api_path = "/api/v1/admin/domain_blocks"
url = f"https://{host}{api_path}" url = f"https://{host}{api_path}"
response = requests.post(url, response = requests.post(url,
headers=requests_headers(token), headers=requests_headers(token),
data=blockdata._asdict(), json=blockdata._asdict(),
timeout=REQUEST_TIMEOUT timeout=REQUEST_TIMEOUT
) )
if response.status_code == 422: if response.status_code == 422:
@ -515,6 +558,8 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
log.info(f"Pushing new block definition: {newblock}") log.info(f"Pushing new block definition: {newblock}")
blockdata = oldblock.copy() blockdata = oldblock.copy()
blockdata.update(newblock) blockdata.update(newblock)
log.debug(f"Block as dict: {blockdata._asdict()}")
if not dryrun: if not dryrun:
update_known_block(token, host, blockdata) update_known_block(token, host, blockdata)
# add a pause here so we don't melt the instance # add a pause here so we don't melt the instance
@ -530,6 +575,7 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
# This is a new block for the target instance, so we # This is a new block for the target instance, so we
# need to add a block rather than update an existing one # need to add a block rather than update an existing one
log.info(f"Adding new block: {newblock}...") log.info(f"Adding new block: {newblock}...")
log.debug(f"Block as dict: {newblock._asdict()}")
# Make sure the new block doesn't clobber a domain with followers # Make sure the new block doesn't clobber a domain with followers
newblock.severity = check_followed_severity(host, token, newblock.domain, newblock.severity, max_followed_severity) newblock.severity = check_followed_severity(host, token, newblock.domain, newblock.severity, max_followed_severity)
@ -587,8 +633,15 @@ def save_blocklist_to_file(
for item in blocklist: for item in blocklist:
writer.writerow(item._asdict()) writer.writerow(item._asdict())
def augment_args(args): def augment_args(args, tomldata: str=None):
"""Augment commandline arguments with config file parameters""" """Augment commandline arguments with config file parameters
If tomldata is provided, uses that data instead of loading
from a config file.
"""
if tomldata:
conf = toml.loads(tomldata)
else:
conf = toml.load(args.config) conf = toml.load(args.config)
if not args.no_fetch_url: if not args.no_fetch_url:
@ -615,14 +668,19 @@ def augment_args(args):
if not args.import_fields: if not args.import_fields:
args.import_fields = conf.get('import_fields', []) args.import_fields = conf.get('import_fields', [])
args.blocklist_url_sources = conf.get('blocklist_url_sources') if not args.mergeplan:
args.blocklist_instance_sources = conf.get('blocklist_instance_sources') args.mergeplan = conf.get('mergeplan', 'max')
args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations')
args.blocklist_url_sources = conf.get('blocklist_url_sources', [])
args.blocklist_instance_sources = conf.get('blocklist_instance_sources', [])
args.allowlist_url_sources = conf.get('allowlist_url_sources', [])
args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations', [])
return args return args
def main(): def setup_argparse():
"""Setup the commandline arguments
"""
ap = argparse.ArgumentParser( ap = argparse.ArgumentParser(
description="Bulk blocklist tool", description="Bulk blocklist tool",
epilog=f"Part of FediBlockHole v{__version__}", epilog=f"Part of FediBlockHole v{__version__}",
@ -633,10 +691,11 @@ def main():
ap.add_argument('-o', '--outfile', dest="blocklist_savefile", help="Save merged blocklist to a local file.") ap.add_argument('-o', '--outfile', dest="blocklist_savefile", help="Save merged blocklist to a local file.")
ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.") ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.") ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], default='max', help="Set mergeplan.") ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.")
ap.add_argument('-I', '--import-field', dest='import_fields', action='append', help="Extra blocklist fields to import.") ap.add_argument('-I', '--import-field', dest='import_fields', action='append', help="Extra blocklist fields to import.")
ap.add_argument('-E', '--export-field', dest='export_fields', action='append', help="Extra blocklist fields to export.") ap.add_argument('-E', '--export-field', dest='export_fields', action='append', help="Extra blocklist fields to export.")
ap.add_argument('-A', '--allow', dest="allow_domains", action='append', default=[], help="Override any blocks to allow this domain.")
ap.add_argument('--no-fetch-url', dest='no_fetch_url', action='store_true', help="Don't fetch from URLs, even if configured.") ap.add_argument('--no-fetch-url', dest='no_fetch_url', action='store_true', help="Don't fetch from URLs, even if configured.")
ap.add_argument('--no-fetch-instance', dest='no_fetch_instance', action='store_true', help="Don't fetch from instances, even if configured.") ap.add_argument('--no-fetch-instance', dest='no_fetch_instance', action='store_true', help="Don't fetch from instances, even if configured.")
@ -645,7 +704,13 @@ def main():
ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.") ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.")
ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.") ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.")
return ap
def main():
ap = setup_argparse()
args = ap.parse_args() args = ap.parse_args()
if args.loglevel is not None: if args.loglevel is not None:
levelname = args.loglevel.upper() levelname = args.loglevel.upper()
log.setLevel(getattr(logging, levelname)) log.setLevel(getattr(logging, levelname))

View File

@ -97,9 +97,10 @@ class BlocklistParserCSV(BlocklistParser):
origitem = blockitem.copy() origitem = blockitem.copy()
for key in origitem: for key in origitem:
if key not in self.import_fields: if key not in self.import_fields:
log.debug(f"ignoring field '{key}'")
del blockitem[key] del blockitem[key]
# Convert dict to NamedTuple with the double-star operator # Convert dict to DomainBlock with the double-star operator
# See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments
block = DomainBlock(**blockitem) block = DomainBlock(**blockitem)
if block.severity > self.max_severity: if block.severity > self.max_severity:
@ -162,7 +163,7 @@ def str2bool(boolstring: str) -> bool:
boolstring = boolstring.lower() boolstring = boolstring.lower()
if boolstring in ['true', 't', '1', 'y', 'yes']: if boolstring in ['true', 't', '1', 'y', 'yes']:
return True return True
elif boolstring in ['false', 'f', '0', 'n', 'no']: elif boolstring in ['', 'false', 'f', '0', 'n', 'no']:
return False return False
else: else:
raise ValueError(f"Cannot parse value '{boolstring}' as boolean") raise ValueError(f"Cannot parse value '{boolstring}' as boolean")
@ -183,4 +184,5 @@ def parse_blocklist(
"""Parse a blocklist in the given format """Parse a blocklist in the given format
""" """
parser = FORMAT_PARSERS[format](import_fields, max_severity) parser = FORMAT_PARSERS[format](import_fields, max_severity)
log.debug(f"parsing {format} blocklist with import_fields: {import_fields}...")
return parser.parse_blocklist(blockdata) return parser.parse_blocklist(blockdata)

View File

@ -127,13 +127,13 @@ class DomainBlock(object):
"""Initialize the DomainBlock """Initialize the DomainBlock
""" """
self.domain = domain self.domain = domain
self.severity = severity
self.public_comment = public_comment self.public_comment = public_comment
self.private_comment = private_comment self.private_comment = private_comment
self.reject_media = reject_media self.reject_media = reject_media
self.reject_reports = reject_reports self.reject_reports = reject_reports
self.obfuscate = obfuscate self.obfuscate = obfuscate
self.id = id self.id = id
self.severity = severity
@property @property
def severity(self): def severity(self):
@ -146,17 +146,12 @@ class DomainBlock(object):
else: else:
self._severity = BlockSeverity(sev) self._severity = BlockSeverity(sev)
# Suspend implies reject_media,reject_reports == True
if self._severity.level == SeverityLevel.SUSPEND:
self.reject_media = True
self.reject_reports = True
def _asdict(self): def _asdict(self):
"""Return a dict version of this object """Return a dict version of this object
""" """
dictval = { dictval = {
'domain': self.domain, 'domain': self.domain,
'severity': self.severity, 'severity': str(self.severity),
'public_comment': self.public_comment, 'public_comment': self.public_comment,
'private_comment': self.private_comment, 'private_comment': self.private_comment,
'reject_media': self.reject_media, 'reject_media': self.reject_media,

3
tests/conftest.py Normal file
View File

@ -0,0 +1,3 @@
import sys
import os
sys.path.append(os.path.join(os.path.dirname(__file__), 'helpers'))

View File

11
tests/helpers/util.py Normal file
View File

@ -0,0 +1,11 @@
""" Utility functions for tests
"""
from fediblockhole import setup_argparse, augment_args
def shim_argparse(testargv: list=[], tomldata: str=None):
"""Helper function to parse test args
"""
ap = setup_argparse()
args = ap.parse_args(testargv)
args = augment_args(args, tomldata)
return args

49
tests/test_allowlist.py Normal file
View File

@ -0,0 +1,49 @@
""" Test allowlists
"""
import pytest
from util import shim_argparse
from fediblockhole.const import DomainBlock
from fediblockhole import fetch_allowlists, apply_allowlists
def test_cmdline_allow_removes_domain():
"""Test that -A <domain> removes entries from merged
"""
conf = shim_argparse(['-A', 'removeme.org'])
merged = {
'example.org': DomainBlock('example.org'),
'example2.org': DomainBlock('example.org'),
'removeme.org': DomainBlock('removeme.org'),
'keepblockingme.org': DomainBlock('keepblockingme.org'),
}
# allowlists = {
# 'testlist': [ DomainBlock('removeme.org', 'noop'), ]
# }
merged = apply_allowlists(merged, conf, {})
with pytest.raises(KeyError):
merged['removeme.org']
def test_allowlist_removes_domain():
"""Test that an item in an allowlist removes entries from merged
"""
conf = shim_argparse()
merged = {
'example.org': DomainBlock('example.org'),
'example2.org': DomainBlock('example.org'),
'removeme.org': DomainBlock('removeme.org'),
'keepblockingme.org': DomainBlock('keepblockingme.org'),
}
allowlists = {
'testlist': [ DomainBlock('removeme.org', 'noop'), ]
}
merged = apply_allowlists(merged, conf, allowlists)
with pytest.raises(KeyError):
merged['removeme.org']

47
tests/test_cmdline.py Normal file
View File

@ -0,0 +1,47 @@
"""Test the commandline defined parameters correctly
"""
from util import shim_argparse
from fediblockhole import setup_argparse, augment_args
def test_cmdline_no_configfile():
""" Test bare command with no configfile
"""
ap = setup_argparse()
args = ap.parse_args([])
assert args.config == '/etc/default/fediblockhole.conf.toml'
assert args.mergeplan == None
assert args.blocklist_savefile == None
assert args.save_intermediate == False
assert args.savedir == None
assert args.import_fields == None
assert args.export_fields == None
assert args.no_fetch_url == False
assert args.no_fetch_instance == False
assert args.no_push_instance == False
assert args.dryrun == False
assert args.loglevel == None
def test_cmdline_mergeplan_min():
""" Test setting mergeplan min
"""
ap = setup_argparse()
args = ap.parse_args(['-m', 'min'])
assert args.mergeplan == 'min'
def test_set_allow_domain():
"""Set a single allow domain on commandline"""
ap = setup_argparse()
args = ap.parse_args(['-A', 'example.org'])
assert args.allow_domains == ['example.org']
def test_set_multiple_allow_domains():
"""Set multiple allow domains on commandline"""
ap = setup_argparse()
args = ap.parse_args(['-A', 'example.org', '-A', 'example2.org', '-A', 'example3.org'])
assert args.allow_domains == ['example.org', 'example2.org', 'example3.org']

51
tests/test_configfile.py Normal file
View File

@ -0,0 +1,51 @@
"""Test the config file is loading parameters correctly
"""
from util import shim_argparse
from fediblockhole import setup_argparse, augment_args
def test_parse_tomldata():
tomldata = """
# Test TOML config for FediBlockHole
blocklist_instance_sources = []
blocklist_url_sources = []
save_intermediate = true
import_fields = ['public_comment']
"""
ap = setup_argparse()
args = ap.parse_args([])
args = augment_args(args, tomldata)
assert args.blocklist_instance_sources == []
assert args.blocklist_url_sources == []
assert args.save_intermediate == True
assert args.import_fields == ['public_comment']
def test_set_mergeplan_max():
tomldata = """mergeplan = 'max'
"""
args = shim_argparse([], tomldata)
assert args.mergeplan == 'max'
def test_set_mergeplan_min():
tomldata = """mergeplan = 'min'
"""
args = shim_argparse([], tomldata)
assert args.mergeplan == 'min'
def test_set_allowlists():
tomldata = """# Comment on config
allowlist_url_sources = [ { url='file:///path/to/allowlist', format='csv'} ]
"""
args = shim_argparse([], tomldata)
assert args.mergeplan == 'max'
assert args.allowlist_url_sources == [{
'url': 'file:///path/to/allowlist',
'format': 'csv',
}]

View File

@ -72,12 +72,3 @@ def test_compare_diff_sevs_2():
b = DomainBlock('example1.org', 'noop') b = DomainBlock('example1.org', 'noop')
assert a != b assert a != b
def test_suspend_rejects():
"""A suspend should reject_media and reject_reports
"""
a = DomainBlock('example.org', 'suspend')
assert a.severity.level == SeverityLevel.SUSPEND
assert a.reject_media == True
assert a.reject_reports == True