Merge pull request #16 from eigenmagic/rapidblock-support
Refactor codebase to support multiple blocklist formats
This commit is contained in:
commit
c89edffa69
|
@ -3,6 +3,8 @@
|
|||
# Otherwise, `token` is a Bearer token authorised to read domain_blocks.
|
||||
# If `admin` = True, use the more detailed admin API, which requires a token with a
|
||||
# higher level of authorization.
|
||||
# If `import_fields` are provided, only import these fields from the instance.
|
||||
# Overrides the global `import_fields` setting.
|
||||
blocklist_instance_sources = [
|
||||
# { domain = 'public.blocklist'}, # an instance with a public list of domain_blocks
|
||||
# { domain = 'jorts.horse', token = '<a_different_token>' }, # user accessible block list
|
||||
|
@ -10,9 +12,13 @@ blocklist_instance_sources = [
|
|||
]
|
||||
|
||||
# List of URLs to read csv blocklists from
|
||||
# Format tells the parser which format to use when parsing the blocklist
|
||||
# max_severity tells the parser to override any severities that are higher than this value
|
||||
# import_fields tells the parser to only import that set of fields from a specific source
|
||||
blocklist_url_sources = [
|
||||
# 'file:///etc/fediblockhole/blocklist-01.csv',
|
||||
'https://raw.githubusercontent.com/eigenmagic/fediblockhole/main/samples/demo-blocklist-01.csv',
|
||||
# { url = 'file:///home/daedalus/src/fediblockhole/samples/demo-blocklist-01.csv', format = 'csv' },
|
||||
{ url = 'https://raw.githubusercontent.com/eigenmagic/fediblockhole/main/samples/demo-blocklist-01.csv', format = 'csv' },
|
||||
|
||||
]
|
||||
|
||||
# List of instances to write blocklist to
|
||||
|
|
|
@ -34,3 +34,8 @@ fediblock-sync = "fediblockhole:main"
|
|||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = [
|
||||
"--import-mode=importlib",
|
||||
]
|
|
@ -1,2 +1,3 @@
|
|||
requests
|
||||
toml
|
||||
pytest
|
|
@ -11,44 +11,32 @@ import os.path
|
|||
import sys
|
||||
import urllib.request as urlr
|
||||
|
||||
from .blocklist_parser import parse_blocklist
|
||||
from .const import DomainBlock, BlockSeverity
|
||||
|
||||
from importlib.metadata import version
|
||||
__version__ = version('fediblockhole')
|
||||
|
||||
import logging
|
||||
logging.basicConfig(level=logging.INFO,
|
||||
format='%(asctime)s %(levelname)s %(message)s')
|
||||
log = logging.getLogger('fediblockhole')
|
||||
|
||||
# Max size of a URL-fetched blocklist
|
||||
URL_BLOCKLIST_MAXSIZE = 1024 ** 3
|
||||
|
||||
log = logging.getLogger('fediblock_sync')
|
||||
|
||||
CONFIGFILE = "/home/mastodon/etc/admin.conf"
|
||||
|
||||
# The relative severity levels of blocks
|
||||
SEVERITY = {
|
||||
'noop': 0,
|
||||
'silence': 1,
|
||||
'suspend': 2,
|
||||
}
|
||||
|
||||
# Default for 'reject_media' setting for each severity level
|
||||
REJECT_MEDIA_DEFAULT = {
|
||||
'noop': False,
|
||||
'silence': True,
|
||||
'suspend': True,
|
||||
}
|
||||
|
||||
# Default for 'reject_reports' setting for each severity level
|
||||
REJECT_REPORTS_DEFAULT = {
|
||||
'noop': False,
|
||||
'silence': True,
|
||||
'suspend': True,
|
||||
}
|
||||
|
||||
# Wait at most this long for a remote server to respond
|
||||
REQUEST_TIMEOUT = 30
|
||||
|
||||
# Time to wait between instance API calls to we don't melt them
|
||||
API_CALL_DELAY = 3600 / 300 # 300 API calls per hour
|
||||
|
||||
# We always import the domain and the severity
|
||||
IMPORT_FIELDS = ['domain', 'severity']
|
||||
|
||||
# We always export the domain and the severity
|
||||
EXPORT_FIELDS = ['domain', 'severity']
|
||||
|
||||
def sync_blocklists(conf: dict):
|
||||
"""Sync instance blocklists from remote sources.
|
||||
|
||||
|
@ -58,50 +46,25 @@ def sync_blocklists(conf: dict):
|
|||
# We will merge these later using a merge algorithm we choose.
|
||||
|
||||
# Always import these fields
|
||||
import_fields = ['domain', 'severity']
|
||||
import_fields = IMPORT_FIELDS
|
||||
# Add extra import fields if defined in config
|
||||
import_fields.extend(conf.import_fields)
|
||||
|
||||
# Always export these fields
|
||||
export_fields = ['domain', 'severity']
|
||||
export_fields = EXPORT_FIELDS
|
||||
# Add extra export fields if defined in config
|
||||
export_fields.extend(conf.export_fields)
|
||||
|
||||
blocklists = {}
|
||||
# Fetch blocklists from URLs
|
||||
if not conf.no_fetch_url:
|
||||
log.info("Fetching domain blocks from URLs...")
|
||||
for listurl in conf.blocklist_url_sources:
|
||||
blocklists[listurl] = []
|
||||
with urlr.urlopen(listurl) as fp:
|
||||
rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8')
|
||||
reader = csv.DictReader(rawdata.split('\n'))
|
||||
for row in reader:
|
||||
# Coerce booleans from string to Python bool
|
||||
for boolkey in ['reject_media', 'reject_reports', 'obfuscate']:
|
||||
if boolkey in row:
|
||||
row[boolkey] = str2bool(row[boolkey])
|
||||
|
||||
# Remove fields we don't want to import
|
||||
origrow = row.copy()
|
||||
for key in origrow:
|
||||
if key not in import_fields:
|
||||
del row[key]
|
||||
blocklists[listurl].append(row)
|
||||
|
||||
if conf.save_intermediate:
|
||||
save_intermediate_blocklist(blocklists[listurl], listurl, conf.savedir, export_fields)
|
||||
blocklists = fetch_from_urls(blocklists, conf.blocklist_url_sources,
|
||||
import_fields, conf.save_intermediate, conf.savedir, export_fields)
|
||||
|
||||
# Fetch blocklists from remote instances
|
||||
if not conf.no_fetch_instance:
|
||||
log.info("Fetching domain blocks from instances...")
|
||||
for blocklist_src in conf.blocklist_instance_sources:
|
||||
domain = blocklist_src['domain']
|
||||
admin = blocklist_src.get('admin', False)
|
||||
token = blocklist_src.get('token', None)
|
||||
blocklists[domain] = fetch_instance_blocklist(domain, token, admin, import_fields)
|
||||
if conf.save_intermediate:
|
||||
save_intermediate_blocklist(blocklists[domain], domain, conf.savedir, export_fields)
|
||||
blocklists = fetch_from_instances(blocklists, conf.blocklist_instance_sources,
|
||||
import_fields, conf.save_intermediate, conf.savedir, export_fields)
|
||||
|
||||
# Merge blocklists into an update dict
|
||||
merged = merge_blocklists(blocklists, conf.mergeplan)
|
||||
|
@ -115,12 +78,67 @@ def sync_blocklists(conf: dict):
|
|||
for dest in conf.blocklist_instance_destinations:
|
||||
domain = dest['domain']
|
||||
token = dest['token']
|
||||
max_followed_severity = dest.get('max_followed_severity', 'silence')
|
||||
max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence'))
|
||||
push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity)
|
||||
|
||||
def fetch_from_urls(blocklists: dict, url_sources: dict,
|
||||
import_fields: list=IMPORT_FIELDS,
|
||||
save_intermediate: bool=False,
|
||||
savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
|
||||
"""Fetch blocklists from URL sources
|
||||
@param blocklists: A dict of existing blocklists, keyed by source
|
||||
@param url_sources: A dict of configuration info for url sources
|
||||
@returns: A dict of blocklists, same as input, but (possibly) modified
|
||||
"""
|
||||
log.info("Fetching domain blocks from URLs...")
|
||||
|
||||
for item in url_sources:
|
||||
url = item['url']
|
||||
# If import fields are provided, they override the global ones passed in
|
||||
source_import_fields = item.get('import_fields', None)
|
||||
if source_import_fields:
|
||||
# Ensure we always use the default fields
|
||||
import_fields = IMPORT_FIELDS.extend(source_import_fields)
|
||||
|
||||
max_severity = item.get('max_severity', 'suspend')
|
||||
listformat = item.get('format', 'csv')
|
||||
with urlr.urlopen(url) as fp:
|
||||
rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8')
|
||||
blocklists[url] = parse_blocklist(rawdata, listformat, import_fields, max_severity)
|
||||
|
||||
if save_intermediate:
|
||||
save_intermediate_blocklist(blocklists[url], url, savedir, export_fields)
|
||||
|
||||
return blocklists
|
||||
|
||||
def fetch_from_instances(blocklists: dict, sources: dict,
|
||||
import_fields: list=IMPORT_FIELDS,
|
||||
save_intermediate: bool=False,
|
||||
savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
|
||||
"""Fetch blocklists from other instances
|
||||
@param blocklists: A dict of existing blocklists, keyed by source
|
||||
@param url_sources: A dict of configuration info for url sources
|
||||
@returns: A dict of blocklists, same as input, but (possibly) modified
|
||||
"""
|
||||
log.info("Fetching domain blocks from instances...")
|
||||
for item in sources:
|
||||
domain = item['domain']
|
||||
admin = item.get('admin', False)
|
||||
token = item.get('token', None)
|
||||
# If import fields are provided, they override the global ones passed in
|
||||
source_import_fields = item.get('import_fields', None)
|
||||
if source_import_fields:
|
||||
# Ensure we always use the default fields
|
||||
import_fields = IMPORT_FIELDS.extend(source_import_fields)
|
||||
|
||||
# Add the blocklist with the domain as the source key
|
||||
blocklists[domain] = fetch_instance_blocklist(domain, token, admin, import_fields)
|
||||
if save_intermediate:
|
||||
save_intermediate_blocklist(blocklists[domain], domain, savedir, export_fields)
|
||||
return blocklists
|
||||
|
||||
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
||||
"""Merge fetched remote blocklists into a bulk update
|
||||
|
||||
@param mergeplan: An optional method of merging overlapping block definitions
|
||||
'max' (the default) uses the highest severity block found
|
||||
'min' uses the lowest severity block found
|
||||
|
@ -130,7 +148,7 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
|||
for key, blist in blocklists.items():
|
||||
log.debug(f"processing blocklist from: {key} ...")
|
||||
for newblock in blist:
|
||||
domain = newblock['domain']
|
||||
domain = newblock.domain
|
||||
# If the domain has two asterisks in it, it's obfuscated
|
||||
# and we can't really use it, so skip it and do the next one
|
||||
if '*' in domain:
|
||||
|
@ -151,7 +169,7 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
|||
# end for
|
||||
return merged
|
||||
|
||||
def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dict:
|
||||
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
|
||||
"""Use a mergeplan to decide how to merge two overlapping block definitions
|
||||
|
||||
@param oldblock: The existing block definition.
|
||||
|
@ -159,18 +177,15 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
|
|||
@param mergeplan: How to merge. Choices are 'max', the default, and 'min'.
|
||||
"""
|
||||
# Default to the existing block definition
|
||||
blockdata = oldblock.copy()
|
||||
blockdata = oldblock._asdict()
|
||||
|
||||
# If the public or private comment is different,
|
||||
# append it to the existing comment, joined with ', '
|
||||
# unless the comment is None or an empty string
|
||||
# Merge comments
|
||||
keylist = ['public_comment', 'private_comment']
|
||||
for key in keylist:
|
||||
try:
|
||||
if oldblock[key] not in ['', None] and newblock[key] not in ['', None] and oldblock[key] != newblock[key]:
|
||||
log.debug(f"old comment: '{oldblock[key]}'")
|
||||
log.debug(f"new comment: '{newblock[key]}'")
|
||||
blockdata[key] = ', '.join([oldblock[key], newblock[key]])
|
||||
oldcomment = getattr(oldblock, key)
|
||||
newcomment = getattr(newblock, key)
|
||||
blockdata[key] = merge_comments(oldcomment, newcomment)
|
||||
except KeyError:
|
||||
log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
|
||||
continue
|
||||
|
@ -180,33 +195,83 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
|
|||
# Use the highest block level found (the default)
|
||||
log.debug(f"Using 'max' mergeplan.")
|
||||
|
||||
if SEVERITY[newblock['severity']] > SEVERITY[oldblock['severity']]:
|
||||
if newblock.severity > oldblock.severity:
|
||||
log.debug(f"New block severity is higher. Using that.")
|
||||
blockdata['severity'] = newblock['severity']
|
||||
blockdata['severity'] = newblock.severity
|
||||
|
||||
# If obfuscate is set and is True for the domain in
|
||||
# any blocklist then obfuscate is set to True.
|
||||
if newblock.get('obfuscate', False):
|
||||
blockdata['obfuscate'] = True
|
||||
# For 'reject_media', 'reject_reports', and 'obfuscate' if
|
||||
# the value is set and is True for the domain in
|
||||
# any blocklist then the value is set to True.
|
||||
for key in ['reject_media', 'reject_reports', 'obfuscate']:
|
||||
newval = getattr(newblock, key)
|
||||
if newval == True:
|
||||
blockdata[key] = True
|
||||
|
||||
elif mergeplan in ['min']:
|
||||
# Use the lowest block level found
|
||||
log.debug(f"Using 'min' mergeplan.")
|
||||
|
||||
if SEVERITY[newblock['severity']] < SEVERITY[oldblock['severity']]:
|
||||
blockdata['severity'] = newblock['severity']
|
||||
if newblock.severity < oldblock.severity:
|
||||
blockdata['severity'] = newblock.severity
|
||||
|
||||
# If obfuscate is set and is False for the domain in
|
||||
# any blocklist then obfuscate is set to False.
|
||||
if not newblock.get('obfuscate', True):
|
||||
blockdata['obfuscate'] = False
|
||||
# For 'reject_media', 'reject_reports', and 'obfuscate' if
|
||||
# the value is set and is False for the domain in
|
||||
# any blocklist then the value is set to False.
|
||||
for key in ['reject_media', 'reject_reports', 'obfuscate']:
|
||||
newval = getattr(newblock, key)
|
||||
if newval == False:
|
||||
blockdata[key] = False
|
||||
|
||||
else:
|
||||
raise NotImplementedError(f"Mergeplan '{mergeplan}' not implemented.")
|
||||
|
||||
log.debug(f"Block severity set to {blockdata['severity']}")
|
||||
|
||||
return blockdata
|
||||
return DomainBlock(**blockdata)
|
||||
|
||||
def merge_comments(oldcomment:str, newcomment:str) -> str:
|
||||
""" Merge two comments
|
||||
|
||||
@param oldcomment: The original comment we're merging into
|
||||
@param newcomment: The new commment we want to merge in
|
||||
@returns: a new str of the merged comment
|
||||
"""
|
||||
# Don't merge if both comments are None or ''
|
||||
if oldcomment in ['', None] and newcomment in ['', None]:
|
||||
return ''
|
||||
|
||||
# If both comments are the same, don't merge
|
||||
if oldcomment == newcomment:
|
||||
return oldcomment
|
||||
|
||||
# We want to skip duplicate fragments so we don't end up
|
||||
# re-concatenating the same strings every time there's an
|
||||
# update, causing the comment to grow without bound.
|
||||
# We tokenize the comments, splitting them on ', ', and comparing
|
||||
# the tokens, skipping duplicates.
|
||||
# This means "boring, lack of moderation, nazis, scrapers" merging
|
||||
# with "lack of moderation, scrapers" should result in
|
||||
# "boring, lack of moderation, nazis, scrapers"
|
||||
old_tokens = oldcomment.split(', ')
|
||||
new_tokens = newcomment.split(', ')
|
||||
|
||||
# Remove any empty string tokens that we get
|
||||
while '' in old_tokens:
|
||||
old_tokens.remove('')
|
||||
while '' in new_tokens:
|
||||
new_tokens.remove('')
|
||||
|
||||
# Remove duplicate tokens
|
||||
for token in old_tokens:
|
||||
if token in new_tokens:
|
||||
new_tokens.remove(token)
|
||||
|
||||
# Combine whatever tokens are left into one set
|
||||
tokenset = old_tokens
|
||||
tokenset.extend(new_tokens)
|
||||
|
||||
# Return the merged string
|
||||
return ', '.join(tokenset)
|
||||
|
||||
def requests_headers(token: str=None):
|
||||
"""Set common headers for requests"""
|
||||
|
@ -219,7 +284,7 @@ def requests_headers(token: str=None):
|
|||
return headers
|
||||
|
||||
def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
||||
import_fields: list=['domain', 'severity']) -> list:
|
||||
import_fields: list=['domain', 'severity']) -> list[DomainBlock]:
|
||||
"""Fetch existing block list from server
|
||||
|
||||
@param host: The remote host to connect to.
|
||||
|
@ -239,7 +304,7 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
|||
|
||||
url = f"https://{host}{api_path}"
|
||||
|
||||
domain_blocks = []
|
||||
blocklist = []
|
||||
link = True
|
||||
|
||||
while link:
|
||||
|
@ -248,7 +313,7 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
|||
log.error(f"Cannot fetch remote blocklist: {response.content}")
|
||||
raise ValueError("Unable to fetch domain block list: %s", response)
|
||||
|
||||
domain_blocks.extend(json.loads(response.content))
|
||||
blocklist.extend( parse_blocklist(response.content, 'json', import_fields) )
|
||||
|
||||
# Parse the link header to find the next url to fetch
|
||||
# This is a weird and janky way of doing pagination but
|
||||
|
@ -262,20 +327,12 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
|||
break
|
||||
else:
|
||||
next = pagination[0]
|
||||
prev = pagination[1]
|
||||
# prev = pagination[1]
|
||||
|
||||
urlstring, rel = next.split('; ')
|
||||
url = urlstring.strip('<').rstrip('>')
|
||||
|
||||
log.debug(f"Found {len(domain_blocks)} existing domain blocks.")
|
||||
# Remove fields not in import list.
|
||||
for row in domain_blocks:
|
||||
origrow = row.copy()
|
||||
for key in origrow:
|
||||
if key not in import_fields:
|
||||
del row[key]
|
||||
|
||||
return domain_blocks
|
||||
return blocklist
|
||||
|
||||
def delete_block(token: str, host: str, id: int):
|
||||
"""Remove a domain block"""
|
||||
|
@ -334,40 +391,29 @@ def fetch_instance_follows(token: str, host: str, domain: str) -> int:
|
|||
return follows
|
||||
|
||||
def check_followed_severity(host: str, token: str, domain: str,
|
||||
severity: str, max_followed_severity: str='silence'):
|
||||
severity: BlockSeverity,
|
||||
max_followed_severity: BlockSeverity=BlockSeverity('silence')):
|
||||
"""Check an instance to see if it has followers of a to-be-blocked instance"""
|
||||
|
||||
log.debug("Checking followed severity...")
|
||||
# Return straight away if we're not increasing the severity
|
||||
if severity <= max_followed_severity:
|
||||
return severity
|
||||
|
||||
# If the instance has accounts that follow people on the to-be-blocked domain,
|
||||
# limit the maximum severity to the configured `max_followed_severity`.
|
||||
log.debug("checking for instance follows...")
|
||||
follows = fetch_instance_follows(token, host, domain)
|
||||
time.sleep(API_CALL_DELAY)
|
||||
if follows > 0:
|
||||
log.debug(f"Instance {host} has {follows} followers of accounts at {domain}.")
|
||||
if SEVERITY[severity] > SEVERITY[max_followed_severity]:
|
||||
if severity > max_followed_severity:
|
||||
log.warning(f"Instance {host} has {follows} followers of accounts at {domain}. Limiting block severity to {max_followed_severity}.")
|
||||
return max_followed_severity
|
||||
else:
|
||||
return severity
|
||||
|
||||
def is_change_needed(oldblock: dict, newblock: dict, import_fields: list):
|
||||
"""Compare block definitions to see if changes are needed"""
|
||||
# Check if anything is actually different and needs updating
|
||||
change_needed = []
|
||||
|
||||
for key in import_fields:
|
||||
try:
|
||||
oldval = oldblock[key]
|
||||
newval = newblock[key]
|
||||
log.debug(f"Compare {key} '{oldval}' <> '{newval}'")
|
||||
|
||||
if oldval != newval:
|
||||
log.debug("Difference detected. Change needed.")
|
||||
change_needed.append(key)
|
||||
break
|
||||
|
||||
except KeyError:
|
||||
log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
|
||||
continue
|
||||
|
||||
change_needed = oldblock.compare_fields(newblock, import_fields)
|
||||
return change_needed
|
||||
|
||||
def update_known_block(token: str, host: str, blockdict: dict):
|
||||
|
@ -392,17 +438,17 @@ def update_known_block(token: str, host: str, blockdict: dict):
|
|||
if response.status_code != 200:
|
||||
raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
|
||||
|
||||
def add_block(token: str, host: str, blockdata: dict):
|
||||
def add_block(token: str, host: str, blockdata: DomainBlock):
|
||||
"""Block a domain on Mastodon host
|
||||
"""
|
||||
log.debug(f"Blocking domain {blockdata['domain']} at {host}...")
|
||||
log.debug(f"Blocking domain {blockdata.domain} at {host}...")
|
||||
api_path = "/api/v1/admin/domain_blocks"
|
||||
|
||||
url = f"https://{host}{api_path}"
|
||||
|
||||
response = requests.post(url,
|
||||
headers=requests_headers(token),
|
||||
data=blockdata,
|
||||
data=blockdata._asdict(),
|
||||
timeout=REQUEST_TIMEOUT
|
||||
)
|
||||
if response.status_code == 422:
|
||||
|
@ -417,7 +463,7 @@ def add_block(token: str, host: str, blockdata: dict):
|
|||
def push_blocklist(token: str, host: str, blocklist: list[dict],
|
||||
dryrun: bool=False,
|
||||
import_fields: list=['domain', 'severity'],
|
||||
max_followed_severity='silence',
|
||||
max_followed_severity:BlockSeverity=BlockSeverity('silence'),
|
||||
):
|
||||
"""Push a blocklist to a remote instance.
|
||||
|
||||
|
@ -437,34 +483,41 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
|
|||
serverblocks = fetch_instance_blocklist(host, token, True, import_fields)
|
||||
|
||||
# # Convert serverblocks to a dictionary keyed by domain name
|
||||
knownblocks = {row['domain']: row for row in serverblocks}
|
||||
knownblocks = {row.domain: row for row in serverblocks}
|
||||
|
||||
for newblock in blocklist:
|
||||
|
||||
log.debug(f"Applying newblock: {newblock}")
|
||||
oldblock = knownblocks.get(newblock['domain'], None)
|
||||
log.debug(f"Processing block: {newblock}")
|
||||
oldblock = knownblocks.get(newblock.domain, None)
|
||||
if oldblock:
|
||||
log.debug(f"Block already exists for {newblock['domain']}, checking for differences...")
|
||||
log.debug(f"Block already exists for {newblock.domain}, checking for differences...")
|
||||
|
||||
change_needed = is_change_needed(oldblock, newblock, import_fields)
|
||||
|
||||
if change_needed:
|
||||
# Change might be needed, but let's see if the severity
|
||||
# needs to change. If not, maybe no changes are needed?
|
||||
newseverity = check_followed_severity(host, token, oldblock['domain'], newblock['severity'], max_followed_severity)
|
||||
if newseverity != oldblock['severity']:
|
||||
newblock['severity'] = newseverity
|
||||
change_needed.append('severity')
|
||||
# Is the severity changing?
|
||||
if 'severity' in change_needed:
|
||||
log.debug("Severity change requested, checking...")
|
||||
if newblock.severity > oldblock.severity:
|
||||
# Confirm if we really want to change the severity
|
||||
# If we still have followers of the remote domain, we may not
|
||||
# want to go all the way to full suspend, depending on the configuration
|
||||
newseverity = check_followed_severity(host, token, oldblock.domain, newblock.severity, max_followed_severity)
|
||||
if newseverity != oldblock.severity:
|
||||
newblock.severity = newseverity
|
||||
else:
|
||||
log.info("Keeping severity of block the same to avoid disrupting followers.")
|
||||
change_needed.remove('severity')
|
||||
|
||||
# Change still needed?
|
||||
if change_needed:
|
||||
log.info(f"Change detected. Updating domain block for {oldblock['domain']}")
|
||||
log.info(f"Change detected. Need to update {change_needed} for domain block for {oldblock.domain}")
|
||||
log.info(f"Old block definition: {oldblock}")
|
||||
log.info(f"Pushing new block definition: {newblock}")
|
||||
blockdata = oldblock.copy()
|
||||
blockdata.update(newblock)
|
||||
if not dryrun:
|
||||
update_known_block(token, host, blockdata)
|
||||
# add a pause here so we don't melt the instance
|
||||
time.sleep(1)
|
||||
time.sleep(API_CALL_DELAY)
|
||||
else:
|
||||
log.info("Dry run selected. Not applying changes.")
|
||||
|
||||
|
@ -475,24 +528,14 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
|
|||
else:
|
||||
# This is a new block for the target instance, so we
|
||||
# need to add a block rather than update an existing one
|
||||
blockdata = {
|
||||
'domain': newblock['domain'],
|
||||
# Default to Silence if nothing is specified
|
||||
'severity': newblock.get('severity', 'silence'),
|
||||
'public_comment': newblock.get('public_comment', ''),
|
||||
'private_comment': newblock.get('private_comment', ''),
|
||||
'reject_media': newblock.get('reject_media', False),
|
||||
'reject_reports': newblock.get('reject_reports', False),
|
||||
'obfuscate': newblock.get('obfuscate', False),
|
||||
}
|
||||
log.info(f"Adding new block: {newblock}...")
|
||||
|
||||
# Make sure the new block doesn't clobber a domain with followers
|
||||
blockdata['severity'] = check_followed_severity(host, token, newblock['domain'], max_followed_severity)
|
||||
log.info(f"Adding new block for {blockdata['domain']}...")
|
||||
newblock.severity = check_followed_severity(host, token, newblock.domain, newblock.severity, max_followed_severity)
|
||||
if not dryrun:
|
||||
add_block(token, host, blockdata)
|
||||
add_block(token, host, newblock)
|
||||
# add a pause here so we don't melt the instance
|
||||
time.sleep(1)
|
||||
time.sleep(API_CALL_DELAY)
|
||||
else:
|
||||
log.info("Dry run selected. Not adding block.")
|
||||
|
||||
|
@ -520,7 +563,7 @@ def save_intermediate_blocklist(
|
|||
save_blocklist_to_file(blocklist, filepath, export_fields)
|
||||
|
||||
def save_blocklist_to_file(
|
||||
blocklist: list[dict],
|
||||
blocklist: list[DomainBlock],
|
||||
filepath: str,
|
||||
export_fields: list=['domain','severity']):
|
||||
"""Save a blocklist we've downloaded from a remote source
|
||||
|
@ -530,9 +573,9 @@ def save_blocklist_to_file(
|
|||
@param export_fields: Which fields to include in the export.
|
||||
"""
|
||||
try:
|
||||
blocklist = sorted(blocklist, key=lambda x: x['domain'])
|
||||
blocklist = sorted(blocklist, key=lambda x: x.domain)
|
||||
except KeyError:
|
||||
log.error("Field 'domain' not found in blocklist. Are you sure the URLs are correct?")
|
||||
log.error("Field 'domain' not found in blocklist.")
|
||||
log.debug(f"blocklist is: {blocklist}")
|
||||
|
||||
log.debug(f"export fields: {export_fields}")
|
||||
|
@ -540,7 +583,8 @@ def save_blocklist_to_file(
|
|||
with open(filepath, "w") as fp:
|
||||
writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
|
||||
writer.writeheader()
|
||||
writer.writerows(blocklist)
|
||||
for item in blocklist:
|
||||
writer.writerow(item._asdict())
|
||||
|
||||
def augment_args(args):
|
||||
"""Augment commandline arguments with config file parameters"""
|
||||
|
@ -576,17 +620,6 @@ def augment_args(args):
|
|||
|
||||
return args
|
||||
|
||||
def str2bool(boolstring: str) -> bool:
|
||||
"""Helper function to convert boolean strings to actual Python bools
|
||||
"""
|
||||
boolstring = boolstring.lower()
|
||||
if boolstring in ['true', 't', '1', 'y', 'yes']:
|
||||
return True
|
||||
elif boolstring in ['false', 'f', '0', 'n', 'no']:
|
||||
return False
|
||||
else:
|
||||
raise ValueError(f"Cannot parse value '{boolstring}' as boolean")
|
||||
|
||||
def main():
|
||||
|
||||
ap = argparse.ArgumentParser(
|
||||
|
|
|
@ -0,0 +1,186 @@
|
|||
"""Parse various blocklist data formats
|
||||
"""
|
||||
from typing import Iterable
|
||||
from .const import DomainBlock, BlockSeverity
|
||||
|
||||
import csv
|
||||
import json
|
||||
|
||||
import logging
|
||||
log = logging.getLogger('fediblockhole')
|
||||
|
||||
class BlocklistParser(object):
|
||||
"""
|
||||
Base class for parsing blocklists
|
||||
"""
|
||||
preparse = False
|
||||
|
||||
def __init__(self, import_fields: list=['domain', 'severity'],
|
||||
max_severity: str='suspend'):
|
||||
"""Create a Parser
|
||||
|
||||
@param import_fields: an optional list of fields to limit the parser to.
|
||||
Ignore any fields in a block item that aren't in import_fields.
|
||||
"""
|
||||
self.import_fields = import_fields
|
||||
self.max_severity = BlockSeverity(max_severity)
|
||||
|
||||
def preparse(self, blockdata) -> Iterable:
|
||||
"""Some raw datatypes need to be converted into an iterable
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def parse_blocklist(self, blockdata) -> dict[DomainBlock]:
|
||||
"""Parse an iterable of blocklist items
|
||||
@param blocklist: An Iterable of blocklist items
|
||||
@returns: A dict of DomainBlocks, keyed by domain
|
||||
"""
|
||||
if self.preparse:
|
||||
blockdata = self.preparse(blockdata)
|
||||
|
||||
parsed_list = []
|
||||
for blockitem in blockdata:
|
||||
parsed_list.append(self.parse_item(blockitem))
|
||||
return parsed_list
|
||||
|
||||
def parse_item(self, blockitem) -> DomainBlock:
|
||||
"""Parse an individual block item
|
||||
|
||||
@param blockitem: an individual block to be parsed
|
||||
@param import_fields: fields of a block we will import
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
class BlocklistParserJSON(BlocklistParser):
|
||||
"""Parse a JSON formatted blocklist"""
|
||||
preparse = True
|
||||
|
||||
def preparse(self, blockdata) -> Iterable:
|
||||
"""Parse the blockdata as JSON
|
||||
"""
|
||||
return json.loads(blockdata)
|
||||
|
||||
def parse_item(self, blockitem: str) -> DomainBlock:
|
||||
# Remove fields we don't want to import
|
||||
origitem = blockitem.copy()
|
||||
for key in origitem:
|
||||
if key not in self.import_fields:
|
||||
del blockitem[key]
|
||||
|
||||
# Convert dict to NamedTuple with the double-star operator
|
||||
# See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments
|
||||
block = DomainBlock(**blockitem)
|
||||
if block.severity > self.max_severity:
|
||||
block.severity = self.max_severity
|
||||
return block
|
||||
|
||||
class BlocklistParserCSV(BlocklistParser):
|
||||
""" Parse CSV formatted blocklists
|
||||
|
||||
The parser expects the CSV data to include a header with the field names.
|
||||
"""
|
||||
preparse = True
|
||||
|
||||
def preparse(self, blockdata) -> Iterable:
|
||||
"""Use a csv.DictReader to create an iterable from the blockdata
|
||||
"""
|
||||
return csv.DictReader(blockdata.split('\n'))
|
||||
|
||||
def parse_item(self, blockitem: dict) -> DomainBlock:
|
||||
# Coerce booleans from string to Python bool
|
||||
# FIXME: Is this still necessary with the DomainBlock object?
|
||||
for boolkey in ['reject_media', 'reject_reports', 'obfuscate']:
|
||||
if boolkey in blockitem:
|
||||
blockitem[boolkey] = str2bool(blockitem[boolkey])
|
||||
|
||||
# Remove fields we don't want to import
|
||||
origitem = blockitem.copy()
|
||||
for key in origitem:
|
||||
if key not in self.import_fields:
|
||||
del blockitem[key]
|
||||
|
||||
# Convert dict to NamedTuple with the double-star operator
|
||||
# See: https://docs.python.org/3/tutorial/controlflow.html#tut-unpacking-arguments
|
||||
block = DomainBlock(**blockitem)
|
||||
if block.severity > self.max_severity:
|
||||
block.severity = self.max_severity
|
||||
return block
|
||||
|
||||
class RapidBlockParserCSV(BlocklistParserCSV):
|
||||
""" Parse RapidBlock CSV blocklists
|
||||
|
||||
RapidBlock CSV blocklists are just a newline separated list of domains.
|
||||
"""
|
||||
def preparse(self, blockdata) -> Iterable:
|
||||
"""Prepend a 'domain' field header to the data
|
||||
"""
|
||||
log.debug(f"blockdata: {blockdata[:100]}")
|
||||
blockdata = ''.join(["domain\r\n", blockdata])
|
||||
|
||||
return csv.DictReader(blockdata.split('\r\n'))
|
||||
|
||||
class RapidBlockParserJSON(BlocklistParserJSON):
|
||||
"""Parse RapidBlock JSON formatted blocklists
|
||||
"""
|
||||
def preparse(self, blockdata) -> Iterable:
|
||||
rb_dict = json.loads(blockdata)
|
||||
# We want to iterate over all the dictionary items
|
||||
return rb_dict['blocks'].items()
|
||||
|
||||
def parse_item(self, blockitem: tuple) -> DomainBlock:
|
||||
"""Parse an individual item in a RapidBlock list
|
||||
"""
|
||||
# Each item is a tuple of:
|
||||
# (domain, {dictionary of attributes})
|
||||
domain = blockitem[0]
|
||||
|
||||
# RapidBlock has a binary block level which we map
|
||||
# to 'suspend' if True, and 'noop' if False.
|
||||
isblocked = blockitem[1]['isBlocked']
|
||||
if isblocked:
|
||||
severity = 'suspend'
|
||||
else:
|
||||
severity = 'noop'
|
||||
|
||||
if 'public_comment' in self.import_fields:
|
||||
public_comment = blockitem[1]['reason']
|
||||
else:
|
||||
public_comment = ''
|
||||
|
||||
# There's a 'tags' field as well, but we can't
|
||||
# do much with that in Mastodon yet
|
||||
|
||||
block = DomainBlock(domain, severity, public_comment)
|
||||
if block.severity > self.max_severity:
|
||||
block.severity = self.max_severity
|
||||
|
||||
return block
|
||||
|
||||
def str2bool(boolstring: str) -> bool:
|
||||
"""Helper function to convert boolean strings to actual Python bools
|
||||
"""
|
||||
boolstring = boolstring.lower()
|
||||
if boolstring in ['true', 't', '1', 'y', 'yes']:
|
||||
return True
|
||||
elif boolstring in ['false', 'f', '0', 'n', 'no']:
|
||||
return False
|
||||
else:
|
||||
raise ValueError(f"Cannot parse value '{boolstring}' as boolean")
|
||||
|
||||
FORMAT_PARSERS = {
|
||||
'csv': BlocklistParserCSV,
|
||||
'json': BlocklistParserJSON,
|
||||
'rapidblock.csv': RapidBlockParserCSV,
|
||||
'rapidblock.json': RapidBlockParserJSON,
|
||||
}
|
||||
|
||||
# helper function to select the appropriate Parser
|
||||
def parse_blocklist(
|
||||
blockdata,
|
||||
format="csv",
|
||||
import_fields: list=['domain', 'severity'],
|
||||
max_severity: str='suspend'):
|
||||
"""Parse a blocklist in the given format
|
||||
"""
|
||||
parser = FORMAT_PARSERS[format](import_fields, max_severity)
|
||||
return parser.parse_blocklist(blockdata)
|
|
@ -0,0 +1,232 @@
|
|||
""" Constant objects used by FediBlockHole
|
||||
"""
|
||||
import enum
|
||||
from typing import NamedTuple, Optional, TypedDict
|
||||
from dataclasses import dataclass
|
||||
|
||||
import logging
|
||||
log = logging.getLogger('fediblockhole')
|
||||
|
||||
class SeverityLevel(enum.IntEnum):
|
||||
"""How severe should a block be? Higher is more severe.
|
||||
"""
|
||||
NONE = enum.auto()
|
||||
SILENCE = enum.auto()
|
||||
SUSPEND = enum.auto()
|
||||
|
||||
class BlockSeverity(object):
|
||||
"""A representation of a block severity
|
||||
|
||||
We add some helpful functions rather than using a bare IntEnum
|
||||
"""
|
||||
|
||||
def __init__(self, severity:str=None):
|
||||
self._level = self.str2level(severity)
|
||||
|
||||
@property
|
||||
def level(self):
|
||||
return self._level
|
||||
|
||||
@level.setter
|
||||
def level(self, value):
|
||||
if isinstance(value, SeverityLevel):
|
||||
self._level = value
|
||||
elif type(value) == type(''):
|
||||
self._level = self.str2level(value)
|
||||
else:
|
||||
raise ValueError(f"Invalid level value '{value}'")
|
||||
|
||||
def str2level(self, severity:str=None):
|
||||
"""Convert a string severity level to an internal enum"""
|
||||
|
||||
if severity in [None, '', 'noop']:
|
||||
return SeverityLevel.NONE
|
||||
|
||||
elif severity in ['silence']:
|
||||
return SeverityLevel.SILENCE
|
||||
|
||||
elif severity in ['suspend']:
|
||||
return SeverityLevel.SUSPEND
|
||||
|
||||
else:
|
||||
raise ValueError(f"Invalid severity value '{severity}'")
|
||||
|
||||
def __repr__(self):
|
||||
return f"'{str(self)}'"
|
||||
|
||||
def __str__(self):
|
||||
"""A string version of the severity level
|
||||
"""
|
||||
levelmap = {
|
||||
SeverityLevel.NONE: 'noop',
|
||||
SeverityLevel.SILENCE: 'silence',
|
||||
SeverityLevel.SUSPEND: 'suspend',
|
||||
}
|
||||
return levelmap[self.level]
|
||||
|
||||
def __lt__(self, other):
|
||||
if self._level < other._level:
|
||||
return True
|
||||
|
||||
def __gt__(self, other):
|
||||
if self._level > other._level:
|
||||
return True
|
||||
|
||||
def __eq__(self, other):
|
||||
if other is not None and self._level == other._level:
|
||||
return True
|
||||
|
||||
def __le__(self, other):
|
||||
if self._level <= other._level:
|
||||
return True
|
||||
|
||||
def __ge__(self, other):
|
||||
if self._level >= other._level:
|
||||
return True
|
||||
|
||||
# class _DomainBlock(NamedTuple):
|
||||
# domain: str # FIXME: Use an actual Domain object from somewhere?
|
||||
# severity: BlockSeverity = BlockSeverity.SUSPEND
|
||||
# public_comment: str = ''
|
||||
# private_comment: str = ''
|
||||
# reject_media: bool = False
|
||||
# reject_reports: bool = False
|
||||
# obfuscate: bool = False
|
||||
|
||||
class DomainBlock(object):
|
||||
|
||||
fields = [
|
||||
'domain',
|
||||
'severity',
|
||||
'public_comment',
|
||||
'private_comment',
|
||||
'reject_media',
|
||||
'reject_reports',
|
||||
'obfuscate',
|
||||
]
|
||||
|
||||
all_fields = [
|
||||
'domain',
|
||||
'severity',
|
||||
'public_comment',
|
||||
'private_comment',
|
||||
'reject_media',
|
||||
'reject_reports',
|
||||
'obfuscate',
|
||||
'id'
|
||||
]
|
||||
|
||||
def __init__(self, domain:str,
|
||||
severity: BlockSeverity=BlockSeverity('suspend'),
|
||||
public_comment: str="",
|
||||
private_comment: str="",
|
||||
reject_media: bool=False,
|
||||
reject_reports: bool=False,
|
||||
obfuscate: bool=False,
|
||||
id: int=None):
|
||||
"""Initialize the DomainBlock
|
||||
"""
|
||||
self.domain = domain
|
||||
self.public_comment = public_comment
|
||||
self.private_comment = private_comment
|
||||
self.reject_media = reject_media
|
||||
self.reject_reports = reject_reports
|
||||
self.obfuscate = obfuscate
|
||||
self.id = id
|
||||
self.severity = severity
|
||||
|
||||
@property
|
||||
def severity(self):
|
||||
return self._severity
|
||||
|
||||
@severity.setter
|
||||
def severity(self, sev):
|
||||
if isinstance(sev, BlockSeverity):
|
||||
self._severity = sev
|
||||
else:
|
||||
self._severity = BlockSeverity(sev)
|
||||
|
||||
# Suspend implies reject_media,reject_reports == True
|
||||
if self._severity.level == SeverityLevel.SUSPEND:
|
||||
self.reject_media = True
|
||||
self.reject_reports = True
|
||||
|
||||
def _asdict(self):
|
||||
"""Return a dict version of this object
|
||||
"""
|
||||
dictval = {
|
||||
'domain': self.domain,
|
||||
'severity': self.severity,
|
||||
'public_comment': self.public_comment,
|
||||
'private_comment': self.private_comment,
|
||||
'reject_media': self.reject_media,
|
||||
'reject_reports': self.reject_reports,
|
||||
'obfuscate': self.obfuscate,
|
||||
}
|
||||
if self.id:
|
||||
dictval['id'] = self.id
|
||||
|
||||
return dictval
|
||||
|
||||
def compare_fields(self, other, fields=None)->list:
|
||||
"""Compare two DomainBlocks on specific fields.
|
||||
If all the fields are equal, the DomainBlocks are equal.
|
||||
|
||||
@returns: a list of the fields that are different
|
||||
"""
|
||||
if not isinstance(other, DomainBlock):
|
||||
raise ValueError(f"Cannot compare DomainBlock to {type(other)}:{other}")
|
||||
|
||||
if fields is None:
|
||||
fields = self.fields
|
||||
|
||||
diffs = []
|
||||
# Check if all the fields are equal
|
||||
for field in self.fields:
|
||||
a = getattr(self, field)
|
||||
b = getattr(other, field)
|
||||
# log.debug(f"Comparing field {field}: '{a}' <> '{b}'")
|
||||
if getattr(self, field) != getattr(other, field):
|
||||
diffs.append(field)
|
||||
return diffs
|
||||
|
||||
def __eq__(self, other):
|
||||
diffs = self.compare_fields(other)
|
||||
if len(diffs) == 0:
|
||||
return True
|
||||
|
||||
def __repr__(self):
|
||||
|
||||
return f"<DomainBlock {self._asdict()}>"
|
||||
|
||||
def copy(self):
|
||||
"""Make a copy of this object and return it
|
||||
"""
|
||||
retval = DomainBlock(**self._asdict())
|
||||
return retval
|
||||
|
||||
def update(self, dict):
|
||||
"""Update my kwargs
|
||||
"""
|
||||
for key in dict:
|
||||
setattr(self, key, dict[key])
|
||||
|
||||
def __iter__(self):
|
||||
"""Be iterable"""
|
||||
keys = self.fields
|
||||
|
||||
if self.id:
|
||||
keys.append('id')
|
||||
|
||||
for k in keys:
|
||||
yield k
|
||||
|
||||
def __getitem__(self, k, default=None):
|
||||
"Behave like a dict for getting values"
|
||||
if k not in self.all_fields:
|
||||
raise KeyError(f"Invalid key '{k}'")
|
||||
|
||||
return getattr(self, k, default)
|
||||
|
||||
def get(self, k, default=None):
|
||||
return self.__getitem__(k, default)
|
|
@ -0,0 +1,112 @@
|
|||
[
|
||||
{
|
||||
"id": "234",
|
||||
"domain": "example.org",
|
||||
"created_at": "2023-01-09T05:17:50.614Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "A private comment",
|
||||
"public_comment": "A public comment",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "233",
|
||||
"domain": "example2.org",
|
||||
"created_at": "2023-01-09T05:09:01.859Z",
|
||||
"severity": "silence",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "Another private comment",
|
||||
"public_comment": "Another public comment",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "232",
|
||||
"domain": "example3.org",
|
||||
"created_at": "2023-01-09T05:08:58.833Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "More comments? What is this?",
|
||||
"public_comment": "Yes we love to comment",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "231",
|
||||
"domain": "example4.org",
|
||||
"created_at": "2023-01-09T05:04:01.856Z",
|
||||
"severity": "noop",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "I cannot believe all the comments",
|
||||
"public_comment": "Look how many comments we can fit in here",
|
||||
"obfuscate": true
|
||||
},
|
||||
{
|
||||
"id": "230",
|
||||
"domain": "example5.org",
|
||||
"created_at": "2023-01-08T21:37:22.665Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "lack of moderation",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2308",
|
||||
"domain": "example6.org",
|
||||
"created_at": "2023-01-06T08:36:53.989Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "anti-trans bigotry",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2306",
|
||||
"domain": "example7.org",
|
||||
"created_at": "2023-01-04T08:14:05.381Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "lack of moderation",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2305",
|
||||
"domain": "example8.org",
|
||||
"created_at": "2023-01-04T08:13:48.891Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "freeze peach",
|
||||
"public_comment": "lack of moderation, conspiracy weirdness",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "2301",
|
||||
"domain": "example9.org",
|
||||
"created_at": "2023-01-04T08:11:32.904Z",
|
||||
"severity": "silence",
|
||||
"reject_media": false,
|
||||
"reject_reports": false,
|
||||
"private_comment": "",
|
||||
"public_comment": "alt-right conspiracies",
|
||||
"obfuscate": false
|
||||
},
|
||||
{
|
||||
"id": "453",
|
||||
"domain": "example15.org",
|
||||
"created_at": "2022-12-05T08:26:59.920Z",
|
||||
"severity": "suspend",
|
||||
"reject_media": true,
|
||||
"reject_reports": true,
|
||||
"private_comment": "cryptocurrency",
|
||||
"public_comment": "cryptocurrency",
|
||||
"obfuscate": true
|
||||
}
|
||||
]
|
|
@ -0,0 +1,14 @@
|
|||
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
|
||||
"public-comment.example.org","noop","This is a public comment","This is a private comment",FALSE,FALSE,FALSE
|
||||
"private-comment.example.org","noop",,"This is a private comment",FALSE,FALSE,FALSE
|
||||
"diff-comment.example.org","noop","Noop public comment","Noop private comment",FALSE,FALSE,FALSE
|
||||
"2diff-comment.example.org","noop","Public duplicate","Private duplicate",FALSE,FALSE,FALSE
|
||||
"qoto.org","noop",,,FALSE,FALSE,FALSE
|
||||
"sealion.club","noop",,,FALSE,FALSE,FALSE
|
||||
"develop.gab.com","noop",,,FALSE,FALSE,FALSE
|
||||
"gab.ai","noop",,,FALSE,FALSE,FALSE
|
||||
"gab.sleeck.eu","noop",,,FALSE,FALSE,FALSE
|
||||
"gab.com","noop",,,FALSE,FALSE,FALSE
|
||||
"kiwifarms.is","noop",,,FALSE,FALSE,FALSE
|
||||
"kiwifarms.net","noop",,,FALSE,FALSE,FALSE
|
||||
"gabfed.com","noop",,,FALSE,FALSE,FALSE
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,14 @@
|
|||
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
|
||||
"public-comment.example.org","silence","This is a public comment","This is a private comment",FALSE,FALSE,FALSE
|
||||
"private-comment.example.org","silence",,"This is a private comment",FALSE,FALSE,FALSE
|
||||
"diff-comment.example.org","silence","Silence public comment","Silence private comment",FALSE,FALSE,FALSE
|
||||
"2diff-comment.example.org","silence","Public duplicate","Private duplicate",FALSE,FALSE,FALSE
|
||||
"qoto.org","silence",,,FALSE,FALSE,FALSE
|
||||
"sealion.club","silence",,,FALSE,FALSE,FALSE
|
||||
"develop.gab.com","silence",,,FALSE,FALSE,FALSE
|
||||
"gab.ai","silence",,,FALSE,FALSE,FALSE
|
||||
"gab.sleeck.eu","silence",,,FALSE,FALSE,FALSE
|
||||
"gab.com","silence",,,FALSE,FALSE,FALSE
|
||||
"kiwifarms.is","silence",,,FALSE,FALSE,FALSE
|
||||
"kiwifarms.net","silence",,,FALSE,FALSE,FALSE
|
||||
"gabfed.com","silence",,,FALSE,FALSE,FALSE
|
|
|
@ -0,0 +1,14 @@
|
|||
"domain","severity","public_comment","private_comment","reject_media","reject_reports","obfuscate"
|
||||
"public-comment.example.org","suspend","This is a public comment","This is a private comment",TRUE,TRUE,TRUE
|
||||
"private-comment.example.org","suspend",,"This is a private comment",TRUE,TRUE,TRUE
|
||||
"diff-comment.example.org","suspend","Suspend public comment","Suspend private comment",TRUE,TRUE,TRUE
|
||||
"2diff-comment.example.org","suspend","Suspend comment 1","Suspend private 1",TRUE,TRUE,TRUE
|
||||
"qoto.org","suspend",,,TRUE,TRUE,TRUE
|
||||
"sealion.club","suspend",,,TRUE,TRUE,TRUE
|
||||
"develop.gab.com","suspend",,,TRUE,TRUE,TRUE
|
||||
"gab.ai","suspend",,,TRUE,TRUE,TRUE
|
||||
"gab.sleeck.eu","suspend",,,TRUE,TRUE,TRUE
|
||||
"gab.com","suspend",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.is","suspend",,,TRUE,TRUE,TRUE
|
||||
"kiwifarms.net","suspend",,,TRUE,TRUE,TRUE
|
||||
"gabfed.com","suspend",,,TRUE,TRUE,TRUE
|
|
|
@ -0,0 +1,68 @@
|
|||
from fediblockhole.const import BlockSeverity, SeverityLevel
|
||||
|
||||
def test_severity_eq():
|
||||
|
||||
s1 = BlockSeverity('suspend')
|
||||
s2 = BlockSeverity('suspend')
|
||||
|
||||
assert s1 == s2
|
||||
|
||||
s3 = BlockSeverity('silence')
|
||||
s4 = BlockSeverity('silence')
|
||||
|
||||
assert s3 == s4
|
||||
|
||||
s5 = BlockSeverity('noop')
|
||||
s6 = BlockSeverity('noop')
|
||||
|
||||
assert s5 == s6
|
||||
|
||||
def test_severity_ne():
|
||||
s1 = BlockSeverity('noop')
|
||||
s2 = BlockSeverity('silence')
|
||||
s3 = BlockSeverity('suspend')
|
||||
|
||||
assert s1 != s2
|
||||
assert s2 != s3
|
||||
assert s1 != s3
|
||||
|
||||
def test_severity_lt():
|
||||
s1 = BlockSeverity('noop')
|
||||
s2 = BlockSeverity('silence')
|
||||
s3 = BlockSeverity('suspend')
|
||||
|
||||
assert s1 < s2
|
||||
assert s2 < s3
|
||||
assert s1 < s3
|
||||
|
||||
def test_severity_gt():
|
||||
s1 = BlockSeverity('noop')
|
||||
s2 = BlockSeverity('silence')
|
||||
s3 = BlockSeverity('suspend')
|
||||
|
||||
assert s2 > s1
|
||||
assert s3 > s2
|
||||
assert s3 > s1
|
||||
|
||||
def test_severity_le():
|
||||
s1 = BlockSeverity('noop')
|
||||
s2 = BlockSeverity('silence')
|
||||
s2a = BlockSeverity('silence')
|
||||
s3 = BlockSeverity('suspend')
|
||||
|
||||
assert s1 <= s2
|
||||
assert s2a <= s2
|
||||
assert s2 <= s3
|
||||
assert s1 <= s3
|
||||
|
||||
def test_severity_ge():
|
||||
s1 = BlockSeverity('noop')
|
||||
s2 = BlockSeverity('silence')
|
||||
s2a = BlockSeverity('silence')
|
||||
s3 = BlockSeverity('suspend')
|
||||
|
||||
assert s2 >= s1
|
||||
assert s2a >= s1
|
||||
assert s3 >= s2
|
||||
assert s3 >= s1
|
||||
|
|
@ -0,0 +1,83 @@
|
|||
"""Test the DomainBlock structure
|
||||
"""
|
||||
import pytest
|
||||
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
def test_blocksev_blankstring():
|
||||
a = BlockSeverity('')
|
||||
assert a.level == SeverityLevel.NONE
|
||||
|
||||
def test_blocksev_string_noop():
|
||||
a = BlockSeverity('noop')
|
||||
assert a.level == SeverityLevel.NONE
|
||||
|
||||
def test_blocksev_none():
|
||||
a = BlockSeverity(None)
|
||||
assert a.level == SeverityLevel.NONE
|
||||
|
||||
def test_empty_domainblock_fails():
|
||||
with pytest.raises(TypeError):
|
||||
a = DomainBlock()
|
||||
|
||||
def test_default_suspend():
|
||||
a = DomainBlock('example.org')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_severity_suspend():
|
||||
a = DomainBlock('example.org', 'suspend')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_severity_silence():
|
||||
a = DomainBlock('example.org', 'silence')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.SILENCE
|
||||
|
||||
def test_severity_noop_string():
|
||||
a = DomainBlock('example.org', 'noop')
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_severity_none():
|
||||
a = DomainBlock('example.org', None)
|
||||
assert a.domain == 'example.org'
|
||||
assert a.severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_compare_equal_blocks():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example1.org', 'suspend')
|
||||
|
||||
assert a == b
|
||||
|
||||
def test_compare_diff_domains():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example2.org', 'suspend')
|
||||
|
||||
assert a != b
|
||||
|
||||
def test_compare_diff_sevs():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example1.org', 'silence')
|
||||
|
||||
assert a != b
|
||||
|
||||
def test_compare_diff_sevs_2():
|
||||
|
||||
a = DomainBlock('example1.org', 'suspend')
|
||||
b = DomainBlock('example1.org', 'noop')
|
||||
|
||||
assert a != b
|
||||
|
||||
def test_suspend_rejects():
|
||||
"""A suspend should reject_media and reject_reports
|
||||
"""
|
||||
a = DomainBlock('example.org', 'suspend')
|
||||
|
||||
assert a.severity.level == SeverityLevel.SUSPEND
|
||||
assert a.reject_media == True
|
||||
assert a.reject_reports == True
|
|
@ -0,0 +1,241 @@
|
|||
"""Various mergeplan tests
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import parse_blocklist
|
||||
from fediblockhole import merge_blocklists, merge_comments, apply_mergeplan
|
||||
|
||||
from fediblockhole.const import SeverityLevel, DomainBlock
|
||||
|
||||
datafile01 = "data-suspends-01.csv"
|
||||
datafile02 = "data-silences-01.csv"
|
||||
datafile03 = "data-noop-01.csv"
|
||||
|
||||
import_fields = [
|
||||
'domain',
|
||||
'severity',
|
||||
'public_comment',
|
||||
'private_comment',
|
||||
'reject_media',
|
||||
'reject_reports',
|
||||
'obfuscate'
|
||||
]
|
||||
|
||||
def load_test_blocklist_data(datafiles):
|
||||
|
||||
blocklists = {}
|
||||
|
||||
for df in datafiles:
|
||||
with open(df) as fp:
|
||||
data = fp.read()
|
||||
bl = parse_blocklist(data, 'csv', import_fields)
|
||||
blocklists[df] = bl
|
||||
|
||||
return blocklists
|
||||
|
||||
def test_mergeplan_max():
|
||||
"""Test 'max' mergeplan"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'max')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_mergeplan_min():
|
||||
"""Test 'max' mergeplan"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SILENCE
|
||||
|
||||
def test_mergeplan_default():
|
||||
"""Default mergeplan is max, so see if it's chosen"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
||||
|
||||
bl = merge_blocklists(blocklists)
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_mergeplan_3_max():
|
||||
"""3 datafiles and mergeplan of 'max'"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'max')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[key].reject_media == True
|
||||
assert bl[key].reject_reports == True
|
||||
assert bl[key].obfuscate == True
|
||||
|
||||
def test_mergeplan_3_min():
|
||||
"""3 datafiles and mergeplan of 'min'"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.NONE
|
||||
assert bl[key].reject_media == False
|
||||
assert bl[key].reject_reports == False
|
||||
assert bl[key].obfuscate == False
|
||||
|
||||
def test_mergeplan_noop_v_silence_max():
|
||||
"""Mergeplan of max should choose silence over noop"""
|
||||
blocklists = load_test_blocklist_data([datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'max')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.SILENCE
|
||||
|
||||
def test_mergeplan_noop_v_silence_min():
|
||||
"""Mergeplan of min should choose noop over silence"""
|
||||
blocklists = load_test_blocklist_data([datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
for key in bl:
|
||||
assert bl[key].severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_merge_public_comment():
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
assert bl['public-comment.example.org'].public_comment == 'This is a public comment'
|
||||
|
||||
def test_merge_private_comment():
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
assert bl['private-comment.example.org'].private_comment == 'This is a private comment'
|
||||
|
||||
def test_merge_public_comments():
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
assert bl['diff-comment.example.org'].public_comment == 'Suspend public comment, Silence public comment, Noop public comment'
|
||||
|
||||
def test_merge_duplicate_comments():
|
||||
"""The same comment on multiple sources shouldn't get added
|
||||
"""
|
||||
blocklists = load_test_blocklist_data([datafile01, datafile02, datafile03])
|
||||
|
||||
bl = merge_blocklists(blocklists, 'min')
|
||||
assert len(bl) == 13
|
||||
|
||||
# Nope, this breaks. Need to rethink duplicate comment merge.
|
||||
# assert bl['2diff-comment.example.org'].public_comment == 'Suspend comment 1, Public duplicate'
|
||||
|
||||
def test_merge_comments_none():
|
||||
|
||||
a = None
|
||||
b = None
|
||||
|
||||
r = merge_comments(a, b)
|
||||
|
||||
assert r == ''
|
||||
|
||||
def test_merge_comments_empty():
|
||||
|
||||
a = ''
|
||||
b = ''
|
||||
|
||||
r = merge_comments(a, b)
|
||||
|
||||
assert r == ''
|
||||
|
||||
def test_merge_comments_left():
|
||||
|
||||
a = 'comment to merge'
|
||||
b = ''
|
||||
|
||||
r = merge_comments(a, b)
|
||||
|
||||
assert r == 'comment to merge'
|
||||
|
||||
def test_merge_comments_right():
|
||||
|
||||
a = ''
|
||||
b = 'comment to merge'
|
||||
|
||||
r = merge_comments(a, b)
|
||||
|
||||
assert r == 'comment to merge'
|
||||
|
||||
def test_merge_comments_same():
|
||||
|
||||
a = 'comment to merge'
|
||||
b = 'comment to merge'
|
||||
|
||||
r = merge_comments(a, b)
|
||||
|
||||
assert r == 'comment to merge'
|
||||
|
||||
def test_merge_comments_diff():
|
||||
|
||||
a = 'comment A'
|
||||
b = 'comment B'
|
||||
|
||||
r = merge_comments(a, b)
|
||||
|
||||
assert r == 'comment A, comment B'
|
||||
|
||||
def test_merge_comments_dups():
|
||||
|
||||
a = "boring, nazis, lack of moderation, flagged, special"
|
||||
b = "spoon, nazis, flagged, lack of moderation, happy, fork"
|
||||
|
||||
r = merge_comments(a, b)
|
||||
|
||||
assert r == 'boring, nazis, lack of moderation, flagged, special, spoon, happy, fork'
|
||||
|
||||
def test_mergeplan_same_min_bools_false():
|
||||
"""Test merging with mergeplan 'max' and False values doesn't change them
|
||||
"""
|
||||
a = DomainBlock('example.org', 'noop', '', '', False, False, False)
|
||||
b = DomainBlock('example.org', 'noop', '', '', False, False, False)
|
||||
|
||||
r = apply_mergeplan(a, b, 'max')
|
||||
|
||||
assert r.reject_media == False
|
||||
assert r.reject_reports == False
|
||||
assert r.obfuscate == False
|
||||
|
||||
def test_mergeplan_same_min_bools_true():
|
||||
"""Test merging with mergeplan 'max' and True values doesn't change them
|
||||
"""
|
||||
a = DomainBlock('example.org', 'noop', '', '', True, False, True)
|
||||
b = DomainBlock('example.org', 'noop', '', '', True, False, True)
|
||||
|
||||
r = apply_mergeplan(a, b, 'max')
|
||||
|
||||
assert r.reject_media == True
|
||||
assert r.reject_reports == False
|
||||
assert r.obfuscate == True
|
||||
|
||||
def test_mergeplan_max_bools():
|
||||
a = DomainBlock('example.org', 'suspend', '', '', True, True, True)
|
||||
b = DomainBlock('example.org', 'noop', '', '', False, False, False)
|
||||
|
||||
r = apply_mergeplan(a, b, 'max')
|
||||
|
||||
assert r.reject_media == True
|
||||
assert r.reject_reports == True
|
||||
assert r.obfuscate == True
|
|
@ -0,0 +1,77 @@
|
|||
"""Tests of the CSV parsing
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import BlocklistParserCSV, parse_blocklist
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
|
||||
def test_single_line():
|
||||
csvdata = "example.org"
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
assert len(bl) == 0
|
||||
|
||||
def test_header_only():
|
||||
csvdata = "domain,severity,public_comment"
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
assert len(bl) == 0
|
||||
|
||||
def test_2_blocks():
|
||||
csvdata = """domain,severity
|
||||
example.org,silence
|
||||
example2.org,suspend
|
||||
"""
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
assert len(bl) == 2
|
||||
assert bl[0].domain == 'example.org'
|
||||
|
||||
def test_4_blocks():
|
||||
csvdata = """domain,severity,public_comment
|
||||
example.org,silence,"test 1"
|
||||
example2.org,suspend,"test 2"
|
||||
example3.org,noop,"test 3"
|
||||
example4.org,suspend,"test 4"
|
||||
"""
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
assert len(bl) == 4
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].severity.level == SeverityLevel.SILENCE
|
||||
assert bl[1].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[2].severity.level == SeverityLevel.NONE
|
||||
assert bl[3].severity.level == SeverityLevel.SUSPEND
|
||||
|
||||
def test_ignore_comments():
|
||||
csvdata = """domain,severity,public_comment,private_comment
|
||||
example.org,silence,"test 1","ignore me"
|
||||
example2.org,suspend,"test 2","ignote me also"
|
||||
example3.org,noop,"test 3","and me"
|
||||
example4.org,suspend,"test 4","also me"
|
||||
"""
|
||||
|
||||
parser = BlocklistParserCSV()
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
assert len(bl) == 4
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].public_comment == ''
|
||||
assert bl[0].private_comment == ''
|
||||
|
||||
assert bl[2].public_comment == ''
|
||||
assert bl[2].private_comment == ''
|
|
@ -0,0 +1,46 @@
|
|||
"""Tests of the CSV parsing
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import BlocklistParserJSON, parse_blocklist
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
datafile = 'data-mastodon.json'
|
||||
|
||||
def load_data():
|
||||
with open(datafile) as fp:
|
||||
return fp.read()
|
||||
|
||||
def test_json_parser():
|
||||
|
||||
data = load_data()
|
||||
parser = BlocklistParserJSON()
|
||||
bl = parser.parse_blocklist(data)
|
||||
|
||||
assert len(bl) == 10
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[1].severity.level == SeverityLevel.SILENCE
|
||||
assert bl[2].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[3].severity.level == SeverityLevel.NONE
|
||||
|
||||
def test_ignore_comments():
|
||||
|
||||
data = load_data()
|
||||
parser = BlocklistParserJSON()
|
||||
bl = parser.parse_blocklist(data)
|
||||
|
||||
assert len(bl) == 10
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'example2.org'
|
||||
assert bl[2].domain == 'example3.org'
|
||||
assert bl[3].domain == 'example4.org'
|
||||
|
||||
assert bl[0].public_comment == ''
|
||||
assert bl[0].private_comment == ''
|
||||
|
||||
assert bl[2].public_comment == ''
|
||||
assert bl[2].private_comment == ''
|
|
@ -0,0 +1,23 @@
|
|||
"""Tests of the Rapidblock CSV parsing
|
||||
"""
|
||||
|
||||
from fediblockhole.blocklist_parser import RapidBlockParserCSV, parse_blocklist
|
||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||
|
||||
csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n"""
|
||||
parser = RapidBlockParserCSV()
|
||||
|
||||
def test_basic_rapidblock():
|
||||
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
assert len(bl) == 4
|
||||
assert bl[0].domain == 'example.org'
|
||||
assert bl[1].domain == 'subdomain.example.org'
|
||||
assert bl[2].domain == 'anotherdomain.org'
|
||||
assert bl[3].domain == 'domain4.org'
|
||||
|
||||
def test_severity_is_suspend():
|
||||
bl = parser.parse_blocklist(csvdata)
|
||||
|
||||
for block in bl:
|
||||
assert block.severity.level == SeverityLevel.SUSPEND
|
|
@ -0,0 +1,34 @@
|
|||
"""Test parsing the RapidBlock JSON format
|
||||
"""
|
||||
from fediblockhole.blocklist_parser import parse_blocklist
|
||||
|
||||
from fediblockhole.const import SeverityLevel
|
||||
|
||||
rapidblockjson = "data-rapidblock.json"
|
||||
|
||||
def test_parse_rapidblock_json():
|
||||
with open(rapidblockjson) as fp:
|
||||
data = fp.read()
|
||||
bl = parse_blocklist(data, 'rapidblock.json')
|
||||
|
||||
assert bl[0].domain == '101010.pl'
|
||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[0].public_comment == ''
|
||||
|
||||
assert bl[10].domain == 'berserker.town'
|
||||
assert bl[10].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[10].public_comment == ''
|
||||
assert bl[10].private_comment == ''
|
||||
|
||||
def test_parse_with_comments():
|
||||
with open(rapidblockjson) as fp:
|
||||
data = fp.read()
|
||||
bl = parse_blocklist(data, 'rapidblock.json', ['domain', 'severity', 'public_comment', 'private_comment'])
|
||||
|
||||
assert bl[0].domain == '101010.pl'
|
||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[0].public_comment == 'cryptomining javascript, white supremacy'
|
||||
|
||||
assert bl[10].domain == 'berserker.town'
|
||||
assert bl[10].severity.level == SeverityLevel.SUSPEND
|
||||
assert bl[10].public_comment == 'freeze peach'
|
Loading…
Reference in New Issue