Refactored to add a DomainBlock object.

Refactored to use a BlockParser structure.
Added ability to limit max severity per-URL source.
Improved method for checking if changes are needed.
This commit is contained in:
Justin Warren 2023-01-12 07:02:48 +11:00
parent ea5e7d01d9
commit 10011a5ffb
No known key found for this signature in database
17 changed files with 2838 additions and 128 deletions

View File

@ -34,3 +34,8 @@ fediblock-sync = "fediblockhole:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.pytest.ini_options]
addopts = [
"--import-mode=importlib",
]

View File

@ -1,2 +1,3 @@
requests
toml
toml
pytest

View File

@ -11,20 +11,20 @@ import os.path
import sys
import urllib.request as urlr
from .blocklist_parser import parse_blocklist
from .const import DomainBlock, BlockSeverity
from importlib.metadata import version
__version__ = version('fediblockhole')
import logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s')
log = logging.getLogger('fediblockhole')
# Max size of a URL-fetched blocklist
URL_BLOCKLIST_MAXSIZE = 1024 ** 3
log = logging.getLogger('fediblock_sync')
CONFIGFILE = "/home/mastodon/etc/admin.conf"
# The relative severity levels of blocks
SEVERITY = {
'noop': 0,
@ -72,25 +72,15 @@ def sync_blocklists(conf: dict):
if not conf.no_fetch_url:
log.info("Fetching domain blocks from URLs...")
for listurl in conf.blocklist_url_sources:
blocklists[listurl] = []
with urlr.urlopen(listurl) as fp:
url = listurl['url']
max_severity = listurl.get('max_severity', 'suspend')
listformat = listurl.get('format', 'csv')
with urlr.urlopen(url) as fp:
rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8')
reader = csv.DictReader(rawdata.split('\n'))
for row in reader:
# Coerce booleans from string to Python bool
for boolkey in ['reject_media', 'reject_reports', 'obfuscate']:
if boolkey in row:
row[boolkey] = str2bool(row[boolkey])
# Remove fields we don't want to import
origrow = row.copy()
for key in origrow:
if key not in import_fields:
del row[key]
blocklists[listurl].append(row)
blocklists[url] = parse_blocklist(rawdata, listformat, import_fields, max_severity)
if conf.save_intermediate:
save_intermediate_blocklist(blocklists[listurl], listurl, conf.savedir, export_fields)
save_intermediate_blocklist(blocklists[url], url, conf.savedir, export_fields)
# Fetch blocklists from remote instances
if not conf.no_fetch_instance:
@ -115,7 +105,7 @@ def sync_blocklists(conf: dict):
for dest in conf.blocklist_instance_destinations:
domain = dest['domain']
token = dest['token']
max_followed_severity = dest.get('max_followed_severity', 'silence')
max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence'))
push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity)
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
@ -130,7 +120,7 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
for key, blist in blocklists.items():
log.debug(f"processing blocklist from: {key} ...")
for newblock in blist:
domain = newblock['domain']
domain = newblock.domain
# If the domain has two asterisks in it, it's obfuscated
# and we can't really use it, so skip it and do the next one
if '*' in domain:
@ -151,7 +141,7 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
# end for
return merged
def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dict:
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
"""Use a mergeplan to decide how to merge two overlapping block definitions
@param oldblock: The existing block definition.
@ -159,7 +149,7 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
@param mergeplan: How to merge. Choices are 'max', the default, and 'min'.
"""
# Default to the existing block definition
blockdata = oldblock.copy()
blockdata = oldblock._asdict()
# If the public or private comment is different,
# append it to the existing comment, joined with ', '
@ -167,10 +157,10 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
keylist = ['public_comment', 'private_comment']
for key in keylist:
try:
if oldblock[key] not in ['', None] and newblock[key] not in ['', None] and oldblock[key] != newblock[key]:
log.debug(f"old comment: '{oldblock[key]}'")
log.debug(f"new comment: '{newblock[key]}'")
blockdata[key] = ', '.join([oldblock[key], newblock[key]])
if getattr(oldblock, key) not in ['', None] and getattr(newblock, key) not in ['', None] and getattr(oldblock, key) != getattr(newblock, key):
log.debug(f"old comment: '{getattr(oldblock, key)}'")
log.debug(f"new comment: '{getattr(newblock, key)}'")
blockdata[key] = ', '.join([getattr(oldblock, key), getattr(newblock, key)])
except KeyError:
log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
continue
@ -180,25 +170,25 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
# Use the highest block level found (the default)
log.debug(f"Using 'max' mergeplan.")
if SEVERITY[newblock['severity']] > SEVERITY[oldblock['severity']]:
if newblock.severity > oldblock.severity:
log.debug(f"New block severity is higher. Using that.")
blockdata['severity'] = newblock['severity']
blockdata['severity'] = newblock.severity
# If obfuscate is set and is True for the domain in
# any blocklist then obfuscate is set to True.
if newblock.get('obfuscate', False):
if getattr(newblock, 'obfuscate', False):
blockdata['obfuscate'] = True
elif mergeplan in ['min']:
# Use the lowest block level found
log.debug(f"Using 'min' mergeplan.")
if SEVERITY[newblock['severity']] < SEVERITY[oldblock['severity']]:
blockdata['severity'] = newblock['severity']
if newblock.severity < oldblock.severity:
blockdata['severity'] = newblock.severity
# If obfuscate is set and is False for the domain in
# any blocklist then obfuscate is set to False.
if not newblock.get('obfuscate', True):
if not getattr(newblock, 'obfuscate', True):
blockdata['obfuscate'] = False
else:
@ -206,7 +196,7 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
log.debug(f"Block severity set to {blockdata['severity']}")
return blockdata
return DomainBlock(**blockdata)
def requests_headers(token: str=None):
"""Set common headers for requests"""
@ -219,7 +209,7 @@ def requests_headers(token: str=None):
return headers
def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
import_fields: list=['domain', 'severity']) -> list:
import_fields: list=['domain', 'severity']) -> list[DomainBlock]:
"""Fetch existing block list from server
@param host: The remote host to connect to.
@ -239,7 +229,7 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
url = f"https://{host}{api_path}"
domain_blocks = []
blocklist = []
link = True
while link:
@ -248,7 +238,7 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
log.error(f"Cannot fetch remote blocklist: {response.content}")
raise ValueError("Unable to fetch domain block list: %s", response)
domain_blocks.extend(json.loads(response.content))
blocklist.extend( parse_blocklist(response.content, 'json', import_fields) )
# Parse the link header to find the next url to fetch
# This is a weird and janky way of doing pagination but
@ -262,20 +252,12 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
break
else:
next = pagination[0]
prev = pagination[1]
# prev = pagination[1]
urlstring, rel = next.split('; ')
url = urlstring.strip('<').rstrip('>')
log.debug(f"Found {len(domain_blocks)} existing domain blocks.")
# Remove fields not in import list.
for row in domain_blocks:
origrow = row.copy()
for key in origrow:
if key not in import_fields:
del row[key]
return domain_blocks
return blocklist
def delete_block(token: str, host: str, id: int):
"""Remove a domain block"""
@ -334,40 +316,26 @@ def fetch_instance_follows(token: str, host: str, domain: str) -> int:
return follows
def check_followed_severity(host: str, token: str, domain: str,
severity: str, max_followed_severity: str='silence'):
severity: BlockSeverity,
max_followed_severity: BlockSeverity=BlockSeverity('silence')):
"""Check an instance to see if it has followers of a to-be-blocked instance"""
# Return straight away if we're not increasing the severity
if severity <= max_followed_severity:
return severity
# If the instance has accounts that follow people on the to-be-blocked domain,
# limit the maximum severity to the configured `max_followed_severity`.
follows = fetch_instance_follows(token, host, domain)
if follows > 0:
log.debug(f"Instance {host} has {follows} followers of accounts at {domain}.")
if SEVERITY[severity] > SEVERITY[max_followed_severity]:
if severity > max_followed_severity:
log.warning(f"Instance {host} has {follows} followers of accounts at {domain}. Limiting block severity to {max_followed_severity}.")
return max_followed_severity
else:
return severity
return severity
def is_change_needed(oldblock: dict, newblock: dict, import_fields: list):
"""Compare block definitions to see if changes are needed"""
# Check if anything is actually different and needs updating
change_needed = []
for key in import_fields:
try:
oldval = oldblock[key]
newval = newblock[key]
log.debug(f"Compare {key} '{oldval}' <> '{newval}'")
if oldval != newval:
log.debug("Difference detected. Change needed.")
change_needed.append(key)
break
except KeyError:
log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
continue
change_needed = oldblock.compare_fields(newblock, import_fields)
return change_needed
def update_known_block(token: str, host: str, blockdict: dict):
@ -392,17 +360,17 @@ def update_known_block(token: str, host: str, blockdict: dict):
if response.status_code != 200:
raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def add_block(token: str, host: str, blockdata: dict):
def add_block(token: str, host: str, blockdata: DomainBlock):
"""Block a domain on Mastodon host
"""
log.debug(f"Blocking domain {blockdata['domain']} at {host}...")
log.debug(f"Blocking domain {blockdata.domain} at {host}...")
api_path = "/api/v1/admin/domain_blocks"
url = f"https://{host}{api_path}"
response = requests.post(url,
headers=requests_headers(token),
data=blockdata,
data=blockdata._asdict(),
timeout=REQUEST_TIMEOUT
)
if response.status_code == 422:
@ -417,7 +385,7 @@ def add_block(token: str, host: str, blockdata: dict):
def push_blocklist(token: str, host: str, blocklist: list[dict],
dryrun: bool=False,
import_fields: list=['domain', 'severity'],
max_followed_severity='silence',
max_followed_severity:BlockSeverity=BlockSeverity('silence'),
):
"""Push a blocklist to a remote instance.
@ -437,36 +405,41 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
serverblocks = fetch_instance_blocklist(host, token, True, import_fields)
# # Convert serverblocks to a dictionary keyed by domain name
knownblocks = {row['domain']: row for row in serverblocks}
knownblocks = {row.domain: row for row in serverblocks}
for newblock in blocklist:
log.debug(f"Applying newblock: {newblock}")
oldblock = knownblocks.get(newblock['domain'], None)
log.debug(f"Processing block: {newblock}")
oldblock = knownblocks.get(newblock.domain, None)
if oldblock:
log.debug(f"Block already exists for {newblock['domain']}, checking for differences...")
log.debug(f"Block already exists for {newblock.domain}, checking for differences...")
change_needed = is_change_needed(oldblock, newblock, import_fields)
if change_needed:
# Change might be needed, but let's see if the severity
# needs to change. If not, maybe no changes are needed?
newseverity = check_followed_severity(host, token, oldblock['domain'], newblock['severity'], max_followed_severity)
if newseverity != oldblock['severity']:
newblock['severity'] = newseverity
change_needed.append('severity')
# Change still needed?
if change_needed:
log.info(f"Change detected. Updating domain block for {oldblock['domain']}")
blockdata = oldblock.copy()
blockdata.update(newblock)
if not dryrun:
update_known_block(token, host, blockdata)
# add a pause here so we don't melt the instance
time.sleep(1)
else:
log.info("Dry run selected. Not applying changes.")
# Is the severity changing?
if 'severity' in change_needed:
# Confirm if we really want to change the severity
# If we still have followers of the remote domain, we may not
# want to go all the way to full suspend, depending on the configuration
newseverity = check_followed_severity(host, token, oldblock.domain, newblock.severity, max_followed_severity)
if newseverity != oldblock.severity:
newblock.severity = newseverity
else:
log.info("Keeping severity of block the same to avoid disrupting followers.")
change_needed.remove('severity')
if change_needed:
log.info(f"Change detected. Need to update {change_needed} for domain block for {oldblock.domain}")
log.info(f"Old block definition: {oldblock}")
log.info(f"Pushing new block definition: {newblock}")
blockdata = oldblock.copy()
blockdata.update(newblock)
if not dryrun:
update_known_block(token, host, blockdata)
# add a pause here so we don't melt the instance
time.sleep(1)
else:
log.info("Dry run selected. Not applying changes.")
else:
log.debug("No differences detected. Not updating.")
@ -475,22 +448,22 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
else:
# This is a new block for the target instance, so we
# need to add a block rather than update an existing one
blockdata = {
'domain': newblock['domain'],
# Default to Silence if nothing is specified
'severity': newblock.get('severity', 'silence'),
'public_comment': newblock.get('public_comment', ''),
'private_comment': newblock.get('private_comment', ''),
'reject_media': newblock.get('reject_media', False),
'reject_reports': newblock.get('reject_reports', False),
'obfuscate': newblock.get('obfuscate', False),
}
# blockdata = {
# 'domain': newblock.domain,
# # Default to Silence if nothing is specified
# 'severity': newblock.get('severity', 'silence'),
# 'public_comment': newblock.get('public_comment', ''),
# 'private_comment': newblock.get('private_comment', ''),
# 'reject_media': newblock.get('reject_media', False),
# 'reject_reports': newblock.get('reject_reports', False),
# 'obfuscate': newblock.get('obfuscate', False),
# }
# Make sure the new block doesn't clobber a domain with followers
blockdata['severity'] = check_followed_severity(host, token, newblock['domain'], max_followed_severity)
log.info(f"Adding new block for {blockdata['domain']}...")
newblock.severity = check_followed_severity(host, token, newblock.domain, newblock.severity, max_followed_severity)
log.info(f"Adding new block: {newblock}...")
if not dryrun:
add_block(token, host, blockdata)
add_block(token, host, newblock)
# add a pause here so we don't melt the instance
time.sleep(1)
else:
@ -520,7 +493,7 @@ def save_intermediate_blocklist(
save_blocklist_to_file(blocklist, filepath, export_fields)
def save_blocklist_to_file(
blocklist: list[dict],
blocklist: list[DomainBlock],
filepath: str,
export_fields: list=['domain','severity']):
"""Save a blocklist we've downloaded from a remote source
@ -530,9 +503,9 @@ def save_blocklist_to_file(
@param export_fields: Which fields to include in the export.
"""
try:
blocklist = sorted(blocklist, key=lambda x: x['domain'])
blocklist = sorted(blocklist, key=lambda x: x.domain)
except KeyError:
log.error("Field 'domain' not found in blocklist. Are you sure the URLs are correct?")
log.error("Field 'domain' not found in blocklist.")
log.debug(f"blocklist is: {blocklist}")
log.debug(f"export fields: {export_fields}")
@ -540,7 +513,8 @@ def save_blocklist_to_file(
with open(filepath, "w") as fp:
writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
writer.writeheader()
writer.writerows(blocklist)
for item in blocklist:
writer.writerow(item._asdict())
def augment_args(args):
"""Augment commandline arguments with config file parameters"""
@ -576,17 +550,6 @@ def augment_args(args):
return args
def str2bool(boolstring: str) -> bool:
"""Helper function to convert boolean strings to actual Python bools
"""
boolstring = boolstring.lower()
if boolstring in ['true', 't', '1', 'y', 'yes']:
return True
elif boolstring in ['false', 'f', '0', 'n', 'no']:
return False
else:
raise ValueError(f"Cannot parse value '{boolstring}' as boolean")
def main():
ap = argparse.ArgumentParser(

View File

@ -0,0 +1,186 @@
"""Parse various blocklist data formats
"""
from typing import Iterable
from .const import DomainBlock, BlockSeverity
import csv
import json
import logging
log = logging.getLogger('fediblockhole')
class BlocklistParser(object):
    """
    Base class for parsing blocklists

    Subclasses implement parse_item() and, when the raw data is not already
    an iterable of block items, override preparse() to convert it into one.
    """

    def __init__(self, import_fields: list=['domain', 'severity'],
        max_severity: str='suspend'):
        """Create a Parser

        @param import_fields: an optional list of fields to limit the parser to.
            Ignore any fields in a block item that aren't in import_fields.
        @param max_severity: a severity name, stored as a BlockSeverity in
            self.max_severity.
        """
        self.import_fields = import_fields
        self.max_severity = BlockSeverity(max_severity)

    def preparse(self, blockdata) -> Iterable:
        """Some raw datatypes need to be converted into an iterable

        The base implementation returns the data unchanged, so a parser whose
        input is already an iterable of block items needs no override.

        @param blockdata: the raw blocklist data
        @returns: an iterable of block items
        """
        return blockdata

    def parse_blocklist(self, blockdata) -> "list[DomainBlock]":
        """Parse blocklist data into DomainBlocks

        @param blockdata: the raw blocklist data; it is passed through
            preparse() before the individual items are parsed.
        @returns: A list of the parsed items, in the order they were read
        """
        # 'preparse' used to be both a boolean class flag and a method of the
        # same name; the method always shadowed the flag, so the check was
        # always true. Checking callable() keeps working for a subclass that
        # disables the step by assigning a non-callable value to 'preparse'.
        if callable(self.preparse):
            blockdata = self.preparse(blockdata)
        return [self.parse_item(blockitem) for blockitem in blockdata]

    def parse_item(self, blockitem) -> "DomainBlock":
        """Parse an individual block item

        Must be implemented by subclasses.

        @param blockitem: an individual block to be parsed
        @raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError
class BlocklistParserJSON(BlocklistParser):
    """Parse a JSON formatted blocklist"""

    def preparse(self, blockdata) -> Iterable:
        """Parse the blockdata as JSON

        @param blockdata: the JSON document to decode
        @returns: the decoded JSON value
            (presumably a list of block dictionaries; verify against the source)
        """
        return json.loads(blockdata)

    def parse_item(self, blockitem: dict) -> DomainBlock:
        """Convert one decoded JSON block into a DomainBlock

        @param blockitem: a dictionary describing a single block.
            It is not modified.
        @returns: a DomainBlock whose severity is at most self.max_severity
        """
        # Keep only the fields we want to import. Build a new dict rather
        # than deleting keys from the one the caller handed us.
        wanted = {
            key: value
            for key, value in blockitem.items()
            if key in self.import_fields
        }

        # Convert dict to keyword arguments with the double-star operator
        # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments
        block = DomainBlock(**wanted)

        # Cap the severity at the maximum allowed for this parser
        if block.severity > self.max_severity:
            block.severity = self.max_severity
        return block
class BlocklistParserCSV(BlocklistParser):
    """ Parse CSV formatted blocklists

    The parser expects the CSV data to include a header with the field names.
    """

    def preparse(self, blockdata) -> Iterable:
        """Use a csv.DictReader to create an iterable from the blockdata

        @param blockdata: the CSV text, with lines separated by newlines
        @returns: a csv.DictReader over the lines of blockdata
        """
        return csv.DictReader(blockdata.split('\n'))

    def parse_item(self, blockitem: dict) -> DomainBlock:
        """Convert one CSV row into a DomainBlock

        @param blockitem: a dictionary of field name to string value for a
            single row. It is not modified.
        @returns: a DomainBlock whose severity is at most self.max_severity
        @raises ValueError: if a boolean column holds a value str2bool()
            cannot interpret
        """
        # Work on a copy so the caller's row is left untouched
        item = dict(blockitem)

        # Coerce booleans from string to Python bool
        # FIXME: Is this still necessary with the DomainBlock object?
        for boolkey in ['reject_media', 'reject_reports', 'obfuscate']:
            if boolkey in item:
                item[boolkey] = str2bool(item[boolkey])

        # Keep only the fields we want to import
        wanted = {
            key: value
            for key, value in item.items()
            if key in self.import_fields
        }

        # Convert dict to keyword arguments with the double-star operator
        # See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments
        block = DomainBlock(**wanted)

        # Cap the severity at the maximum allowed for this parser
        if block.severity > self.max_severity:
            block.severity = self.max_severity
        return block
class RapidBlockParserCSV(BlocklistParserCSV):
    """ Parse RapidBlock CSV blocklists

    RapidBlock CSV blocklists are just a newline separated list of domains.
    """

    def preparse(self, blockdata) -> Iterable:
        """Prepend a 'domain' field header to the data

        @param blockdata: the raw list of domains, separated by CRLF
        @returns: a csv.DictReader whose only column is 'domain'
        """
        log.debug(f"blockdata: {blockdata[:100]}")
        # Give the headerless data the header row that DictReader needs
        with_header = "domain\r\n" + blockdata
        rows = with_header.split('\r\n')
        return csv.DictReader(rows)
class RapidBlockParserJSON(BlocklistParserJSON):
    """Parse RapidBlock JSON formatted blocklists
    """

    def preparse(self, blockdata) -> Iterable:
        """Decode the JSON and return the items of its 'blocks' dictionary

        @param blockdata: the RapidBlock JSON document
        @returns: (domain, attributes) pairs from the 'blocks' entry
        """
        # We want to iterate over all the dictionary items
        return json.loads(blockdata)['blocks'].items()

    def parse_item(self, blockitem: tuple) -> DomainBlock:
        """Parse an individual item in a RapidBlock list

        @param blockitem: a tuple of (domain, {dictionary of attributes})
        @returns: a DomainBlock whose severity is at most self.max_severity
        """
        domain = blockitem[0]
        attributes = blockitem[1]

        # RapidBlock has a binary block level which we map
        # to 'suspend' if True, and 'noop' if False.
        severity = 'suspend' if attributes['isBlocked'] else 'noop'

        # The reason is only carried over when public comments are imported
        public_comment = attributes['reason'] if 'public_comment' in self.import_fields else ''

        # There's a 'tags' field as well, but we can't
        # do much with that in Mastodon yet

        parsed = DomainBlock(domain, severity, public_comment)
        if parsed.severity > self.max_severity:
            parsed.severity = self.max_severity
        return parsed
def str2bool(boolstring: str) -> bool:
    """Helper function to convert boolean strings to actual Python bools

    The comparison is case-insensitive.

    @param boolstring: one of 'true', 't', '1', 'y', 'yes' for True, or
        'false', 'f', '0', 'n', 'no' for False
    @returns: the matching bool
    @raises ValueError: if the value is not one of the recognised strings
    """
    lowered = boolstring.lower()
    if lowered in ('true', 't', '1', 'y', 'yes'):
        return True
    if lowered in ('false', 'f', '0', 'n', 'no'):
        return False
    raise ValueError(f"Cannot parse value '{lowered}' as boolean")
# Maps a blocklist format name to the parser class that handles it
FORMAT_PARSERS = {
    'csv': BlocklistParserCSV,
    'json': BlocklistParserJSON,
    'rapidblock.csv': RapidBlockParserCSV,
    'rapidblock.json': RapidBlockParserJSON,
}

# helper function to select the appropriate Parser
def parse_blocklist(
    blockdata,
    format="csv",
    import_fields: list=['domain', 'severity'],
    max_severity: str='suspend'):
    """Parse a blocklist in the given format

    @param blockdata: the raw blocklist data
    @param format: a key of FORMAT_PARSERS naming the data format
    @param import_fields: the fields handed to the parser's constructor
    @param max_severity: the severity limit handed to the parser's constructor
    @returns: whatever the selected parser's parse_blocklist() returns
    @raises KeyError: if format is not a key of FORMAT_PARSERS
    """
    parser_class = FORMAT_PARSERS[format]
    return parser_class(import_fields, max_severity).parse_blocklist(blockdata)

220
src/fediblockhole/const.py Normal file
View File

@ -0,0 +1,220 @@
""" Constant objects used by FediBlockHole
"""
import enum
from typing import NamedTuple, Optional, TypedDict
from dataclasses import dataclass
import logging
log = logging.getLogger('fediblockhole')
class SeverityLevel(enum.IntEnum):
    """How severe should a block be? Higher is more severe.
    """
    NONE = enum.auto()
    SILENCE = enum.auto()
    SUSPEND = enum.auto()


class BlockSeverity(object):
    """A representation of a block severity

    We add some helpful functions rather than using a bare IntEnum

    Instances order and compare by their SeverityLevel. Comparing with
    anything that is not a BlockSeverity returns NotImplemented, so `==`
    is False and ordering raises TypeError.
    """

    # Instances are mutable (see the 'level' setter) and define __eq__,
    # so they are deliberately unhashable.
    __hash__ = None

    def __init__(self, severity: str=None):
        """Create a BlockSeverity

        @param severity: 'noop', 'silence' or 'suspend'. None and '' are
            treated as 'noop'.
        @raises ValueError: if severity is any other value
        """
        self._level = self.str2level(severity)

    @property
    def level(self):
        """The SeverityLevel of this severity"""
        return self._level

    @level.setter
    def level(self, value):
        # Accept either the enum itself or its string name
        if isinstance(value, SeverityLevel):
            self._level = value
        elif isinstance(value, str):
            self._level = self.str2level(value)
        else:
            raise ValueError(f"Invalid level value '{value}'")

    def str2level(self, severity: str=None):
        """Convert a string severity level to an internal enum

        @param severity: 'noop', 'silence' or 'suspend'. None and '' map
            to SeverityLevel.NONE.
        @returns: the matching SeverityLevel
        @raises ValueError: if severity is not a recognised value
        """
        if severity in [None, '', 'noop']:
            return SeverityLevel.NONE
        elif severity in ['silence']:
            return SeverityLevel.SILENCE
        elif severity in ['suspend']:
            return SeverityLevel.SUSPEND
        else:
            raise ValueError(f"Invalid severity value '{severity}'")

    def __repr__(self):
        return f"'{str(self)}'"

    def __str__(self):
        """A string version of the severity level
        """
        levelmap = {
            SeverityLevel.NONE: 'noop',
            SeverityLevel.SILENCE: 'silence',
            SeverityLevel.SUSPEND: 'suspend',
        }
        return levelmap[self.level]

    # The comparisons below always return a real bool (never None), and
    # return NotImplemented for foreign types so Python can apply its
    # standard fallback rules.

    def __lt__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level < other._level

    def __gt__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level > other._level

    def __eq__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level == other._level

    def __le__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level <= other._level

    def __ge__(self, other):
        if not isinstance(other, BlockSeverity):
            return NotImplemented
        return self._level >= other._level
# class _DomainBlock(NamedTuple):
# domain: str # FIXME: Use an actual Domain object from somewhere?
# severity: BlockSeverity = BlockSeverity.SUSPEND
# public_comment: str = ''
# private_comment: str = ''
# reject_media: bool = False
# reject_reports: bool = False
# obfuscate: bool = False
class DomainBlock(object):
    """A single domain block definition

    Values are plain attributes. The object also offers a small dict-like
    interface (iteration over field names, item access, get(), update(),
    copy()) for code that treats a block like a dictionary.
    """

    # Fields compared by default in compare_fields() and yielded by __iter__()
    fields = [
        'domain',
        'severity',
        'public_comment',
        'private_comment',
        'reject_media',
        'reject_reports',
        'obfuscate',
    ]

    # Every key accepted by __getitem__() and get()
    all_fields = [
        'domain',
        'severity',
        'public_comment',
        'private_comment',
        'reject_media',
        'reject_reports',
        'obfuscate',
        'id'
    ]

    # Instances are mutable and define __eq__, so they are deliberately
    # unhashable.
    __hash__ = None

    def __init__(self, domain:str,
        severity: BlockSeverity=BlockSeverity('suspend'),
        public_comment: str="",
        private_comment: str="",
        reject_media: bool=False,
        reject_reports: bool=False,
        obfuscate: bool=False,
        id: int=None):
        """Initialize the DomainBlock

        @param domain: the domain being blocked
        @param severity: a BlockSeverity, or a value BlockSeverity() accepts
            (it is converted to a BlockSeverity)
        @param public_comment: stored as given
        @param private_comment: stored as given
        @param reject_media: stored as given
        @param reject_reports: stored as given
        @param obfuscate: stored as given
        @param id: stored as given; only included in _asdict() and
            iteration when it is truthy
        """
        self.domain = domain
        self.public_comment = public_comment
        self.private_comment = private_comment
        self.reject_media = reject_media
        self.reject_reports = reject_reports
        self.obfuscate = obfuscate
        self.id = id

        if isinstance(severity, BlockSeverity):
            self.severity = severity
        else:
            self.severity = BlockSeverity(severity)

    def _asdict(self):
        """Return a dict version of this object

        The 'id' key is only present when self.id is truthy.
        """
        dictval = {
            'domain': self.domain,
            'severity': self.severity,
            'public_comment': self.public_comment,
            'private_comment': self.private_comment,
            'reject_media': self.reject_media,
            'reject_reports': self.reject_reports,
            'obfuscate': self.obfuscate,
        }
        if self.id:
            dictval['id'] = self.id

        return dictval

    def compare_fields(self, other, fields=None)->list:
        """Compare two DomainBlocks on specific fields.

        If all the fields are equal, the DomainBlocks are equal.

        @param other: the DomainBlock to compare against
        @param fields: the attribute names to compare. Defaults to
            self.fields when None.
        @returns: a list of the fields that are different
        @raises ValueError: if other is not a DomainBlock
        """
        if not isinstance(other, DomainBlock):
            raise ValueError(f"Cannot compare DomainBlock to {type(other)}:{other}")

        if fields is None:
            fields = self.fields

        diffs = []
        # Compare only the requested fields. Previously the loop always
        # walked self.fields and silently ignored the 'fields' argument.
        for field in fields:
            a = getattr(self, field)
            b = getattr(other, field)
            # log.debug(f"Comparing field {field}: '{a}' <> '{b}'")
            if a != b:
                diffs.append(field)
        return diffs

    def __eq__(self, other):
        """Two DomainBlocks are equal when none of the default fields differ"""
        if not isinstance(other, DomainBlock):
            # Let Python fall back to its default handling, so comparing
            # with a foreign type is simply False rather than an error.
            return NotImplemented
        return not self.compare_fields(other)

    def __repr__(self):

        return f"<DomainBlock {self._asdict()}>"

    def copy(self):
        """Make a copy of this object and return it
        """
        retval = DomainBlock(**self._asdict())
        return retval

    def update(self, dict):
        """Update my kwargs

        For every key yielded by iterating `dict`, set the attribute of that
        name on this object to `dict[key]`.

        @param dict: a mapping-like object (the name shadows the builtin but
            is kept so existing keyword callers keep working)
        """
        for key in dict:
            setattr(self, key, dict[key])

    def __iter__(self):
        """Be iterable

        Yields the field names, plus 'id' when self.id is truthy.
        """
        # Copy the list: appending to self.fields directly would add 'id'
        # to the shared class attribute on every iteration.
        keys = list(self.fields)

        if self.id:
            keys.append('id')

        for k in keys:
            yield k

    def __getitem__(self, k, default=None):
        "Behave like a dict for getting values"
        if k not in self.all_fields:
            raise KeyError(f"Invalid key '{k}'")

        return getattr(self, k, default)

    def get(self, k, default=None):
        """Return the value for key k

        @raises KeyError: if k is not in all_fields
        """
        return self.__getitem__(k, default)

112
tests/data-mastodon.json Normal file
View File

@ -0,0 +1,112 @@
[
{
"id": "234",
"domain": "example.org",
"created_at": "2023-01-09T05:17:50.614Z",
"severity": "suspend",
"reject_media": true,
"reject_reports": true,
"private_comment": "A private comment",
"public_comment": "A public comment",
"obfuscate": true
},
{
"id": "233",
"domain": "example2.org",
"created_at": "2023-01-09T05:09:01.859Z",
"severity": "silence",
"reject_media": true,
"reject_reports": true,
"private_comment": "Another private comment",
"public_comment": "Another public comment",
"obfuscate": true
},
{
"id": "232",
"domain": "example3.org",
"created_at": "2023-01-09T05:08:58.833Z",
"severity": "suspend",
"reject_media": true,
"reject_reports": true,
"private_comment": "More comments? What is this?",
"public_comment": "Yes we love to comment",
"obfuscate": true
},
{
"id": "231",
"domain": "example4.org",
"created_at": "2023-01-09T05:04:01.856Z",
"severity": "noop",
"reject_media": true,
"reject_reports": true,
"private_comment": "I cannot believe all the comments",
"public_comment": "Look how many comments we can fit in here",
"obfuscate": true
},
{
"id": "230",
"domain": "example5.org",
"created_at": "2023-01-08T21:37:22.665Z",
"severity": "suspend",
"reject_media": false,
"reject_reports": false,
"private_comment": "",
"public_comment": "lack of moderation",
"obfuscate": false
},
{
"id": "2308",
"domain": "example6.org",
"created_at": "2023-01-06T08:36:53.989Z",
"severity": "suspend",
"reject_media": false,
"reject_reports": false,
"private_comment": "",
"public_comment": "anti-trans bigotry",
"obfuscate": false
},
{
"id": "2306",
"domain": "example7.org",
"created_at": "2023-01-04T08:14:05.381Z",
"severity": "suspend",
"reject_media": false,
"reject_reports": false,
"private_comment": "",
"public_comment": "lack of moderation",
"obfuscate": false
},
{
"id": "2305",
"domain": "example8.org",
"created_at": "2023-01-04T08:13:48.891Z",
"severity": "suspend",
"reject_media": false,
"reject_reports": false,
"private_comment": "freeze peach",
"public_comment": "lack of moderation, conspiracy weirdness",
"obfuscate": false
},
{
"id": "2301",
"domain": "example9.org",
"created_at": "2023-01-04T08:11:32.904Z",
"severity": "silence",
"reject_media": false,
"reject_reports": false,
"private_comment": "",
"public_comment": "alt-right conspiracies",
"obfuscate": false
},
{
"id": "453",
"domain": "example15.org",
"created_at": "2022-12-05T08:26:59.920Z",
"severity": "suspend",
"reject_media": true,
"reject_reports": true,
"private_comment": "cryptocurrency",
"public_comment": "cryptocurrency",
"obfuscate": true
}
]

14
tests/data-noop-01.csv Normal file
View File

@ -0,0 +1,14 @@
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
"public-comment.example.org","noop","This is a public comment","This is a private comment",TRUE,TRUE,TRUE
"private-comment.example.org","noop",,"This is a private comment",TRUE,TRUE,TRUE
"diff-comment.example.org","noop","Noop public comment","Noop private comment",TRUE,TRUE,TRUE
"2diff-comment.example.org","noop","Public duplicate","Private duplicate",TRUE,TRUE,TRUE
"qoto.org","noop",,,TRUE,TRUE,TRUE
"sealion.club","noop",,,TRUE,TRUE,TRUE
"develop.gab.com","noop",,,TRUE,TRUE,TRUE
"gab.ai","noop",,,TRUE,TRUE,TRUE
"gab.sleeck.eu","noop",,,TRUE,TRUE,TRUE
"gab.com","noop",,,TRUE,TRUE,TRUE
"kiwifarms.is","noop",,,TRUE,TRUE,TRUE
"kiwifarms.net","noop",,,TRUE,TRUE,TRUE
"gabfed.com","noop",,,TRUE,TRUE,TRUE
1 domain severity public_comment private_comment reject_media reject_reports obfuscate
2 public-comment.example.org noop This is a public comment This is a private comment TRUE TRUE TRUE
3 private-comment.example.org noop This is a private comment TRUE TRUE TRUE
4 diff-comment.example.org noop Noop public comment Noop private comment TRUE TRUE TRUE
5 2diff-comment.example.org noop Public duplicate Private duplicate TRUE TRUE TRUE
6 qoto.org noop TRUE TRUE TRUE
7 sealion.club noop TRUE TRUE TRUE
8 develop.gab.com noop TRUE TRUE TRUE
9 gab.ai noop TRUE TRUE TRUE
10 gab.sleeck.eu noop TRUE TRUE TRUE
11 gab.com noop TRUE TRUE TRUE
12 kiwifarms.is noop TRUE TRUE TRUE
13 kiwifarms.net noop TRUE TRUE TRUE
14 gabfed.com noop TRUE TRUE TRUE

1720
tests/data-rapidblock.json Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,14 @@
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
"public-comment.example.org","silence","This is a public comment","This is a private comment",TRUE,TRUE,TRUE
"private-comment.example.org","silence",,"This is a private comment",TRUE,TRUE,TRUE
"diff-comment.example.org","silence","Silence public comment","Silence private comment",TRUE,TRUE,TRUE
"2diff-comment.example.org","silence","Public duplicate","Private duplicate",TRUE,TRUE,TRUE
"qoto.org","silence",,,TRUE,TRUE,TRUE
"sealion.club","silence",,,TRUE,TRUE,TRUE
"develop.gab.com","silence",,,TRUE,TRUE,TRUE
"gab.ai","silence",,,TRUE,TRUE,TRUE
"gab.sleeck.eu","silence",,,TRUE,TRUE,TRUE
"gab.com","silence",,,TRUE,TRUE,TRUE
"kiwifarms.is","silence",,,TRUE,TRUE,TRUE
"kiwifarms.net","silence",,,TRUE,TRUE,TRUE
"gabfed.com","silence",,,TRUE,TRUE,TRUE
1 domain severity public_comment private_comment reject_media reject_reports obfuscate
2 public-comment.example.org silence This is a public comment This is a private comment TRUE TRUE TRUE
3 private-comment.example.org silence This is a private comment TRUE TRUE TRUE
4 diff-comment.example.org silence Silence public comment Silence private comment TRUE TRUE TRUE
5 2diff-comment.example.org silence Public duplicate Private duplicate TRUE TRUE TRUE
6 qoto.org silence TRUE TRUE TRUE
7 sealion.club silence TRUE TRUE TRUE
8 develop.gab.com silence TRUE TRUE TRUE
9 gab.ai silence TRUE TRUE TRUE
10 gab.sleeck.eu silence TRUE TRUE TRUE
11 gab.com silence TRUE TRUE TRUE
12 kiwifarms.is silence TRUE TRUE TRUE
13 kiwifarms.net silence TRUE TRUE TRUE
14 gabfed.com silence TRUE TRUE TRUE

View File

@ -0,0 +1,14 @@
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
"public-comment.example.org","suspend","This is a public comment","This is a private comment",TRUE,TRUE,TRUE
"private-comment.example.org","suspend",,"This is a private comment",TRUE,TRUE,TRUE
"diff-comment.example.org","suspend","Suspend public comment","Suspend private comment",TRUE,TRUE,TRUE
"2diff-comment.example.org","suspend","Suspend comment 1","Suspend private 1",TRUE,TRUE,TRUE
"qoto.org","suspend",,,TRUE,TRUE,TRUE
"sealion.club","suspend",,,TRUE,TRUE,TRUE
"develop.gab.com","suspend",,,TRUE,TRUE,TRUE
"gab.ai","suspend",,,TRUE,TRUE,TRUE
"gab.sleeck.eu","suspend",,,TRUE,TRUE,TRUE
"gab.com","suspend",,,TRUE,TRUE,TRUE
"kiwifarms.is","suspend",,,TRUE,TRUE,TRUE
"kiwifarms.net","suspend",,,TRUE,TRUE,TRUE
"gabfed.com","suspend",,,TRUE,TRUE,TRUE
1 domain severity public_comment private_comment reject_media reject_reports obfuscate
2 public-comment.example.org suspend This is a public comment This is a private comment TRUE TRUE TRUE
3 private-comment.example.org suspend This is a private comment TRUE TRUE TRUE
4 diff-comment.example.org suspend Suspend public comment Suspend private comment TRUE TRUE TRUE
5 2diff-comment.example.org suspend Suspend comment 1 Suspend private 1 TRUE TRUE TRUE
6 qoto.org suspend TRUE TRUE TRUE
7 sealion.club suspend TRUE TRUE TRUE
8 develop.gab.com suspend TRUE TRUE TRUE
9 gab.ai suspend TRUE TRUE TRUE
10 gab.sleeck.eu suspend TRUE TRUE TRUE
11 gab.com suspend TRUE TRUE TRUE
12 kiwifarms.is suspend TRUE TRUE TRUE
13 kiwifarms.net suspend TRUE TRUE TRUE
14 gabfed.com suspend TRUE TRUE TRUE

View File

@ -0,0 +1,68 @@
from fediblockhole.const import BlockSeverity, SeverityLevel
def test_severity_eq():
    """Two severities built from the same name compare equal."""
    for name in ('suspend', 'silence', 'noop'):
        first = BlockSeverity(name)
        second = BlockSeverity(name)
        assert first == second
def test_severity_ne():
    """Severities built from different names compare unequal."""
    noop = BlockSeverity('noop')
    silence = BlockSeverity('silence')
    suspend = BlockSeverity('suspend')

    assert noop != silence
    assert silence != suspend
    assert noop != suspend
def test_severity_lt():
    """noop < silence < suspend under the `<` operator."""
    noop = BlockSeverity('noop')
    silence = BlockSeverity('silence')
    suspend = BlockSeverity('suspend')

    assert noop < silence
    assert silence < suspend
    assert noop < suspend
def test_severity_gt():
    """suspend > silence > noop under the `>` operator."""
    noop = BlockSeverity('noop')
    silence = BlockSeverity('silence')
    suspend = BlockSeverity('suspend')

    assert silence > noop
    assert suspend > silence
    assert suspend > noop
def test_severity_le():
    """`<=` holds for lower and for equal severities."""
    noop = BlockSeverity('noop')
    silence = BlockSeverity('silence')
    silence_again = BlockSeverity('silence')
    suspend = BlockSeverity('suspend')

    assert noop <= silence
    # Equal severities must satisfy <= as well
    assert silence_again <= silence
    assert silence <= suspend
    assert noop <= suspend
def test_severity_ge():
    """`>=` holds for higher and for equal severities."""
    s1 = BlockSeverity('noop')
    s2 = BlockSeverity('silence')
    s2a = BlockSeverity('silence')
    s3 = BlockSeverity('suspend')

    assert s2 >= s1
    assert s2a >= s1
    # Equal severities must satisfy >= too; this mirrors the `s2a <= s2`
    # check in test_severity_le, which was missing here.
    assert s2a >= s2
    assert s3 >= s2
    assert s3 >= s1

74
tests/test_domainblock.py Normal file
View File

@ -0,0 +1,74 @@
"""Test the DomainBlock structure
"""
import pytest
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
def test_blocksev_blankstring():
    """An empty severity string yields SeverityLevel.NONE."""
    assert BlockSeverity('').level == SeverityLevel.NONE
def test_blocksev_string_noop():
    """The string 'noop' yields SeverityLevel.NONE."""
    assert BlockSeverity('noop').level == SeverityLevel.NONE
def test_blocksev_none():
    """A severity of None yields SeverityLevel.NONE."""
    assert BlockSeverity(None).level == SeverityLevel.NONE
def test_empty_domainblock_fails():
    """Constructing a DomainBlock with no arguments raises TypeError."""
    with pytest.raises(TypeError):
        DomainBlock()
def test_default_suspend():
    """A DomainBlock given only a domain has severity SUSPEND."""
    block = DomainBlock('example.org')

    assert block.domain == 'example.org'
    assert block.severity.level == SeverityLevel.SUSPEND
def test_severity_suspend():
    """An explicit 'suspend' severity is stored as SUSPEND."""
    block = DomainBlock('example.org', 'suspend')

    assert block.domain == 'example.org'
    assert block.severity.level == SeverityLevel.SUSPEND
def test_severity_silence():
    """An explicit 'silence' severity is stored as SILENCE."""
    block = DomainBlock('example.org', 'silence')

    assert block.domain == 'example.org'
    assert block.severity.level == SeverityLevel.SILENCE
def test_severity_noop_string():
    """A 'noop' severity string is stored as NONE."""
    block = DomainBlock('example.org', 'noop')

    assert block.domain == 'example.org'
    assert block.severity.level == SeverityLevel.NONE
def test_severity_none():
    """A severity of None is stored as NONE."""
    block = DomainBlock('example.org', None)

    assert block.domain == 'example.org'
    assert block.severity.level == SeverityLevel.NONE
def test_compare_equal_blocks():
    """Blocks with the same domain and severity compare equal."""
    first = DomainBlock('example1.org', 'suspend')
    second = DomainBlock('example1.org', 'suspend')

    assert first == second
def test_compare_diff_domains():
    """Blocks for different domains compare unequal."""
    first = DomainBlock('example1.org', 'suspend')
    second = DomainBlock('example2.org', 'suspend')

    assert first != second
def test_compare_diff_sevs():
    """Same domain, suspend vs silence: the blocks compare unequal."""
    suspended = DomainBlock('example1.org', 'suspend')
    silenced = DomainBlock('example1.org', 'silence')

    assert suspended != silenced
def test_compare_diff_sevs_2():
    """Same domain, suspend vs noop: the blocks compare unequal."""
    suspended = DomainBlock('example1.org', 'suspend')
    noop = DomainBlock('example1.org', 'noop')

    assert suspended != noop

139
tests/test_mergeplan.py Normal file
View File

@ -0,0 +1,139 @@
"""Various mergeplan tests
"""
from fediblockhole.blocklist_parser import parse_blocklist
from fediblockhole import merge_blocklists
from fediblockhole.const import SeverityLevel
# CSV fixture files loaded by the merge tests below (relative paths).
datafile01 = "data-suspends-01.csv"
datafile02 = "data-silences-01.csv"
datafile03 = "data-noop-01.csv"
# Field list handed to parse_blocklist() for every fixture file.
import_fields = [
'domain',
'severity',
'public_comment',
'private_comment',
'reject_media',
'reject_reports',
'obfuscate'
]
def load_test_blocklist_data(datafiles):
    """Parse CSV fixture files into blocklists.

    Each file is parsed with parse_blocklist() using the 'csv' format and
    the module-level import_fields.

    A name is first tried as given (relative to the current working
    directory, as before). If no such file exists, it is looked up next to
    this test module, so the tests do not depend on the directory pytest is
    started from.

    :param datafiles: iterable of fixture file names.
    :returns: dict mapping each name, exactly as passed in, to its parsed
        blocklist.
    """
    import os.path

    here = os.path.dirname(os.path.abspath(__file__))
    blocklists = {}
    for df in datafiles:
        # Keep the original cwd-relative behaviour when it works.
        path = df if os.path.exists(df) else os.path.join(here, df)
        with open(path) as fp:
            data = fp.read()
        blocklists[df] = parse_blocklist(data, 'csv', import_fields)
    return blocklists
def test_mergeplan_max():
    """Merging datafile01 and datafile02 with 'max' gives SUSPEND everywhere."""
    sources = load_test_blocklist_data([datafile01, datafile02])
    merged = merge_blocklists(sources, 'max')

    assert len(merged) == 13
    for domain in merged:
        assert merged[domain].severity.level == SeverityLevel.SUSPEND
def test_mergeplan_min():
    """Test 'min' mergeplan.

    Merging datafile01 and datafile02 with 'min' must leave every domain
    at SILENCE.
    """
    blocklists = load_test_blocklist_data([datafile01, datafile02])
    bl = merge_blocklists(blocklists, 'min')

    assert len(bl) == 13
    for key in bl:
        assert bl[key].severity.level == SeverityLevel.SILENCE
def test_mergeplan_default():
    """With no mergeplan argument the result matches 'max' (all SUSPEND)."""
    sources = load_test_blocklist_data([datafile01, datafile02])
    merged = merge_blocklists(sources)

    assert len(merged) == 13
    for domain in merged:
        assert merged[domain].severity.level == SeverityLevel.SUSPEND
def test_mergeplan_3_max():
    """Three data files merged with 'max' give SUSPEND everywhere."""
    sources = load_test_blocklist_data([datafile01, datafile02, datafile03])
    merged = merge_blocklists(sources, 'max')

    assert len(merged) == 13
    for domain in merged:
        assert merged[domain].severity.level == SeverityLevel.SUSPEND
def test_mergeplan_3_min():
    """3 datafiles and mergeplan of 'min'.

    This test was previously also named test_mergeplan_3_max. The duplicate
    name rebound the earlier definition, so the real 3-file 'max' test was
    never collected. It exercises 'min', hence the new name.
    """
    blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
    bl = merge_blocklists(blocklists, 'min')

    assert len(bl) == 13
    for key in bl:
        assert bl[key].severity.level == SeverityLevel.NONE
def test_mergeplan_noop_v_silence_max():
    """A 'max' merge of datafile02 and datafile03 picks silence over noop."""
    sources = load_test_blocklist_data([datafile02, datafile03])
    merged = merge_blocklists(sources, 'max')

    assert len(merged) == 13
    for domain in merged:
        assert merged[domain].severity.level == SeverityLevel.SILENCE
def test_mergeplan_noop_v_silence_min():
    """A 'min' merge of datafile02 and datafile03 picks noop over silence."""
    sources = load_test_blocklist_data([datafile02, datafile03])
    merged = merge_blocklists(sources, 'min')

    assert len(merged) == 13
    for domain in merged:
        assert merged[domain].severity.level == SeverityLevel.NONE
def test_merge_public_comment():
    """An identical public comment in all three sources is merged as one."""
    sources = load_test_blocklist_data([datafile01, datafile02, datafile03])
    merged = merge_blocklists(sources, 'min')

    assert len(merged) == 13
    block = merged['public-comment.example.org']
    assert block.public_comment == 'This is a public comment'
def test_merge_private_comment():
    """The private comment on the merged block matches the expected text."""
    sources = load_test_blocklist_data([datafile01, datafile02, datafile03])
    merged = merge_blocklists(sources, 'min')

    assert len(merged) == 13
    block = merged['private-comment.example.org']
    assert block.private_comment == 'This is a private comment'
def test_merge_public_comments():
    """Differing public comments are joined with ', ' in source order."""
    sources = load_test_blocklist_data([datafile01, datafile02, datafile03])
    merged = merge_blocklists(sources, 'min')

    assert len(merged) == 13
    block = merged['diff-comment.example.org']
    assert block.public_comment == 'Suspend public comment, Silence public comment, Noop public comment'
def test_merge_duplicate_comments():
    """The same comment on multiple sources shouldn't get added.

    Only the merged length is checked for now; see the note below.
    """
    sources = load_test_blocklist_data([datafile01, datafile02, datafile03])
    merged = merge_blocklists(sources, 'min')

    assert len(merged) == 13
    # Nope, this breaks. Need to rethink duplicate comment merge.
    # assert merged['2diff-comment.example.org'].public_comment == 'Suspend comment 1, Public duplicate'

77
tests/test_parser_csv.py Normal file
View File

@ -0,0 +1,77 @@
"""Tests of the CSV parsing
"""
from fediblockhole.blocklist_parser import BlocklistParserCSV, parse_blocklist
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
def test_single_line():
    """A single line of CSV parses to zero blocks."""
    parsed = BlocklistParserCSV().parse_blocklist("example.org")

    assert len(parsed) == 0
def test_header_only():
    """A header row with no data rows parses to zero blocks."""
    header = "domain,severity,public_comment"
    parsed = BlocklistParserCSV().parse_blocklist(header)

    assert len(parsed) == 0
def test_2_blocks():
    """Two data rows parse to two blocks, in input order."""
    csvdata = """domain,severity
example.org,silence
example2.org,suspend
"""
    parsed = BlocklistParserCSV().parse_blocklist(csvdata)

    assert len(parsed) == 2
    first = parsed[0]
    assert first.domain == 'example.org'
def test_4_blocks():
csvdata = """domain,severity,public_comment
example.org,silence,"test 1"
example2.org,suspend,"test 2"
example3.org,noop,"test 3"
example4.org,suspend,"test 4"
"""
parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata)
assert len(bl) == 4
assert bl[0].domain == 'example.org'
assert bl[1].domain == 'example2.org'
assert bl[2].domain == 'example3.org'
assert bl[3].domain == 'example4.org'