Refactored to add a DomainBlock object.
Refactored to use a BlocklistParser structure. Added the ability to limit the maximum severity per URL source. Improved the method for checking whether changes are needed.
This commit is contained in:
parent
ea5e7d01d9
commit
10011a5ffb
|
@ -34,3 +34,8 @@ fediblock-sync = "fediblockhole:main"
|
|||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = [
|
||||
"--import-mode=importlib",
|
||||
]
|
|
@ -1,2 +1,3 @@
|
|||
requests
|
||||
toml
|
||||
toml
|
||||
pytest
|
|
@ -11,20 +11,20 @@ import os.path
|
|||
import sys
|
||||
import urllib.request as urlr
|
||||
|
||||
from .blocklist_parser import parse_blocklist
|
||||
from .const import DomainBlock, BlockSeverity
|
||||
|
||||
from importlib.metadata import version
|
||||
__version__ = version('fediblockhole')
|
||||
|
||||
import logging
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format='%(asctime)s %(levelname)s %(message)s')
|
||||
log = logging.getLogger('fediblockhole')
|
||||
|
||||
# Max size of a URL-fetched blocklist
|
||||
URL_BLOCKLIST_MAXSIZE = 1024 ** 3
|
||||
|
||||
log = logging.getLogger('fediblock_sync')
|
||||
|
||||
CONFIGFILE = "/home/mastodon/etc/admin.conf"
|
||||
|
||||
# The relative severity levels of blocks
|
||||
SEVERITY = {
|
||||
'noop': 0,
|
||||
|
@ -72,25 +72,15 @@ def sync_blocklists(conf: dict):
|
|||
if not conf.no_fetch_url:
|
||||
log.info("Fetching domain blocks from URLs...")
|
||||
for listurl in conf.blocklist_url_sources:
|
||||
blocklists[listurl] = []
|
||||
with urlr.urlopen(listurl) as fp:
|
||||
url = listurl['url']
|
||||
max_severity = listurl.get('max_severity', 'suspend')
|
||||
listformat = listurl.get('format', 'csv')
|
||||
with urlr.urlopen(url) as fp:
|
||||
rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8')
|
||||
reader = csv.DictReader(rawdata.split('\n'))
|
||||
for row in reader:
|
||||
# Coerce booleans from string to Python bool
|
||||
for boolkey in ['reject_media', 'reject_reports', 'obfuscate']:
|
||||
if boolkey in row:
|
||||
row[boolkey] = str2bool(row[boolkey])
|
||||
|
||||
# Remove fields we don't want to import
|
||||
origrow = row.copy()
|
||||
for key in origrow:
|
||||
if key not in import_fields:
|
||||
del row[key]
|
||||
blocklists[listurl].append(row)
|
||||
|
||||
blocklists[url] = parse_blocklist(rawdata, listformat, import_fields, max_severity)
|
||||
|
||||
if conf.save_intermediate:
|
||||
save_intermediate_blocklist(blocklists[listurl], listurl, conf.savedir, export_fields)
|
||||
save_intermediate_blocklist(blocklists[url], url, conf.savedir, export_fields)
|
||||
|
||||
# Fetch blocklists from remote instances
|
||||
if not conf.no_fetch_instance:
|
||||
|
@ -115,7 +105,7 @@ def sync_blocklists(conf: dict):
|
|||
for dest in conf.blocklist_instance_destinations:
|
||||
domain = dest['domain']
|
||||
token = dest['token']
|
||||
max_followed_severity = dest.get('max_followed_severity', 'silence')
|
||||
max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence'))
|
||||
push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity)
|
||||
|
||||
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
||||
|
@ -130,7 +120,7 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
|||
for key, blist in blocklists.items():
|
||||
log.debug(f"processing blocklist from: {key} ...")
|
||||
for newblock in blist:
|
||||
domain = newblock['domain']
|
||||
domain = newblock.domain
|
||||
# If the domain has two asterisks in it, it's obfuscated
|
||||
# and we can't really use it, so skip it and do the next one
|
||||
if '*' in domain:
|
||||
|
@ -151,7 +141,7 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
|||
# end for
|
||||
return merged
|
||||
|
||||
def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dict:
|
||||
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
|
||||
"""Use a mergeplan to decide how to merge two overlapping block definitions
|
||||
|
||||
@param oldblock: The existing block definition.
|
||||
|
@ -159,7 +149,7 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
|
|||
@param mergeplan: How to merge. Choices are 'max', the default, and 'min'.
|
||||
"""
|
||||
# Default to the existing block definition
|
||||
blockdata = oldblock.copy()
|
||||
blockdata = oldblock._asdict()
|
||||
|
||||
# If the public or private comment is different,
|
||||
# append it to the existing comment, joined with ', '
|
||||
|
@ -167,10 +157,10 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
|
|||
keylist = ['public_comment', 'private_comment']
|
||||
for key in keylist:
|
||||
try:
|
||||
if oldblock[key] not in ['', None] and newblock[key] not in ['', None] and oldblock[key] != newblock[key]:
|
||||
log.debug(f"old comment: '{oldblock[key]}'")
|
||||
log.debug(f"new comment: '{newblock[key]}'")
|
||||
blockdata[key] = ', '.join([oldblock[key], newblock[key]])
|
||||
if getattr(oldblock, key) not in ['', None] and getattr(newblock, key) not in ['', None] and getattr(oldblock, key) != getattr(newblock, key):
|
||||
log.debug(f"old comment: '{getattr(oldblock, key)}'")
|
||||
log.debug(f"new comment: '{getattr(newblock, key)}'")
|
||||
blockdata[key] = ', '.join([getattr(oldblock, key), getattr(newblock, key)])
|
||||
except KeyError:
|
||||
log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
|
||||
continue
|
||||
|
@ -180,25 +170,25 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
|
|||
# Use the highest block level found (the default)
|
||||
log.debug(f"Using 'max' mergeplan.")
|
||||
|
||||
if SEVERITY[newblock['severity']] > SEVERITY[oldblock['severity']]:
|
||||
if newblock.severity > oldblock.severity:
|
||||
log.debug(f"New block severity is higher. Using that.")
|
||||
blockdata['severity'] = newblock['severity']
|
||||
blockdata['severity'] = newblock.severity
|
||||
|
||||
# If obfuscate is set and is True for the domain in
|
||||
# any blocklist then obfuscate is set to True.
|
||||
if newblock.get('obfuscate', False):
|
||||
if getattr(newblock, 'obfuscate', False):
|
||||
blockdata['obfuscate'] = True
|
||||
|
||||
elif mergeplan in ['min']:
|
||||
# Use the lowest block level found
|
||||
log.debug(f"Using 'min' mergeplan.")
|
||||
|
||||
if SEVERITY[newblock['severity']] < SEVERITY[oldblock['severity']]:
|
||||
blockdata['severity'] = newblock['severity']
|
||||
if newblock.severity < oldblock.severity:
|
||||
blockdata['severity'] = newblock.severity
|
||||
|
||||
# If obfuscate is set and is False for the domain in
|
||||
# any blocklist then obfuscate is set to False.
|
||||
if not newblock.get('obfuscate', True):
|
||||
if not getattr(newblock, 'obfuscate', True):
|
||||
blockdata['obfuscate'] = False
|
||||
|
||||
else:
|
||||
|
@ -206,7 +196,7 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
|
|||
|
||||
log.debug(f"Block severity set to {blockdata['severity']}")
|
||||
|
||||
return blockdata
|
||||
return DomainBlock(**blockdata)
|
||||
|
||||
def requests_headers(token: str=None):
|
||||
"""Set common headers for requests"""
|
||||
|
@ -219,7 +209,7 @@ def requests_headers(token: str=None):
|
|||
return headers
|
||||
|
||||
def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
||||
import_fields: list=['domain', 'severity']) -> list:
|
||||
import_fields: list=['domain', 'severity']) -> list[DomainBlock]:
|
||||
"""Fetch existing block list from server
|
||||
|
||||
@param host: The remote host to connect to.
|
||||
|
@ -239,7 +229,7 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
|||
|
||||
url = f"https://{host}{api_path}"
|
||||
|
||||
domain_blocks = []
|
||||
blocklist = []
|
||||
link = True
|
||||
|
||||
while link:
|
||||
|
@ -248,7 +238,7 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
|||
log.error(f"Cannot fetch remote blocklist: {response.content}")
|
||||
raise ValueError("Unable to fetch domain block list: %s", response)
|
||||
|
||||
domain_blocks.extend(json.loads(response.content))
|
||||
blocklist.extend( parse_blocklist(response.content, 'json', import_fields) )
|
||||
|
||||
# Parse the link header to find the next url to fetch
|
||||
# This is a weird and janky way of doing pagination but
|
||||
|
@ -262,20 +252,12 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
|||
break
|
||||
else:
|
||||
next = pagination[0]
|
||||
prev = pagination[1]
|
||||
# prev = pagination[1]
|
||||
|
||||
urlstring, rel = next.split('; ')
|
||||
url = urlstring.strip('<').rstrip('>')
|
||||
|
||||
log.debug(f"Found {len(domain_blocks)} existing domain blocks.")
|
||||
# Remove fields not in import list.
|
||||
for row in domain_blocks:
|
||||
origrow = row.copy()
|
||||
for key in origrow:
|
||||
if key not in import_fields:
|
||||
del row[key]
|
||||
|
||||
return domain_blocks
|
||||
return blocklist
|
||||
|
||||
def delete_block(token: str, host: str, id: int):
|
||||
"""Remove a domain block"""
|
||||
|
@ -334,40 +316,26 @@ def fetch_instance_follows(token: str, host: str, domain: str) -> int:
|
|||
return follows
|
||||
|
||||
def check_followed_severity(host: str, token: str, domain: str,
|
||||
severity: str, max_followed_severity: str='silence'):
|
||||
severity: BlockSeverity,
|
||||
max_followed_severity: BlockSeverity=BlockSeverity('silence')):
|
||||
"""Check an instance to see if it has followers of a to-be-blocked instance"""
|
||||
|
||||
# Return straight away if we're not increasing the severity
|
||||
if severity <= max_followed_severity:
|
||||
return severity
|
||||
|
||||
# If the instance has accounts that follow people on the to-be-blocked domain,
|
||||
# limit the maximum severity to the configured `max_followed_severity`.
|
||||
follows = fetch_instance_follows(token, host, domain)
|
||||
if follows > 0:
|
||||
log.debug(f"Instance {host} has {follows} followers of accounts at {domain}.")
|
||||
if SEVERITY[severity] > SEVERITY[max_followed_severity]:
|
||||
if severity > max_followed_severity:
|
||||
log.warning(f"Instance {host} has {follows} followers of accounts at {domain}. Limiting block severity to {max_followed_severity}.")
|
||||
return max_followed_severity
|
||||
else:
|
||||
return severity
|
||||
return severity
|
||||
|
||||
def is_change_needed(oldblock: dict, newblock: dict, import_fields: list):
|
||||
"""Compare block definitions to see if changes are needed"""
|
||||
# Check if anything is actually different and needs updating
|
||||
change_needed = []
|
||||
|
||||
for key in import_fields:
|
||||
try:
|
||||
oldval = oldblock[key]
|
||||
newval = newblock[key]
|
||||
log.debug(f"Compare {key} '{oldval}' <> '{newval}'")
|
||||
|
||||
if oldval != newval:
|
||||
log.debug("Difference detected. Change needed.")
|
||||
change_needed.append(key)
|
||||
break
|
||||
|
||||
except KeyError:
|
||||
log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
|
||||
continue
|
||||
|
||||
change_needed = oldblock.compare_fields(newblock, import_fields)
|
||||
return change_needed
|
||||
|
||||
def update_known_block(token: str, host: str, blockdict: dict):
|
||||
|
@ -392,17 +360,17 @@ def update_known_block(token: str, host: str, blockdict: dict):
|
|||
if response.status_code != 200:
|
||||
raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
|
||||
|
||||
def add_block(token: str, host: str, blockdata: dict):
|
||||
def add_block(token: str, host: str, blockdata: DomainBlock):
|
||||
"""Block a domain on Mastodon host
|
||||
"""
|
||||
log.debug(f"Blocking domain {blockdata['domain']} at {host}...")
|
||||
log.debug(f"Blocking domain {blockdata.domain} at {host}...")
|
||||
api_path = "/api/v1/admin/domain_blocks"
|
||||
|
||||
url = f"https://{host}{api_path}"
|
||||
|
||||
response = requests.post(url,
|
||||
headers=requests_headers(token),
|
||||
data=blockdata,
|
||||
data=blockdata._asdict(),
|
||||
timeout=REQUEST_TIMEOUT
|
||||
)
|
||||
if response.status_code == 422:
|
||||
|
@ -417,7 +385,7 @@ def add_block(token: str, host: str, blockdata: dict):
|
|||
def push_blocklist(token: str, host: str, blocklist: list[dict],
|
||||
dryrun: bool=False,
|
||||
import_fields: list=['domain', 'severity'],
|
||||
max_followed_severity='silence',
|
||||
max_followed_severity:BlockSeverity=BlockSeverity('silence'),
|
||||
):
|
||||
"""Push a blocklist to a remote instance.
|
||||
|
||||
|
@ -437,36 +405,41 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
|
|||
serverblocks = fetch_instance_blocklist(host, token, True, import_fields)
|
||||
|
||||
# # Convert serverblocks to a dictionary keyed by domain name
|
||||
knownblocks = {row['domain']: row for row in serverblocks}
|
||||
knownblocks = {row.domain: row for row in serverblocks}
|
||||
|
||||
for newblock in blocklist:
|
||||
|
||||
log.debug(f"Applying newblock: {newblock}")
|
||||
oldblock = knownblocks.get(newblock['domain'], None)
|
||||
log.debug(f"Processing block: {newblock}")
|
||||
oldblock = knownblocks.get(newblock.domain, None)
|
||||
if oldblock:
|
||||
log.debug(f"Block already exists for {newblock['domain']}, checking for differences...")
|
||||
log.debug(f"Block already exists for {newblock.domain}, checking for differences...")
|
||||
|
||||
change_needed = is_change_needed(oldblock, newblock, import_fields)
|
||||
|
||||
if change_needed:
|
||||
# Change might be needed, but let's see if the severity
|
||||
# needs to change. If not, maybe no changes are needed?
|
||||
newseverity = check_followed_severity(host, token, oldblock['domain'], newblock['severity'], max_followed_severity)
|
||||
if newseverity != oldblock['severity']:
|
||||
newblock['severity'] = newseverity
|
||||
change_needed.append('severity')
|
||||
|
||||
# Change still needed?
|
||||
if change_needed:
|
||||
log.info(f"Change detected. Updating domain block for {oldblock['domain']}")
|
||||
blockdata = oldblock.copy()
|
||||
blockdata.update(newblock)
|
||||
if not dryrun:
|
||||
update_known_block(token, host, blockdata)
|
||||
# add a pause here so we don't melt the instance
|
||||
time.sleep(1)
|
||||
else:
|
||||
log.info("Dry run selected. Not applying changes.")
|
||||
# Is the severity changing?
|
||||
if 'severity' in change_needed:
|
||||
# Confirm if we really want to change the severity
|
||||
# If we still have followers of the remote domain, we may not
|
||||
# want to go all the way to full suspend, depending on the configuration
|
||||
newseverity = check_followed_severity(host, token, oldblock.domain, newblock.severity, max_followed_severity)
|
||||
if newseverity != oldblock.severity:
|
||||
newblock.severity = newseverity
|
||||
else:
|
||||
log.info("Keeping severity of block the same to avoid disrupting followers.")
|
||||
change_needed.remove('severity')
|
||||
|
||||
if change_needed:
|
||||
log.info(f"Change detected. Need to update {change_needed} for domain block for {oldblock.domain}")
|
||||
log.info(f"Old block definition: {oldblock}")
|
||||
log.info(f"Pushing new block definition: {newblock}")
|
||||
blockdata = oldblock.copy()
|
||||
blockdata.update(newblock)
|
||||
if not dryrun:
|
||||
update_known_block(token, host, blockdata)
|
||||
# add a pause here so we don't melt the instance
|
||||
time.sleep(1)
|
||||
else:
|
||||
log.info("Dry run selected. Not applying changes.")
|
||||
|
||||
else:
|
||||
log.debug("No differences detected. Not updating.")
|
||||
|
@ -475,22 +448,22 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
|
|||
else:
|
||||
# This is a new block for the target instance, so we
|
||||
# need to add a block rather than update an existing one
|
||||
blockdata = {
|
||||
'domain': newblock['domain'],
|
||||
# Default to Silence if nothing is specified
|
||||
'severity': newblock.get('severity', 'silence'),
|
||||
'public_comment': newblock.get('public_comment', ''),
|
||||
'private_comment': newblock.get('private_comment', ''),
|
||||
'reject_media': newblock.get('reject_media', False),
|
||||
'reject_reports': newblock.get('reject_reports', False),
|
||||
'obfuscate': newblock.get('obfuscate', False),
|
||||
}
|
||||
# blockdata = {
|
||||
# 'domain': newblock.domain,
|
||||
# # Default to Silence if nothing is specified
|
||||
# 'severity': newblock.get('severity', 'silence'),
|
||||
# 'public_comment': newblock.get('public_comment', ''),
|
||||
# 'private_comment': newblock.get('private_comment', ''),
|
||||
# 'reject_media': newblock.get('reject_media', False),
|
||||
# 'reject_reports': newblock.get('reject_reports', False),
|
||||
# 'obfuscate': newblock.get('obfuscate', False),
|
||||
# }
|
||||
|
||||
# Make sure the new block doesn't clobber a domain with followers
|
||||
blockdata['severity'] = check_followed_severity(host, token, newblock['domain'], max_followed_severity)
|
||||
log.info(f"Adding new block for {blockdata['domain']}...")
|
||||
newblock.severity = check_followed_severity(host, token, newblock.domain, newblock.severity, max_followed_severity)
|
||||
log.info(f"Adding new block: {newblock}...")
|
||||
if not dryrun:
|
||||
add_block(token, host, blockdata)
|
||||
add_block(token, host, newblock)
|
||||
# add a pause here so we don't melt the instance
|
||||
time.sleep(1)
|
||||
else:
|
||||
|
@ -520,7 +493,7 @@ def save_intermediate_blocklist(
|
|||
save_blocklist_to_file(blocklist, filepath, export_fields)
|
||||
|
||||
def save_blocklist_to_file(
|
||||
blocklist: list[dict],
|
||||
blocklist: list[DomainBlock],
|
||||
filepath: str,
|
||||
export_fields: list=['domain','severity']):
|
||||
"""Save a blocklist we've downloaded from a remote source
|
||||
|
@ -530,9 +503,9 @@ def save_blocklist_to_file(
|
|||
@param export_fields: Which fields to include in the export.
|
||||
"""
|
||||
try:
|
||||
blocklist = sorted(blocklist, key=lambda x: x['domain'])
|
||||
blocklist = sorted(blocklist, key=lambda x: x.domain)
|
||||
except KeyError:
|
||||
log.error("Field 'domain' not found in blocklist. Are you sure the URLs are correct?")
|
||||
log.error("Field 'domain' not found in blocklist.")
|
||||
log.debug(f"blocklist is: {blocklist}")
|
||||
|
||||
log.debug(f"export fields: {export_fields}")
|
||||
|
@ -540,7 +513,8 @@ def save_blocklist_to_file(
|
|||
with open(filepath, "w") as fp:
|
||||
writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
|
||||
writer.writeheader()
|
||||
writer.writerows(blocklist)
|
||||
for item in blocklist:
|
||||
writer.writerow(item._asdict())
|
||||
|
||||
def augment_args(args):
|
||||
"""Augment commandline arguments with config file parameters"""
|
||||
|
@ -576,17 +550,6 @@ def augment_args(args):
|
|||
|
||||
return args
|
||||
|
||||
def str2bool(boolstring: str) -> bool:
|
||||
"""Helper function to convert boolean strings to actual Python bools
|
||||
"""
|
||||
boolstring = boolstring.lower()
|
||||
if boolstring in ['true', 't', '1', 'y', 'yes']:
|
||||
return True
|
||||
elif boolstring in ['false', 'f', '0', 'n', 'no']:
|
||||
return False
|
||||
else:
|
||||
raise ValueError(f"Cannot parse value '{boolstring}' as boolean")
|
||||
|
||||
def main():
|
||||
|
||||
ap = argparse.ArgumentParser(
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
"""Parse various blocklist data formats
|
||||
"""
|
||||
from typing import Iterable
|
||||
from .const import DomainBlock, BlockSeverity
|
||||
|
||||
import csv
|
||||
import json
|
||||
|
||||
import logging
|
||||
log = logging.getLogger('fediblockhole')
|
||||
|
||||
class BlocklistParser(object):
    """
    Base class for parsing blocklists.

    Subclasses override `preparse` to turn raw blocklist data into an
    iterable of block items, and `parse_item` to turn one item into a
    DomainBlock.
    """

    def __init__(self, import_fields: list=None,
        max_severity: str='suspend'):
        """Create a Parser

        @param import_fields: an optional list of fields to limit the parser to.
        Ignore any fields in a block item that aren't in import_fields.
        Defaults to ['domain', 'severity'].
        @param max_severity: Clamp any parsed block to at most this severity.
        """
        # Default assigned here rather than in the signature to avoid a
        # shared mutable default list.
        if import_fields is None:
            import_fields = ['domain', 'severity']
        self.import_fields = import_fields
        self.max_severity = BlockSeverity(max_severity)

    def preparse(self, blockdata) -> Iterable:
        """Convert raw data into an iterable of block items.

        The base implementation is the identity transform; subclasses
        override this when the raw datatype needs decoding first.
        (The original code kept a `preparse = False` class flag, but it
        was shadowed by this method definition, so the flag never worked.)
        """
        return blockdata

    def parse_blocklist(self, blockdata) -> list[DomainBlock]:
        """Parse blocklist data into a list of DomainBlocks.

        @param blockdata: raw blocklist data; run through `preparse` first
        @returns: a list of DomainBlocks
        """
        return [self.parse_item(item) for item in self.preparse(blockdata)]

    def parse_item(self, blockitem) -> DomainBlock:
        """Parse an individual block item

        @param blockitem: an individual block to be parsed
        @raises NotImplementedError: subclasses must override this.
        """
        raise NotImplementedError
|
||||
|
||||
class BlocklistParserJSON(BlocklistParser):
    """Parse a JSON formatted blocklist"""
    preparse = True

    def preparse(self, blockdata) -> Iterable:
        """Decode the raw data as a JSON document."""
        return json.loads(blockdata)

    def parse_item(self, blockitem: str) -> DomainBlock:
        """Build a DomainBlock from one decoded JSON object."""
        # Drop any keys that aren't in the configured import fields,
        # mutating the item in place.
        unwanted = [key for key in blockitem if key not in self.import_fields]
        for key in unwanted:
            del blockitem[key]

        # Expand the remaining keys into DomainBlock keyword arguments.
        block = DomainBlock(**blockitem)

        # Clamp the severity to the per-source maximum.
        if block.severity > self.max_severity:
            block.severity = self.max_severity
        return block
|
||||
|
||||
class BlocklistParserCSV(BlocklistParser):
    """ Parse CSV formatted blocklists

    The parser expects the CSV data to include a header with the field names.
    """
    preparse = True

    def preparse(self, blockdata) -> Iterable:
        """Wrap the raw CSV text in a csv.DictReader."""
        return csv.DictReader(blockdata.split('\n'))

    def parse_item(self, blockitem: dict) -> DomainBlock:
        """Build a DomainBlock from one CSV row (a dict of strings)."""
        # CSV cells are strings; coerce the boolean columns.
        # FIXME: Is this still necessary with the DomainBlock object?
        for flag in ('reject_media', 'reject_reports', 'obfuscate'):
            if flag in blockitem:
                blockitem[flag] = str2bool(blockitem[flag])

        # Strip out columns we aren't importing, mutating the row in place.
        unwanted = [key for key in blockitem if key not in self.import_fields]
        for key in unwanted:
            del blockitem[key]

        # Expand the remaining columns into DomainBlock keyword arguments.
        block = DomainBlock(**blockitem)

        # Clamp the severity to the per-source maximum.
        if block.severity > self.max_severity:
            block.severity = self.max_severity
        return block
|
||||
|
||||
class RapidBlockParserCSV(BlocklistParserCSV):
    """ Parse RapidBlock CSV blocklists

    RapidBlock CSV blocklists are just a newline separated list of domains.
    """
    def preparse(self, blockdata) -> Iterable:
        """Prepend a 'domain' field header so DictReader can parse the data."""
        log.debug(f"blockdata: {blockdata[:100]}")
        # The raw data is header-less, so supply the single column name.
        withheader = "domain\r\n" + blockdata
        return csv.DictReader(withheader.split('\r\n'))
|
||||
|
||||
class RapidBlockParserJSON(BlocklistParserJSON):
    """Parse RapidBlock JSON formatted blocklists
    """
    def preparse(self, blockdata) -> Iterable:
        """Decode the JSON document and iterate its 'blocks' mapping."""
        return json.loads(blockdata)['blocks'].items()

    def parse_item(self, blockitem: tuple) -> DomainBlock:
        """Parse an individual item in a RapidBlock list

        Each item is a (domain, {dictionary of attributes}) tuple.
        """
        domain, attrs = blockitem

        # RapidBlock has a binary block level which we map
        # to 'suspend' if True, and 'noop' if False.
        severity = 'suspend' if attrs['isBlocked'] else 'noop'

        if 'public_comment' in self.import_fields:
            public_comment = attrs['reason']
        else:
            public_comment = ''

        # There's a 'tags' field as well, but we can't
        # do much with that in Mastodon yet

        block = DomainBlock(domain, severity, public_comment)

        # Clamp the severity to the per-source maximum.
        if block.severity > self.max_severity:
            block.severity = self.max_severity

        return block
|
||||
|
||||
def str2bool(boolstring: str) -> bool:
    """Convert a truthy/falsy string into an actual Python bool.

    @param boolstring: case-insensitive text such as 'true', 'y', '0', 'no'
    @returns: True or False
    @raises ValueError: when the string matches neither set of values
    """
    lowered = boolstring.lower()
    if lowered in {'true', 't', '1', 'y', 'yes'}:
        return True
    if lowered in {'false', 'f', '0', 'n', 'no'}:
        return False
    raise ValueError(f"Cannot parse value '{lowered}' as boolean")
|
||||
|
||||
# Registry mapping a blocklist format name to its parser class.
# Keys are the values accepted by the `format` argument of
# parse_blocklist() and by a URL source's 'format' config option.
FORMAT_PARSERS = {
    'csv': BlocklistParserCSV,
    'json': BlocklistParserJSON,
    'rapidblock.csv': RapidBlockParserCSV,
    'rapidblock.json': RapidBlockParserJSON,
}
|
||||
|
||||
# helper function to select the appropriate Parser
|
||||
def parse_blocklist(
    blockdata,
    format="csv",
    import_fields: list=['domain', 'severity'],
    max_severity: str='suspend'):
    """Parse a blocklist in the given format

    @param blockdata: the raw blocklist data
    @param format: one of the keys of FORMAT_PARSERS ('csv' by default)
    @param import_fields: fields to import from each block item
    @param max_severity: clamp each block to at most this severity
    @returns: the parsed blocklist
    """
    # Select the parser class for this format, then delegate.
    parser_class = FORMAT_PARSERS[format]
    return parser_class(import_fields, max_severity).parse_blocklist(blockdata)
|
|
@ -0,0 +1,220 @@
|
|||
""" Constant objects used by FediBlockHole
|
||||
"""
|
||||
import enum
|
||||
from typing import NamedTuple, Optional, TypedDict
|
||||
from dataclasses import dataclass
|
||||
|
||||
import logging
|
||||
log = logging.getLogger('fediblockhole')
|
||||
|
||||
class SeverityLevel(enum.IntEnum):
    """How severe should a block be? Higher is more severe.

    Explicit values match what enum.auto() assigned in declaration order.
    """
    NONE = 1
    SILENCE = 2
    SUSPEND = 3
|
||||
|
||||
class BlockSeverity(object):
    """A representation of a block severity

    Wraps a SeverityLevel and adds string parsing/formatting and rich
    comparisons, rather than using a bare IntEnum.
    """

    def __init__(self, severity: str=None):
        """@param severity: a severity name ('noop', 'silence', 'suspend');
        None or '' are treated as 'noop'.
        """
        self._level = self.str2level(severity)

    @property
    def level(self):
        """The underlying SeverityLevel enum member."""
        return self._level

    @level.setter
    def level(self, value):
        # Accept either a SeverityLevel member or a severity name string.
        if isinstance(value, SeverityLevel):
            self._level = value
        elif isinstance(value, str):
            self._level = self.str2level(value)
        else:
            raise ValueError(f"Invalid level value '{value}'")

    def str2level(self, severity: str=None):
        """Convert a string severity level to an internal enum"""
        if severity in [None, '', 'noop']:
            return SeverityLevel.NONE
        elif severity in ['silence']:
            return SeverityLevel.SILENCE
        elif severity in ['suspend']:
            return SeverityLevel.SUSPEND
        else:
            raise ValueError(f"Invalid severity value '{severity}'")

    def __repr__(self):
        return f"'{str(self)}'"

    def __str__(self):
        """A string version of the severity level
        """
        levelmap = {
            SeverityLevel.NONE: 'noop',
            SeverityLevel.SILENCE: 'silence',
            SeverityLevel.SUSPEND: 'suspend',
        }
        return levelmap[self.level]

    # Rich comparisons. The originals returned True or (implicitly) None;
    # these return real booleans, and NotImplemented for foreign types so
    # Python can fall back to the reflected operation instead of raising
    # AttributeError on a missing `_level`.
    def __lt__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level < other._level

    def __gt__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level > other._level

    def __eq__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level == other._level

    def __le__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level <= other._level

    def __ge__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level >= other._level

    def __hash__(self):
        # Defining __eq__ disabled the inherited hash; restore it so a
        # BlockSeverity can be used as a dict key or set member.
        # NOTE: `level` is mutable — don't mutate one used as a key.
        return hash(self._level)
|
||||
|
||||
# class _DomainBlock(NamedTuple):
|
||||
# domain: str # FIXME: Use an actual Domain object from somewhere?
|
||||
# severity: BlockSeverity = BlockSeverity.SUSPEND
|
||||
# public_comment: str = ''
|
||||
# private_comment: str = ''
|
||||
# reject_media: bool = False
|
||||
# reject_reports: bool = False
|
||||
# obfuscate: bool = False
|
||||
|
||||
class DomainBlock(object):
|
||||
|
||||
fields = [
|
||||
'domain',
|
||||
'severity',
|
||||
'public_comment',
|
||||
'private_comment',
|
||||
'reject_media',
|
||||
'reject_reports',
|
||||
'obfuscate',
|
||||
]
|
||||
|
||||
all_fields = [
|
||||
'domain',
|
||||
'severity',
|
||||
'public_comment',
|
||||
'private_comment',
|
||||
'reject_media',
|
||||
'reject_reports',
|
||||
'obfuscate',
|
||||
'id'
|
||||
]
|
||||
|
||||
def __init__(self, domain:str,
|
||||
severity: BlockSeverity=BlockSeverity('suspend'),
|
||||
public_comment: str="",
|
||||
private_comment: str="",
|
||||
reject_media: bool=False,
|
||||
reject_reports: bool=False,
|
||||
obfuscate: bool=False,
|
||||
id: int=None):
|
||||
"""Initialize the DomainBlock
|
||||
"""
|
||||
self.domain = domain
|
||||
self.public_comment = public_comment
|
||||
self.private_comment = private_comment
|
||||
self.reject_media = reject_media
|
||||
self.reject_reports = reject_reports
|
||||
self.obfuscate = obfuscate
|
||||
self.id = id
|
||||
|
||||
if isinstance(severity, BlockSeverity):
|
||||
self.severity = severity
|
||||
else:
|
||||
self.severity = BlockSeverity(severity)
|
||||
|
||||
def _asdict(self):
|
||||
"""Return a dict version of this object
|
||||
"""
|
||||
dictval = {
|
||||
'domain': self.domain,
|
||||
'severity': self.severity,
|
||||
'public_comment': self.public_comment,
|
||||
'private_comment': self.private_comment,
|
||||
'reject_media': self.reject_media,
|
||||
'reject_reports': self.reject_reports,
|
||||
'obfuscate': self.obfuscate,
|
||||
}
|
||||
if self.id:
|
||||
dictval['id'] = self.id
|
||||
|
||||
return dictval
|
||||
|
||||
def compare_fields(self, other, fields=None)->list:
|
||||
"""Compare two DomainBlocks on specific fields.
|
||||
If all the fields are equal, the DomainBlocks are equal.
|
||||
|
||||
@returns: a list of the fields that are different
|
||||
"""
|
||||
if not isinstance(other, DomainBlock):
|
||||
raise ValueError(f"Cannot compare DomainBlock to {type(other)}:{other}")
|
||||
|
||||
if fields is None:
|
||||
fields = self.fields
|
||||
|
||||
diffs = []
|
||||
# Check if all the fields are equal
|
||||
for field in self.fields:
|
||||
a = getattr(self, field)
|
||||
b = getattr(other, field)
|
||||
# log.debug(f"Comparing field {field}: '{a}' <> '{b}'")
|
||||
if getattr(self, field) != getattr(other, field):
|
||||
diffs.append(field)
|
||||
return diffs
|
||||
|
||||
def __eq__(self, other):
|
||||
diffs = self.compare_fields(other)
|
||||
if len(diffs) == 0:
|
||||
return True
|
||||
|
||||
def __repr__(self):
|
||||
|
||||
return f"<DomainBlock {self._asdict()}>"
|
||||
|
||||
def copy(self):
|
||||
"""Make a copy of this object and return it
|
||||
"""
|
||||
retval = DomainBlock(**self._asdict())
|
||||
return retval
|
||||
|
||||
def update(self, dict):
|
||||
"""Update my kwargs
|
||||
"""
|
||||
for key in dict:
|
||||
setattr(self, key, dict[key])
|
||||
|
||||
def __iter__(self):
|
||||
"""Be iterable"""
|
||||
keys = self.fields
|
||||
|
||||
if self.id:
|
||||
keys.append('id')
|
||||
|
||||
for k in keys:
|
||||
yield k
|
||||
|
||||
def __getitem__(self, k, default=None):
    """Behave like a dict for getting values.

    Raises KeyError for names outside all_fields; returns `default`
    for valid names that are unset on this instance.
    """
    if k not in self.all_fields:
        raise KeyError(f"Invalid key '{k}'")
    return getattr(self, k, default)
|
||||
|
||||
def get(self, k, default=None):
    """Dict-style get: return the value for k, or `default` if unset."""
    return self.__getitem__(k, default)
|
|
@ -0,0 +1,112 @@
|
|||
[
|
||||
{
|
||||
"id": "234",
|
||||
"domain": "example.org",
|
||||
"created_at": "2023-01-09T05:17:50.614Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "A private comment",
|
||||
"public_comment": "A public comment",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "233",
|
||||
"domain": "example2.org",
|
||||
"created_at": "2023-01-09T05:09:01.859Z",
|
||||
"severity": "silence",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "Another private comment",
|
||||
"public_comment": "Another public comment",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "232",
|
||||
"domain": "example3.org",
|
||||
"created_at": "2023-01-09T05:08:58.833Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "More comments? What is this?",
|
||||
"public_comment": "Yes we love to comment",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "231",
|
||||
"domain": "example4.org",
|
||||
"created_at": "2023-01-09T05:04:01.856Z",
|
||||
"severity": "noop",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "I cannot believe all the comments",
|
||||
"public_comment": "Look how many comments we can fit in here",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "230",
|
||||
"domain": "example5.org",
|
||||
"created_at": "2023-01-08T21:37:22.665Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "lack of moderation",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2308",
|
||||
"domain": "example6.org",
|
||||
"created_at": "2023-01-06T08:36:53.989Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "anti-trans bigotry",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2306",
|
||||
"domain": "example7.org",
|
||||
"created_at": "2023-01-04T08:14:05.381Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "lack of moderation",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2305",
|
||||
"domain": "example8.org",
|
||||
"created_at": "2023-01-04T08:13:48.891Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "freeze peach",
|
||||
"public_comment": "lack of moderation, conspiracy weirdness",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2301",
|
||||
"domain": "example9.org",
|
||||
"created_at": "2023-01-04T08:11:32.904Z",
|
||||
"severity": "silence",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "alt-right conspiracies",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "453",
|
||||
"domain": "example15.org",
|
||||
"created_at": "2022-12-05T08:26:59.920Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "cryptocurrency",
|
||||
"public_comment": "cryptocurrency",
|
||||
"obfuscate": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,14 @@
|
|||
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
|
||||
"public-comment.example.org","noop","This is a public comment","This is a private comment",TRUE,TRUE,TRUE
|
||||
"private-comment.example.org","noop",,"This is a private comment",TRUE,TRUE,TRUE
|
||||
"diff-comment.example.org","noop","Noop public comment","Noop private comment",TRUE,TRUE,TRUE
|
||||
"2diff-comment.example.org","noop","Public duplicate","Private duplicate",TRUE,TRUE,TRUE
|
||||
"qoto.org","noop",,,TRUE,TRUE,TRUE
|
||||
"sealion.club","noop",,,TRUE,TRUE,TRUE
|
||||
"develop.gab.com","noop",,,TRUE,TRUE,TRUE
|
||||
"gab.ai","noop",,,TRUE,TRUE,TRUE
|
||||
"gab.sleeck.eu","noop",,,TRUE,TRUE,TRUE
|
||||
"gab.com","noop",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.is","noop",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.net","noop",,,TRUE,TRUE,TRUE
|
||||
"gabfed.com","noop",,,TRUE,TRUE,TRUE
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,14 @@
|
|||
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
|
||||
"public-comment.example.org","silence","This is a public comment","This is a private comment",TRUE,TRUE,TRUE
|
||||
"private-comment.example.org","silence",,"This is a private comment",TRUE,TRUE,TRUE
|
||||
"diff-comment.example.org","silence","Silence public comment","Silence private comment",TRUE,TRUE,TRUE
|
||||
"2diff-comment.example.org","silence","Public duplicate","Private duplicate",TRUE,TRUE,TRUE
|
||||
"qoto.org","silence",,,TRUE,TRUE,TRUE
|
||||
"sealion.club","silence",,,TRUE,TRUE,TRUE
|
||||
"develop.gab.com","silence",,,TRUE,TRUE,TRUE
|
||||
"gab.ai","silence",,,TRUE,TRUE,TRUE
|
||||
"gab.sleeck.eu","silence",,,TRUE,TRUE,TRUE
|
||||
"gab.com","silence",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.is","silence",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.net","silence",,,TRUE,TRUE,TRUE
|
||||
"gabfed.com","silence",,,TRUE,TRUE,TRUE
|
|
|
@ -0,0 +1,14 @@
|
|||
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
|
||||
"public-comment.example.org","suspend","This is a public comment","This is a private comment",TRUE,TRUE,TRUE
|
||||
"private-comment.example.org","suspend",,"This is a private comment",TRUE,TRUE,TRUE
|
||||
"diff-comment.example.org","suspend","Suspend public comment","Suspend private comment",TRUE,TRUE,TRUE
|
||||
"2diff-comment.example.org","suspend","Suspend comment 1","Suspend private 1",TRUE,TRUE,TRUE
|
||||
"qoto.org","suspend",,,TRUE,TRUE,TRUE
|
||||
"sealion.club","suspend",,,TRUE,TRUE,TRUE
|
||||
"develop.gab.com","suspend",,,TRUE,TRUE,TRUE
|
||||
"gab.ai","suspend",,,TRUE,TRUE,TRUE
|
||||
"gab.sleeck.eu","suspend",,,TRUE,TRUE,TRUE
|
||||
"gab.com","suspend",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.is","suspend",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.net","suspend",,,TRUE,TRUE,TRUE
|
||||
"gabfed.com","suspend",,,TRUE,TRUE,TRUE
|
|
|
@ -0,0 +1,68 @@
|
|||
"""Tests of BlockSeverity comparison operators."""
from fediblockhole.const import BlockSeverity, SeverityLevel


def test_severity_eq():
    """Severities of the same level compare equal."""
    s1 = BlockSeverity('suspend')
    s2 = BlockSeverity('suspend')
    assert s1 == s2

    s3 = BlockSeverity('silence')
    s4 = BlockSeverity('silence')
    assert s3 == s4

    s5 = BlockSeverity('noop')
    s6 = BlockSeverity('noop')
    assert s5 == s6

def test_severity_ne():
    """Severities of different levels compare unequal."""
    s1 = BlockSeverity('noop')
    s2 = BlockSeverity('silence')
    s3 = BlockSeverity('suspend')

    assert s1 != s2
    assert s2 != s3
    assert s1 != s3

def test_severity_lt():
    """noop < silence < suspend."""
    s1 = BlockSeverity('noop')
    s2 = BlockSeverity('silence')
    s3 = BlockSeverity('suspend')

    assert s1 < s2
    assert s2 < s3
    assert s1 < s3

def test_severity_gt():
    """suspend > silence > noop."""
    s1 = BlockSeverity('noop')
    s2 = BlockSeverity('silence')
    s3 = BlockSeverity('suspend')

    assert s2 > s1
    assert s3 > s2
    assert s3 > s1

def test_severity_le():
    """<= holds for both lower and equal levels."""
    s1 = BlockSeverity('noop')
    s2 = BlockSeverity('silence')
    s2a = BlockSeverity('silence')
    s3 = BlockSeverity('suspend')

    assert s1 <= s2
    assert s2a <= s2
    assert s2 <= s3
    assert s1 <= s3

def test_severity_ge():
    """>= holds for both higher and equal levels."""
    s1 = BlockSeverity('noop')
    s2 = BlockSeverity('silence')
    s2a = BlockSeverity('silence')
    s3 = BlockSeverity('suspend')

    assert s2 >= s1
    # Bugfix: exercise the equality case (parallel to test_severity_le);
    # the original asserted `s2a >= s1`, which never tested >= between
    # equal severity levels.
    assert s2a >= s2
    assert s3 >= s2
    assert s3 >= s1
|
||||
|
|
@ -0,0 +1,74 @@
|
|||
"""Test the DomainBlock structure
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
def test_blocksev_blankstring():
|
||||
a = BlockSeverity('')
|
||||
assert a.level == SeverityLevel.NONE
|
||||
|
||||
def test_blocksev_string_noop():
|
||||
a = BlockSeverity('noop')
|
||||
assert a.level == SeverityLevel.NONE
|
||||
|
||||
def test_blocksev_none():
|
||||
a = BlockSeverity(None)
|
||||
assert a.level == SeverityLevel.NONE
|
||||
|
||||
def test_empty_domainblock_fails():
|
||||
with pytest.raises(TypeError):
|
||||
a = DomainBlock()
|
||||
|
||||
def test_default_suspend():
|
||||
a = DomainBlock('example.org')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_severity_suspend():
|
||||
a = DomainBlock('example.org', 'suspend')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_severity_silence():
|
||||
a = DomainBlock('example.org', 'silence')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.SILENCE
|
||||
|
||||
def test_severity_noop_string():
|
||||
a = DomainBlock('example.org', 'noop')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_severity_none():
|
||||
a = DomainBlock('example.org', None)
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_compare_equal_blocks():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example1.org', 'suspend')
|
||||
|
||||
assert a == b
|
||||
|
||||
def test_compare_diff_domains():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example2.org', 'suspend')
|
||||
|
||||
assert a != b
|
||||
|
||||
def test_compare_diff_sevs():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example1.org', 'silence')
|
||||
|
||||
assert a != b
|
||||
|
||||
def test_compare_diff_sevs_2():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example1.org', 'noop')
|
||||
|
||||
assert a != b
|
|
@ -0,0 +1,139 @@
|
|||
"""Various mergeplan tests
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import parse_blocklist
|
||||
from fediblockhole import merge_blocklists
|
||||
|
||||
from fediblockhole.const import SeverityLevel
|
||||
|
||||
datafile01 = "data-suspends-01.csv"
|
||||
datafile02 = "data-silences-01.csv"
|
||||
datafile03 = "data-noop-01.csv"
|
||||
|
||||
import_fields = [
|
||||
'domain',
|
||||
'severity',
|
||||
'public_comment',
|
||||
'private_comment',
|
||||
'reject_media',
|
||||
'reject_reports',
|
||||
'obfuscate'
|
||||
]
|
||||
|
||||
def load_test_blocklist_data(datafiles):
|
||||
|
||||
blocklists = {}
|
||||
|
||||
for df in datafiles:
|
||||
with open(df) as fp:
|
||||
data = fp.read()
|
||||
bl = parse_blocklist(data, 'csv', import_fields)
|
||||
blocklists[df] = bl
|
||||
|
||||
return blocklists
|
||||
|
||||
def test_mergeplan_max():
|
||||
"""Test 'max' mergeplan"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'max')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_mergeplan_min():
|
||||
"""Test 'max' mergeplan"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SILENCE
|
||||
|
||||
def test_mergeplan_default():
|
||||
"""Default mergeplan is max, so see if it's chosen"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
||||
|
||||
bl = merge_blocklists(blocklists)
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_mergeplan_3_max():
|
||||
"""3 datafiles and mergeplan of 'max'"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'max')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_mergeplan_3_max():
|
||||
"""3 datafiles and mergeplan of 'max'"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_mergeplan_noop_v_silence_max():
|
||||
"""Mergeplan of max should choose silence over noop"""
|
||||
blocklists = load_test_blocklist_data([datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'max')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SILENCE
|
||||
|
||||
def test_mergeplan_noop_v_silence_min():
|
||||
"""Mergeplan of min should choose noop over silence"""
|
||||
blocklists = load_test_blocklist_data([datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_merge_public_comment():
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
assert bl['public-comment.example.org'].public_comment == 'This is a public comment'
|
||||
|
||||
def test_merge_private_comment():
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
assert bl['private-comment.example.org'].private_comment == 'This is a private comment'
|
||||
|
||||
def test_merge_public_comments():
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
assert bl['diff-comment.example.org'].public_comment == 'Suspend public comment, Silence public comment, Noop public comment'
|
||||
|
||||
def test_merge_duplicate_comments():
|
||||
"""The same comment on multiple sources shouldn't get added
|
||||
"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
# Nope, this breaks. Need to rethink duplicate comment merge.
|
||||
# assert bl['2diff-comment.example.org'].public_comment == 'Suspend comment 1, Public duplicate'
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
"""Tests of the CSV parsing
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import BlocklistParserCSV, parse_blocklist
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
|
||||
def test_single_line():
|
||||
csvdata = "example.org"
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
assert len(bl) == 0
|
||||
|
||||
def test_header_only():
|
||||
csvdata = "domain,severity,public_comment"
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
assert len(bl) == 0
|
||||
|
||||
def test_2_blocks():
|
||||
csvdata = """domain,severity
|
||||
example.org,silence
|
||||
example2.org,suspend
|
||||
"""
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
assert len(bl) == 2
|
||||
assert bl[0].domain == 'example.org'
|
||||
|
||||
def test_4_blocks():
|
||||
csvdata = """domain,severity,public_comment
|
||||
example.org,silence,"test 1"
|
||||
example2.org,suspend,"test 2"
|
||||
example3.org,noop,"test 3"
|
||||
example4.org,suspend,"test 4"
|
||||
"""
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
assert len(bl) == 4
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].severity.level == SeverityLevel.SILENCE
|
||||
assert bl[1].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[2].severity.level == SeverityLevel.NONE
|
||||
assert bl[3].severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_ignore_comments():
|
||||
csvdata = """domain,severity,public_comment,private_comment
|
||||
example.org,silence,"test 1","ignore me"
|
||||
example2.org,suspend,"test 2","ignote me also"
|
||||
example3.org,noop,"test 3","and me"
|
||||
example4.org,suspend,"test 4","also me"
|
||||
"""
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
assert len(bl) == 4
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].public_comment == ''
|
||||
assert bl[0].private_comment == ''
|
||||
|
||||
assert bl[2].public_comment == ''
|
||||
assert bl[2].private_comment == ''
|
|
@ -0,0 +1,46 @@
|
|||
"""Tests of the CSV parsing
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import BlocklistParserJSON, parse_blocklist
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
datafile = 'data-mastodon.json'
|
||||
|
||||
def load_data():
|
||||
with open(datafile) as fp:
|
||||
return fp.read()
|
||||
|
||||
def test_json_parser():
|
||||
|
||||
data = load_data()
|
||||
parser = BlocklistParserJSON()
|
||||
bl = parser.parse_blocklist(data)
|
||||
|
||||
assert len(bl) == 10
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[1].severity.level == SeverityLevel.SILENCE
|
||||
assert bl[2].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[3].severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_ignore_comments():
|
||||
|
||||
data = load_data()
|
||||
parser = BlocklistParserJSON()
|
||||
bl = parser.parse_blocklist(data)
|
||||
|
||||
assert len(bl) == 10
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].public_comment == ''
|
||||
assert bl[0].private_comment == ''
|
||||
|
||||
assert bl[2].public_comment == ''
|
||||
assert bl[2].private_comment == ''
|
|
@ -0,0 +1,23 @@
|
|||
"""Tests of the Rapidblock CSV parsing
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import RapidBlockParserCSV, parse_blocklist
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n"""
|
||||
parser = RapidBlockParserCSV()
|
||||
|
||||
def test_basic_rapidblock():
|
||||
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
assert len(bl) == 4
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'subdomain.example.org'
|
||||
assert bl[2].domain == 'anotherdomain.org'
|
||||
assert bl[3].domain == 'domain4.org'
|
||||
|
||||
def test_severity_is_suspend():
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
for block in bl:
|
||||
assert block.severity.level == SeverityLevel.SUSPEND
|
|
@ -0,0 +1,34 @@
|
|||
"""Test parsing the RapidBlock JSON format
|
||||
"""
|
||||
from fediblockhole.blocklist_parser import parse_blocklist
|
||||
|
||||
from fediblockhole.const import SeverityLevel
|
||||
|
||||
rapidblockjson = "data-rapidblock.json"
|
||||
|
||||
def test_parse_rapidblock_json():
|
||||
with open(rapidblockjson) as fp:
|
||||
data = fp.read()
|
||||
bl = parse_blocklist(data, 'rapidblock.json')
|
||||
|
||||
assert bl[0].domain == '101010.pl'
|
||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[0].public_comment == ''
|
||||
|
||||
assert bl[10].domain == 'berserker.town'
|
||||
assert bl[10].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[10].public_comment == ''
|
||||
assert bl[10].private_comment == ''
|
||||
|
||||
def test_parse_with_comments():
|
||||
with open(rapidblockjson) as fp:
|
||||
data = fp.read()
|
||||
bl = parse_blocklist(data, 'rapidblock.json', ['domain', 'severity', 'public_comment', 'private_comment'])
|
||||
|
||||
assert bl[0].domain == '101010.pl'
|
||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[0].public_comment == 'cryptomining javascript, white supremacy'
|
||||
|
||||
assert bl[10].domain == 'berserker.town'
|
||||
assert bl[10].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[10].public_comment == 'freeze peach'
|
Loading…
Reference in New Issue