Merge branch 'main' into release-0.4.3
This commit is contained in:
commit
c2bf5ff492
|
@ -41,6 +41,8 @@ appropriate.
|
||||||
|
|
||||||
- Provides (hopefully) sensible defaults to minimise first-time setup.
|
- Provides (hopefully) sensible defaults to minimise first-time setup.
|
||||||
- Global and fine-grained configuration options available for those complex situations that crop up sometimes.
|
- Global and fine-grained configuration options available for those complex situations that crop up sometimes.
|
||||||
|
- Allowlists to override blocks in blocklists to ensure you never block instances you want to keep.
|
||||||
|
- Blocklist thresholds if you want to only block when an instance shows up in multiple blocklists.
|
||||||
|
|
||||||
## Installing
|
## Installing
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
"domain","severity","private_comment","public_comment","reject_media","reject_reports","obfuscate"
|
"domain","severity","private_comment","public_comment","reject_media","reject_reports","obfuscate"
|
||||||
"eigenmagic.net","noop","Never block me","Only the domain field matters",False,False,False
|
"eigenmagic.net","noop","Never block me","Only the domain field matters for allowlists",False,False,False
|
||||||
"example.org","noop","Never block me either","The severity is ignored as are all other fields",False,False,False
|
"example.org","noop","Never block me either","The severity is ignored in allowlists as are all other fields",False,False,False
|
||||||
|
"demo01.example.org","noop","Never block me either","But you can use them to leave yourself or others notes on why the item is here",False,False,False
|
||||||
|
|
|
|
@ -11,7 +11,7 @@ import os.path
|
||||||
import sys
|
import sys
|
||||||
import urllib.request as urlr
|
import urllib.request as urlr
|
||||||
|
|
||||||
from .blocklist_parser import parse_blocklist
|
from .blocklists import Blocklist, parse_blocklist
|
||||||
from .const import DomainBlock, BlockSeverity
|
from .const import DomainBlock, BlockSeverity
|
||||||
|
|
||||||
from importlib.metadata import version
|
from importlib.metadata import version
|
||||||
|
@ -59,16 +59,16 @@ def sync_blocklists(conf: argparse.Namespace):
|
||||||
# Add extra export fields if defined in config
|
# Add extra export fields if defined in config
|
||||||
export_fields.extend(conf.export_fields)
|
export_fields.extend(conf.export_fields)
|
||||||
|
|
||||||
blocklists = {}
|
blocklists = []
|
||||||
# Fetch blocklists from URLs
|
# Fetch blocklists from URLs
|
||||||
if not conf.no_fetch_url:
|
if not conf.no_fetch_url:
|
||||||
blocklists = fetch_from_urls(blocklists, conf.blocklist_url_sources,
|
blocklists.extend(fetch_from_urls(conf.blocklist_url_sources,
|
||||||
import_fields, conf.save_intermediate, conf.savedir, export_fields)
|
import_fields, conf.save_intermediate, conf.savedir, export_fields))
|
||||||
|
|
||||||
# Fetch blocklists from remote instances
|
# Fetch blocklists from remote instances
|
||||||
if not conf.no_fetch_instance:
|
if not conf.no_fetch_instance:
|
||||||
blocklists = fetch_from_instances(blocklists, conf.blocklist_instance_sources,
|
blocklists.extend(fetch_from_instances(conf.blocklist_instance_sources,
|
||||||
import_fields, conf.save_intermediate, conf.savedir, export_fields)
|
import_fields, conf.save_intermediate, conf.savedir, export_fields))
|
||||||
|
|
||||||
# Merge blocklists into an update dict
|
# Merge blocklists into an update dict
|
||||||
merged = merge_blocklists(blocklists, conf.mergeplan)
|
merged = merge_blocklists(blocklists, conf.mergeplan)
|
||||||
|
@ -80,48 +80,48 @@ def sync_blocklists(conf: argparse.Namespace):
|
||||||
# Save the final mergelist, if requested
|
# Save the final mergelist, if requested
|
||||||
if conf.blocklist_savefile:
|
if conf.blocklist_savefile:
|
||||||
log.info(f"Saving merged blocklist to {conf.blocklist_savefile}")
|
log.info(f"Saving merged blocklist to {conf.blocklist_savefile}")
|
||||||
save_blocklist_to_file(merged.values(), conf.blocklist_savefile, export_fields)
|
save_blocklist_to_file(merged, conf.blocklist_savefile, export_fields)
|
||||||
|
|
||||||
# Push the blocklist to destination instances
|
# Push the blocklist to destination instances
|
||||||
if not conf.no_push_instance:
|
if not conf.no_push_instance:
|
||||||
log.info("Pushing domain blocks to instances...")
|
log.info("Pushing domain blocks to instances...")
|
||||||
for dest in conf.blocklist_instance_destinations:
|
for dest in conf.blocklist_instance_destinations:
|
||||||
domain = dest['domain']
|
target = dest['domain']
|
||||||
token = dest['token']
|
token = dest['token']
|
||||||
scheme = dest.get('scheme', 'https')
|
scheme = dest.get('scheme', 'https')
|
||||||
max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence'))
|
max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence'))
|
||||||
push_blocklist(token, domain, merged.values(), conf.dryrun, import_fields, max_followed_severity, scheme)
|
push_blocklist(token, target, merged, conf.dryrun, import_fields, max_followed_severity, scheme)
|
||||||
|
|
||||||
def apply_allowlists(merged: dict, conf: argparse.Namespace, allowlists: dict):
|
def apply_allowlists(merged: Blocklist, conf: argparse.Namespace, allowlists: dict):
|
||||||
"""Apply allowlists
|
"""Apply allowlists
|
||||||
"""
|
"""
|
||||||
# Apply allows specified on the commandline
|
# Apply allows specified on the commandline
|
||||||
for domain in conf.allow_domains:
|
for domain in conf.allow_domains:
|
||||||
log.info(f"'{domain}' allowed by commandline, removing any blocks...")
|
log.info(f"'{domain}' allowed by commandline, removing any blocks...")
|
||||||
if domain in merged:
|
if domain in merged.blocks:
|
||||||
del merged[domain]
|
del merged.blocks[domain]
|
||||||
|
|
||||||
# Apply allows from URLs lists
|
# Apply allows from URLs lists
|
||||||
log.info("Removing domains from URL allowlists...")
|
log.info("Removing domains from URL allowlists...")
|
||||||
for key, alist in allowlists.items():
|
for alist in allowlists:
|
||||||
log.debug(f"Processing allows from '{key}'...")
|
log.debug(f"Processing allows from '{alist.origin}'...")
|
||||||
for allowed in alist:
|
for allowed in alist.blocks.values():
|
||||||
domain = allowed.domain
|
domain = allowed.domain
|
||||||
log.debug(f"Removing allowlisted domain '{domain}' from merged list.")
|
log.debug(f"Removing allowlisted domain '{domain}' from merged list.")
|
||||||
if domain in merged:
|
if domain in merged.blocks:
|
||||||
del merged[domain]
|
del merged.blocks[domain]
|
||||||
|
|
||||||
return merged
|
return merged
|
||||||
|
|
||||||
def fetch_allowlists(conf: argparse.Namespace) -> dict:
|
def fetch_allowlists(conf: argparse.Namespace) -> Blocklist:
|
||||||
"""
|
"""
|
||||||
"""
|
"""
|
||||||
if conf.allowlist_url_sources:
|
if conf.allowlist_url_sources:
|
||||||
allowlists = fetch_from_urls({}, conf.allowlist_url_sources, ALLOWLIST_IMPORT_FIELDS)
|
allowlists = fetch_from_urls(conf.allowlist_url_sources, ALLOWLIST_IMPORT_FIELDS, conf.save_intermediate, conf.savedir)
|
||||||
return allowlists
|
return allowlists
|
||||||
return {}
|
return Blocklist()
|
||||||
|
|
||||||
def fetch_from_urls(blocklists: dict, url_sources: dict,
|
def fetch_from_urls(url_sources: dict,
|
||||||
import_fields: list=IMPORT_FIELDS,
|
import_fields: list=IMPORT_FIELDS,
|
||||||
save_intermediate: bool=False,
|
save_intermediate: bool=False,
|
||||||
savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
|
savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
|
||||||
|
@ -131,7 +131,7 @@ def fetch_from_urls(blocklists: dict, url_sources: dict,
|
||||||
@returns: A dict of blocklists, same as input, but (possibly) modified
|
@returns: A dict of blocklists, same as input, but (possibly) modified
|
||||||
"""
|
"""
|
||||||
log.info("Fetching domain blocks from URLs...")
|
log.info("Fetching domain blocks from URLs...")
|
||||||
|
blocklists = []
|
||||||
for item in url_sources:
|
for item in url_sources:
|
||||||
url = item['url']
|
url = item['url']
|
||||||
# If import fields are provided, they override the global ones passed in
|
# If import fields are provided, they override the global ones passed in
|
||||||
|
@ -144,14 +144,14 @@ def fetch_from_urls(blocklists: dict, url_sources: dict,
|
||||||
listformat = item.get('format', 'csv')
|
listformat = item.get('format', 'csv')
|
||||||
with urlr.urlopen(url) as fp:
|
with urlr.urlopen(url) as fp:
|
||||||
rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8')
|
rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8')
|
||||||
blocklists[url] = parse_blocklist(rawdata, listformat, import_fields, max_severity)
|
bl = parse_blocklist(rawdata, url, listformat, import_fields, max_severity)
|
||||||
|
blocklists.append(bl)
|
||||||
if save_intermediate:
|
if save_intermediate:
|
||||||
save_intermediate_blocklist(blocklists[url], url, savedir, export_fields)
|
save_intermediate_blocklist(bl, savedir, export_fields)
|
||||||
|
|
||||||
return blocklists
|
return blocklists
|
||||||
|
|
||||||
def fetch_from_instances(blocklists: dict, sources: dict,
|
def fetch_from_instances(sources: dict,
|
||||||
import_fields: list=IMPORT_FIELDS,
|
import_fields: list=IMPORT_FIELDS,
|
||||||
save_intermediate: bool=False,
|
save_intermediate: bool=False,
|
||||||
savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
|
savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
|
||||||
|
@ -161,12 +161,13 @@ def fetch_from_instances(blocklists: dict, sources: dict,
|
||||||
@returns: A dict of blocklists, same as input, but (possibly) modified
|
@returns: A dict of blocklists, same as input, but (possibly) modified
|
||||||
"""
|
"""
|
||||||
log.info("Fetching domain blocks from instances...")
|
log.info("Fetching domain blocks from instances...")
|
||||||
|
blocklists = []
|
||||||
for item in sources:
|
for item in sources:
|
||||||
domain = item['domain']
|
domain = item['domain']
|
||||||
admin = item.get('admin', False)
|
admin = item.get('admin', False)
|
||||||
token = item.get('token', None)
|
token = item.get('token', None)
|
||||||
scheme = item.get('scheme', 'https')
|
scheme = item.get('scheme', 'https')
|
||||||
itemsrc = f"{scheme}://{domain}/api"
|
# itemsrc = f"{scheme}://{domain}/api"
|
||||||
|
|
||||||
# If import fields are provided, they override the global ones passed in
|
# If import fields are provided, they override the global ones passed in
|
||||||
source_import_fields = item.get('import_fields', None)
|
source_import_fields = item.get('import_fields', None)
|
||||||
|
@ -174,45 +175,65 @@ def fetch_from_instances(blocklists: dict, sources: dict,
|
||||||
# Ensure we always use the default fields
|
# Ensure we always use the default fields
|
||||||
import_fields = IMPORT_FIELDS.extend(source_import_fields)
|
import_fields = IMPORT_FIELDS.extend(source_import_fields)
|
||||||
|
|
||||||
# Add the blocklist with the domain as the source key
|
bl = fetch_instance_blocklist(domain, token, admin, import_fields, scheme)
|
||||||
blocklists[itemsrc] = fetch_instance_blocklist(domain, token, admin, import_fields, scheme)
|
blocklists.append(bl)
|
||||||
if save_intermediate:
|
if save_intermediate:
|
||||||
save_intermediate_blocklist(blocklists[itemsrc], domain, savedir, export_fields)
|
save_intermediate_blocklist(bl, savedir, export_fields)
|
||||||
return blocklists
|
return blocklists
|
||||||
|
|
||||||
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
|
def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
|
||||||
|
threshold: int=0,
|
||||||
|
threshold_type: str='count') -> Blocklist:
|
||||||
"""Merge fetched remote blocklists into a bulk update
|
"""Merge fetched remote blocklists into a bulk update
|
||||||
@param blocklists: A dict of lists of DomainBlocks, keyed by source.
|
@param blocklists: A dict of lists of DomainBlocks, keyed by source.
|
||||||
Each value is a list of DomainBlocks
|
Each value is a list of DomainBlocks
|
||||||
@param mergeplan: An optional method of merging overlapping block definitions
|
@param mergeplan: An optional method of merging overlapping block definitions
|
||||||
'max' (the default) uses the highest severity block found
|
'max' (the default) uses the highest severity block found
|
||||||
'min' uses the lowest severity block found
|
'min' uses the lowest severity block found
|
||||||
|
@param threshold: An integer used in the threshold mechanism.
|
||||||
|
If a domain is not present in this number/pct or more of the blocklists,
|
||||||
|
it will not get merged into the final list.
|
||||||
|
@param threshold_type: choice of ['count', 'pct']
|
||||||
|
If `count`, threshold is met if block is present in `threshold`
|
||||||
|
or more blocklists.
|
||||||
|
If `pct`, theshold is met if block is present in
|
||||||
|
count_of_mentions / number_of_blocklists.
|
||||||
@param returns: A dict of DomainBlocks keyed by domain
|
@param returns: A dict of DomainBlocks keyed by domain
|
||||||
"""
|
"""
|
||||||
merged = {}
|
merged = Blocklist('fediblockhole.merge_blocklists')
|
||||||
|
|
||||||
for key, blist in blocklists.items():
|
num_blocklists = len(blocklists)
|
||||||
log.debug(f"processing blocklist from: {key} ...")
|
|
||||||
for newblock in blist:
|
# Create a domain keyed list of blocks for each domain
|
||||||
domain = newblock.domain
|
domain_blocks = {}
|
||||||
# If the domain has two asterisks in it, it's obfuscated
|
|
||||||
# and we can't really use it, so skip it and do the next one
|
for bl in blocklists:
|
||||||
if '*' in domain:
|
for block in bl.values():
|
||||||
log.debug(f"Domain '{domain}' is obfuscated. Skipping it.")
|
if '*' in block.domain:
|
||||||
|
log.debug(f"Domain '{block.domain}' is obfuscated. Skipping it.")
|
||||||
continue
|
continue
|
||||||
|
elif block.domain in domain_blocks:
|
||||||
elif domain in merged:
|
domain_blocks[block.domain].append(block)
|
||||||
log.debug(f"Overlapping block for domain {domain}. Merging...")
|
|
||||||
blockdata = apply_mergeplan(merged[domain], newblock, mergeplan)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# New block
|
domain_blocks[block.domain] = [block,]
|
||||||
blockdata = newblock
|
|
||||||
|
# Only merge items if `threshold` is met or exceeded
|
||||||
|
for domain in domain_blocks:
|
||||||
|
if threshold_type == 'count':
|
||||||
|
domain_threshold_level = len(domain_blocks[domain])
|
||||||
|
elif threshold_type == 'pct':
|
||||||
|
domain_threshold_level = len(domain_blocks[domain]) / num_blocklists
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'")
|
||||||
|
|
||||||
|
if domain_threshold_level >= threshold:
|
||||||
|
# Add first block in the list to merged
|
||||||
|
block = domain_blocks[domain][0]
|
||||||
|
# Merge the others with this record
|
||||||
|
for newblock in domain_blocks[domain][1:]:
|
||||||
|
block = apply_mergeplan(block, newblock, mergeplan)
|
||||||
|
merged.blocks[block.domain] = block
|
||||||
|
|
||||||
# end if
|
|
||||||
log.debug(f"blockdata is: {blockdata}")
|
|
||||||
merged[domain] = blockdata
|
|
||||||
# end for
|
|
||||||
return merged
|
return merged
|
||||||
|
|
||||||
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
|
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
|
||||||
|
@ -239,10 +260,10 @@ def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str
|
||||||
# How do we override an earlier block definition?
|
# How do we override an earlier block definition?
|
||||||
if mergeplan in ['max', None]:
|
if mergeplan in ['max', None]:
|
||||||
# Use the highest block level found (the default)
|
# Use the highest block level found (the default)
|
||||||
log.debug(f"Using 'max' mergeplan.")
|
# log.debug(f"Using 'max' mergeplan.")
|
||||||
|
|
||||||
if newblock.severity > oldblock.severity:
|
if newblock.severity > oldblock.severity:
|
||||||
log.debug(f"New block severity is higher. Using that.")
|
# log.debug(f"New block severity is higher. Using that.")
|
||||||
blockdata['severity'] = newblock.severity
|
blockdata['severity'] = newblock.severity
|
||||||
|
|
||||||
# For 'reject_media', 'reject_reports', and 'obfuscate' if
|
# For 'reject_media', 'reject_reports', and 'obfuscate' if
|
||||||
|
@ -271,7 +292,7 @@ def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError(f"Mergeplan '{mergeplan}' not implemented.")
|
raise NotImplementedError(f"Mergeplan '{mergeplan}' not implemented.")
|
||||||
|
|
||||||
log.debug(f"Block severity set to {blockdata['severity']}")
|
# log.debug(f"Block severity set to {blockdata['severity']}")
|
||||||
|
|
||||||
return DomainBlock(**blockdata)
|
return DomainBlock(**blockdata)
|
||||||
|
|
||||||
|
@ -357,17 +378,19 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
||||||
|
|
||||||
url = f"{scheme}://{host}{api_path}"
|
url = f"{scheme}://{host}{api_path}"
|
||||||
|
|
||||||
blocklist = []
|
blockdata = []
|
||||||
link = True
|
link = True
|
||||||
|
|
||||||
while link:
|
while link:
|
||||||
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
|
response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
log.error(f"Cannot fetch remote blocklist: {response.content}")
|
log.error(f"Cannot fetch remote blocklist: {response.content}")
|
||||||
raise ValueError("Unable to fetch domain block list: %s", response)
|
raise ValueError("Unable to fetch domain block list: %s", response)
|
||||||
|
|
||||||
blocklist.extend( parse_blocklist(response.content, parse_format, import_fields) )
|
# Each block of returned data is a JSON list of dicts
|
||||||
|
# so we parse them and append them to the fetched list
|
||||||
|
# of JSON data we need to parse.
|
||||||
|
|
||||||
|
blockdata.extend(json.loads(response.content.decode('utf-8')))
|
||||||
# Parse the link header to find the next url to fetch
|
# Parse the link header to find the next url to fetch
|
||||||
# This is a weird and janky way of doing pagination but
|
# This is a weird and janky way of doing pagination but
|
||||||
# hey nothing we can do about it we just have to deal
|
# hey nothing we can do about it we just have to deal
|
||||||
|
@ -385,6 +408,8 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
|
||||||
urlstring, rel = next.split('; ')
|
urlstring, rel = next.split('; ')
|
||||||
url = urlstring.strip('<').rstrip('>')
|
url = urlstring.strip('<').rstrip('>')
|
||||||
|
|
||||||
|
blocklist = parse_blocklist(blockdata, url, parse_format, import_fields)
|
||||||
|
|
||||||
return blocklist
|
return blocklist
|
||||||
|
|
||||||
def delete_block(token: str, host: str, id: int, scheme: str='https'):
|
def delete_block(token: str, host: str, id: int, scheme: str='https'):
|
||||||
|
@ -474,13 +499,9 @@ def update_known_block(token: str, host: str, block: DomainBlock, scheme: str='h
|
||||||
"""Update an existing domain block with information in blockdict"""
|
"""Update an existing domain block with information in blockdict"""
|
||||||
api_path = "/api/v1/admin/domain_blocks/"
|
api_path = "/api/v1/admin/domain_blocks/"
|
||||||
|
|
||||||
try:
|
|
||||||
id = block.id
|
id = block.id
|
||||||
blockdata = block._asdict()
|
blockdata = block._asdict()
|
||||||
del blockdata['id']
|
del blockdata['id']
|
||||||
except KeyError:
|
|
||||||
import pdb
|
|
||||||
pdb.set_trace()
|
|
||||||
|
|
||||||
url = f"{scheme}://{host}{api_path}{id}"
|
url = f"{scheme}://{host}{api_path}{id}"
|
||||||
|
|
||||||
|
@ -514,7 +535,7 @@ def add_block(token: str, host: str, blockdata: DomainBlock, scheme: str='https'
|
||||||
|
|
||||||
raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
|
raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
|
||||||
|
|
||||||
def push_blocklist(token: str, host: str, blocklist: list[dict],
|
def push_blocklist(token: str, host: str, blocklist: list[DomainBlock],
|
||||||
dryrun: bool=False,
|
dryrun: bool=False,
|
||||||
import_fields: list=['domain', 'severity'],
|
import_fields: list=['domain', 'severity'],
|
||||||
max_followed_severity:BlockSeverity=BlockSeverity('silence'),
|
max_followed_severity:BlockSeverity=BlockSeverity('silence'),
|
||||||
|
@ -522,8 +543,7 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
|
||||||
):
|
):
|
||||||
"""Push a blocklist to a remote instance.
|
"""Push a blocklist to a remote instance.
|
||||||
|
|
||||||
Merging the blocklist with the existing list the instance has,
|
Updates existing entries if they exist, creates new blocks if they don't.
|
||||||
updating existing entries if they exist.
|
|
||||||
|
|
||||||
@param token: The Bearer token for OAUTH API authentication
|
@param token: The Bearer token for OAUTH API authentication
|
||||||
@param host: The instance host, FQDN or IP
|
@param host: The instance host, FQDN or IP
|
||||||
|
@ -538,15 +558,16 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
|
||||||
serverblocks = fetch_instance_blocklist(host, token, True, import_fields, scheme)
|
serverblocks = fetch_instance_blocklist(host, token, True, import_fields, scheme)
|
||||||
|
|
||||||
# # Convert serverblocks to a dictionary keyed by domain name
|
# # Convert serverblocks to a dictionary keyed by domain name
|
||||||
knownblocks = {row.domain: row for row in serverblocks}
|
# knownblocks = {row.domain: row for row in serverblocks}
|
||||||
|
|
||||||
for newblock in blocklist:
|
for newblock in blocklist.values():
|
||||||
|
|
||||||
log.debug(f"Processing block: {newblock}")
|
log.debug(f"Processing block: {newblock}")
|
||||||
oldblock = knownblocks.get(newblock.domain, None)
|
if newblock.domain in serverblocks:
|
||||||
if oldblock:
|
|
||||||
log.debug(f"Block already exists for {newblock.domain}, checking for differences...")
|
log.debug(f"Block already exists for {newblock.domain}, checking for differences...")
|
||||||
|
|
||||||
|
oldblock = serverblocks[newblock.domain]
|
||||||
|
|
||||||
change_needed = is_change_needed(oldblock, newblock, import_fields)
|
change_needed = is_change_needed(oldblock, newblock, import_fields)
|
||||||
|
|
||||||
# Is the severity changing?
|
# Is the severity changing?
|
||||||
|
@ -605,15 +626,14 @@ def load_config(configfile: str):
|
||||||
conf = toml.load(configfile)
|
conf = toml.load(configfile)
|
||||||
return conf
|
return conf
|
||||||
|
|
||||||
def save_intermediate_blocklist(
|
def save_intermediate_blocklist(blocklist: Blocklist, filedir: str,
|
||||||
blocklist: list[dict], source: str,
|
|
||||||
filedir: str,
|
|
||||||
export_fields: list=['domain','severity']):
|
export_fields: list=['domain','severity']):
|
||||||
"""Save a local copy of a blocklist we've downloaded
|
"""Save a local copy of a blocklist we've downloaded
|
||||||
"""
|
"""
|
||||||
# Invent a filename based on the remote source
|
# Invent a filename based on the remote source
|
||||||
# If the source was a URL, convert it to something less messy
|
# If the source was a URL, convert it to something less messy
|
||||||
# If the source was a remote domain, just use the name of the domain
|
# If the source was a remote domain, just use the name of the domain
|
||||||
|
source = blocklist.origin
|
||||||
log.debug(f"Saving intermediate blocklist from {source}")
|
log.debug(f"Saving intermediate blocklist from {source}")
|
||||||
source = source.replace('/','-')
|
source = source.replace('/','-')
|
||||||
filename = f"{source}.csv"
|
filename = f"{source}.csv"
|
||||||
|
@ -621,7 +641,7 @@ def save_intermediate_blocklist(
|
||||||
save_blocklist_to_file(blocklist, filepath, export_fields)
|
save_blocklist_to_file(blocklist, filepath, export_fields)
|
||||||
|
|
||||||
def save_blocklist_to_file(
|
def save_blocklist_to_file(
|
||||||
blocklist: list[DomainBlock],
|
blocklist: Blocklist,
|
||||||
filepath: str,
|
filepath: str,
|
||||||
export_fields: list=['domain','severity']):
|
export_fields: list=['domain','severity']):
|
||||||
"""Save a blocklist we've downloaded from a remote source
|
"""Save a blocklist we've downloaded from a remote source
|
||||||
|
@ -631,18 +651,22 @@ def save_blocklist_to_file(
|
||||||
@param export_fields: Which fields to include in the export.
|
@param export_fields: Which fields to include in the export.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
blocklist = sorted(blocklist, key=lambda x: x.domain)
|
sorted_list = sorted(blocklist.blocks.items())
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.error("Field 'domain' not found in blocklist.")
|
log.error("Field 'domain' not found in blocklist.")
|
||||||
log.debug(f"blocklist is: {blocklist}")
|
log.debug(f"blocklist is: {sorted_list}")
|
||||||
|
except AttributeError:
|
||||||
|
log.error("Attribute error!")
|
||||||
|
import pdb
|
||||||
|
pdb.set_trace()
|
||||||
|
|
||||||
log.debug(f"export fields: {export_fields}")
|
log.debug(f"export fields: {export_fields}")
|
||||||
|
|
||||||
with open(filepath, "w") as fp:
|
with open(filepath, "w") as fp:
|
||||||
writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
|
writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
|
||||||
writer.writeheader()
|
writer.writeheader()
|
||||||
for item in blocklist:
|
for key, value in sorted_list:
|
||||||
writer.writerow(item._asdict())
|
writer.writerow(value)
|
||||||
|
|
||||||
def augment_args(args, tomldata: str=None):
|
def augment_args(args, tomldata: str=None):
|
||||||
"""Augment commandline arguments with config file parameters
|
"""Augment commandline arguments with config file parameters
|
||||||
|
|
|
@ -1,19 +1,47 @@
|
||||||
"""Parse various blocklist data formats
|
"""Parse various blocklist data formats
|
||||||
"""
|
"""
|
||||||
from typing import Iterable
|
|
||||||
from .const import DomainBlock, BlockSeverity
|
|
||||||
|
|
||||||
import csv
|
import csv
|
||||||
import json
|
import json
|
||||||
|
from typing import Iterable
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
|
from .const import DomainBlock, BlockSeverity
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
log = logging.getLogger('fediblockhole')
|
log = logging.getLogger('fediblockhole')
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Blocklist:
|
||||||
|
""" A Blocklist object
|
||||||
|
|
||||||
|
A Blocklist is a list of DomainBlocks from an origin
|
||||||
|
"""
|
||||||
|
origin: str = None
|
||||||
|
blocks: dict[str, DomainBlock] = field(default_factory=dict)
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.blocks)
|
||||||
|
|
||||||
|
def __class_getitem__(cls, item):
|
||||||
|
return dict[str, DomainBlock]
|
||||||
|
|
||||||
|
def __getitem__(self, item):
|
||||||
|
return self.blocks[item]
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return self.blocks.__iter__()
|
||||||
|
|
||||||
|
def items(self):
|
||||||
|
return self.blocks.items()
|
||||||
|
|
||||||
|
def values(self):
|
||||||
|
return self.blocks.values()
|
||||||
|
|
||||||
class BlocklistParser(object):
|
class BlocklistParser(object):
|
||||||
"""
|
"""
|
||||||
Base class for parsing blocklists
|
Base class for parsing blocklists
|
||||||
"""
|
"""
|
||||||
preparse = False
|
do_preparse = False
|
||||||
|
|
||||||
def __init__(self, import_fields: list=['domain', 'severity'],
|
def __init__(self, import_fields: list=['domain', 'severity'],
|
||||||
max_severity: str='suspend'):
|
max_severity: str='suspend'):
|
||||||
|
@ -30,17 +58,18 @@ class BlocklistParser(object):
|
||||||
"""
|
"""
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
def parse_blocklist(self, blockdata) -> dict[DomainBlock]:
|
def parse_blocklist(self, blockdata, origin:str=None) -> Blocklist:
|
||||||
"""Parse an iterable of blocklist items
|
"""Parse an iterable of blocklist items
|
||||||
@param blocklist: An Iterable of blocklist items
|
@param blocklist: An Iterable of blocklist items
|
||||||
@returns: A dict of DomainBlocks, keyed by domain
|
@returns: A dict of DomainBlocks, keyed by domain
|
||||||
"""
|
"""
|
||||||
if self.preparse:
|
if self.do_preparse:
|
||||||
blockdata = self.preparse(blockdata)
|
blockdata = self.preparse(blockdata)
|
||||||
|
|
||||||
parsed_list = []
|
parsed_list = Blocklist(origin)
|
||||||
for blockitem in blockdata:
|
for blockitem in blockdata:
|
||||||
parsed_list.append(self.parse_item(blockitem))
|
block = self.parse_item(blockitem)
|
||||||
|
parsed_list.blocks[block.domain] = block
|
||||||
return parsed_list
|
return parsed_list
|
||||||
|
|
||||||
def parse_item(self, blockitem) -> DomainBlock:
|
def parse_item(self, blockitem) -> DomainBlock:
|
||||||
|
@ -53,12 +82,13 @@ class BlocklistParser(object):
|
||||||
|
|
||||||
class BlocklistParserJSON(BlocklistParser):
|
class BlocklistParserJSON(BlocklistParser):
|
||||||
"""Parse a JSON formatted blocklist"""
|
"""Parse a JSON formatted blocklist"""
|
||||||
preparse = True
|
do_preparse = True
|
||||||
|
|
||||||
def preparse(self, blockdata) -> Iterable:
|
def preparse(self, blockdata) -> Iterable:
|
||||||
"""Parse the blockdata as JSON
|
"""Parse the blockdata as JSON if needed"""
|
||||||
"""
|
if type(blockdata) == type(''):
|
||||||
return json.loads(blockdata)
|
return json.loads(blockdata)
|
||||||
|
return blockdata
|
||||||
|
|
||||||
def parse_item(self, blockitem: dict) -> DomainBlock:
|
def parse_item(self, blockitem: dict) -> DomainBlock:
|
||||||
# Remove fields we don't want to import
|
# Remove fields we don't want to import
|
||||||
|
@ -102,7 +132,7 @@ class BlocklistParserCSV(BlocklistParser):
|
||||||
|
|
||||||
The parser expects the CSV data to include a header with the field names.
|
The parser expects the CSV data to include a header with the field names.
|
||||||
"""
|
"""
|
||||||
preparse = True
|
do_preparse = True
|
||||||
|
|
||||||
def preparse(self, blockdata) -> Iterable:
|
def preparse(self, blockdata) -> Iterable:
|
||||||
"""Use a csv.DictReader to create an iterable from the blockdata
|
"""Use a csv.DictReader to create an iterable from the blockdata
|
||||||
|
@ -202,11 +232,13 @@ FORMAT_PARSERS = {
|
||||||
# helper function to select the appropriate Parser
|
# helper function to select the appropriate Parser
|
||||||
def parse_blocklist(
|
def parse_blocklist(
|
||||||
blockdata,
|
blockdata,
|
||||||
|
origin,
|
||||||
format="csv",
|
format="csv",
|
||||||
import_fields: list=['domain', 'severity'],
|
import_fields: list=['domain', 'severity'],
|
||||||
max_severity: str='suspend'):
|
max_severity: str='suspend'):
|
||||||
"""Parse a blocklist in the given format
|
"""Parse a blocklist in the given format
|
||||||
"""
|
"""
|
||||||
parser = FORMAT_PARSERS[format](import_fields, max_severity)
|
|
||||||
log.debug(f"parsing {format} blocklist with import_fields: {import_fields}...")
|
log.debug(f"parsing {format} blocklist with import_fields: {import_fields}...")
|
||||||
return parser.parse_blocklist(blockdata)
|
|
||||||
|
parser = FORMAT_PARSERS[format](import_fields, max_severity)
|
||||||
|
return parser.parse_blocklist(blockdata, origin)
|
|
@ -4,6 +4,7 @@ import pytest
|
||||||
|
|
||||||
from util import shim_argparse
|
from util import shim_argparse
|
||||||
from fediblockhole.const import DomainBlock
|
from fediblockhole.const import DomainBlock
|
||||||
|
from fediblockhole.blocklists import Blocklist
|
||||||
from fediblockhole import fetch_allowlists, apply_allowlists
|
from fediblockhole import fetch_allowlists, apply_allowlists
|
||||||
|
|
||||||
def test_cmdline_allow_removes_domain():
|
def test_cmdline_allow_removes_domain():
|
||||||
|
@ -11,16 +12,12 @@ def test_cmdline_allow_removes_domain():
|
||||||
"""
|
"""
|
||||||
conf = shim_argparse(['-A', 'removeme.org'])
|
conf = shim_argparse(['-A', 'removeme.org'])
|
||||||
|
|
||||||
merged = {
|
merged = Blocklist('test_allowlist.merged', {
|
||||||
'example.org': DomainBlock('example.org'),
|
'example.org': DomainBlock('example.org'),
|
||||||
'example2.org': DomainBlock('example2.org'),
|
'example2.org': DomainBlock('example2.org'),
|
||||||
'removeme.org': DomainBlock('removeme.org'),
|
'removeme.org': DomainBlock('removeme.org'),
|
||||||
'keepblockingme.org': DomainBlock('keepblockingme.org'),
|
'keepblockingme.org': DomainBlock('keepblockingme.org'),
|
||||||
}
|
})
|
||||||
|
|
||||||
# allowlists = {
|
|
||||||
# 'testlist': [ DomainBlock('removeme.org', 'noop'), ]
|
|
||||||
# }
|
|
||||||
|
|
||||||
merged = apply_allowlists(merged, conf, {})
|
merged = apply_allowlists(merged, conf, {})
|
||||||
|
|
||||||
|
@ -32,16 +29,18 @@ def test_allowlist_removes_domain():
|
||||||
"""
|
"""
|
||||||
conf = shim_argparse()
|
conf = shim_argparse()
|
||||||
|
|
||||||
merged = {
|
merged = Blocklist('test_allowlist.merged', {
|
||||||
'example.org': DomainBlock('example.org'),
|
'example.org': DomainBlock('example.org'),
|
||||||
'example2.org': DomainBlock('example2.org'),
|
'example2.org': DomainBlock('example2.org'),
|
||||||
'removeme.org': DomainBlock('removeme.org'),
|
'removeme.org': DomainBlock('removeme.org'),
|
||||||
'keepblockingme.org': DomainBlock('keepblockingme.org'),
|
'keepblockingme.org': DomainBlock('keepblockingme.org'),
|
||||||
}
|
})
|
||||||
|
|
||||||
allowlists = {
|
allowlists = [
|
||||||
'testlist': [ DomainBlock('removeme.org', 'noop'), ]
|
Blocklist('test_allowlist', {
|
||||||
}
|
'removeme.org': DomainBlock('removeme.org', 'noop'),
|
||||||
|
})
|
||||||
|
]
|
||||||
|
|
||||||
merged = apply_allowlists(merged, conf, allowlists)
|
merged = apply_allowlists(merged, conf, allowlists)
|
||||||
|
|
||||||
|
@ -53,19 +52,19 @@ def test_allowlist_removes_tld():
|
||||||
"""
|
"""
|
||||||
conf = shim_argparse()
|
conf = shim_argparse()
|
||||||
|
|
||||||
merged = {
|
merged = Blocklist('test_allowlist.merged', {
|
||||||
'.cf': DomainBlock('.cf'),
|
'.cf': DomainBlock('.cf'),
|
||||||
'example.org': DomainBlock('example.org'),
|
'example.org': DomainBlock('example.org'),
|
||||||
'.tk': DomainBlock('.tk'),
|
'.tk': DomainBlock('.tk'),
|
||||||
'keepblockingme.org': DomainBlock('keepblockingme.org'),
|
'keepblockingme.org': DomainBlock('keepblockingme.org'),
|
||||||
}
|
})
|
||||||
|
|
||||||
allowlists = {
|
allowlists = [
|
||||||
'list1': [
|
Blocklist('test_allowlist.list1', {
|
||||||
DomainBlock('.cf', 'noop'),
|
'.cf': DomainBlock('.cf', 'noop'),
|
||||||
DomainBlock('.tk', 'noop'),
|
'.tk': DomainBlock('.tk', 'noop'),
|
||||||
|
})
|
||||||
]
|
]
|
||||||
}
|
|
||||||
|
|
||||||
merged = apply_allowlists(merged, conf, allowlists)
|
merged = apply_allowlists(merged, conf, allowlists)
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
"""Various mergeplan tests
|
"""Various mergeplan tests
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from fediblockhole.blocklist_parser import parse_blocklist
|
from fediblockhole.blocklists import parse_blocklist
|
||||||
from fediblockhole import merge_blocklists, merge_comments, apply_mergeplan
|
from fediblockhole import merge_blocklists, merge_comments, apply_mergeplan
|
||||||
|
|
||||||
from fediblockhole.const import SeverityLevel, DomainBlock
|
from fediblockhole.const import SeverityLevel, DomainBlock
|
||||||
|
@ -22,20 +22,19 @@ import_fields = [
|
||||||
|
|
||||||
def load_test_blocklist_data(datafiles):
|
def load_test_blocklist_data(datafiles):
|
||||||
|
|
||||||
blocklists = {}
|
blocklists = []
|
||||||
|
|
||||||
for df in datafiles:
|
for df in datafiles:
|
||||||
with open(df) as fp:
|
with open(df) as fp:
|
||||||
data = fp.read()
|
data = fp.read()
|
||||||
bl = parse_blocklist(data, 'csv', import_fields)
|
bl = parse_blocklist(data, df, 'csv', import_fields)
|
||||||
blocklists[df] = bl
|
blocklists.append(bl)
|
||||||
|
|
||||||
return blocklists
|
return blocklists
|
||||||
|
|
||||||
def test_mergeplan_max():
|
def test_mergeplan_max():
|
||||||
"""Test 'max' mergeplan"""
|
"""Test 'max' mergeplan"""
|
||||||
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
blocklists = load_test_blocklist_data([datafile01, datafile02])
|
||||||
|
|
||||||
bl = merge_blocklists(blocklists, 'max')
|
bl = merge_blocklists(blocklists, 'max')
|
||||||
assert len(bl) == 13
|
assert len(bl) == 13
|
||||||
|
|
||||||
|
|
|
@ -1,22 +1,24 @@
|
||||||
"""Tests of the CSV parsing
|
"""Tests of the CSV parsing
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from fediblockhole.blocklist_parser import BlocklistParserCSV, parse_blocklist
|
from fediblockhole.blocklists import BlocklistParserCSV, parse_blocklist
|
||||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
from fediblockhole.const import SeverityLevel
|
||||||
|
|
||||||
|
|
||||||
def test_single_line():
|
def test_single_line():
|
||||||
csvdata = "example.org"
|
csvdata = "example.org"
|
||||||
|
origin = "csvfile"
|
||||||
|
|
||||||
parser = BlocklistParserCSV()
|
parser = BlocklistParserCSV()
|
||||||
bl = parser.parse_blocklist(csvdata)
|
bl = parser.parse_blocklist(csvdata, origin)
|
||||||
assert len(bl) == 0
|
assert len(bl) == 0
|
||||||
|
|
||||||
def test_header_only():
|
def test_header_only():
|
||||||
csvdata = "domain,severity,public_comment"
|
csvdata = "domain,severity,public_comment"
|
||||||
|
origin = "csvfile"
|
||||||
|
|
||||||
parser = BlocklistParserCSV()
|
parser = BlocklistParserCSV()
|
||||||
bl = parser.parse_blocklist(csvdata)
|
bl = parser.parse_blocklist(csvdata, origin)
|
||||||
assert len(bl) == 0
|
assert len(bl) == 0
|
||||||
|
|
||||||
def test_2_blocks():
|
def test_2_blocks():
|
||||||
|
@ -24,12 +26,13 @@ def test_2_blocks():
|
||||||
example.org,silence
|
example.org,silence
|
||||||
example2.org,suspend
|
example2.org,suspend
|
||||||
"""
|
"""
|
||||||
|
origin = "csvfile"
|
||||||
|
|
||||||
parser = BlocklistParserCSV()
|
parser = BlocklistParserCSV()
|
||||||
bl = parser.parse_blocklist(csvdata)
|
bl = parser.parse_blocklist(csvdata, origin)
|
||||||
|
|
||||||
assert len(bl) == 2
|
assert len(bl) == 2
|
||||||
assert bl[0].domain == 'example.org'
|
assert 'example.org' in bl
|
||||||
|
|
||||||
def test_4_blocks():
|
def test_4_blocks():
|
||||||
csvdata = """domain,severity,public_comment
|
csvdata = """domain,severity,public_comment
|
||||||
|
@ -38,20 +41,21 @@ example2.org,suspend,"test 2"
|
||||||
example3.org,noop,"test 3"
|
example3.org,noop,"test 3"
|
||||||
example4.org,suspend,"test 4"
|
example4.org,suspend,"test 4"
|
||||||
"""
|
"""
|
||||||
|
origin = "csvfile"
|
||||||
|
|
||||||
parser = BlocklistParserCSV()
|
parser = BlocklistParserCSV()
|
||||||
bl = parser.parse_blocklist(csvdata)
|
bl = parser.parse_blocklist(csvdata, origin)
|
||||||
|
|
||||||
assert len(bl) == 4
|
assert len(bl) == 4
|
||||||
assert bl[0].domain == 'example.org'
|
assert 'example.org' in bl
|
||||||
assert bl[1].domain == 'example2.org'
|
assert 'example2.org' in bl
|
||||||
assert bl[2].domain == 'example3.org'
|
assert 'example3.org' in bl
|
||||||
assert bl[3].domain == 'example4.org'
|
assert 'example4.org' in bl
|
||||||
|
|
||||||
assert bl[0].severity.level == SeverityLevel.SILENCE
|
assert bl['example.org'].severity.level == SeverityLevel.SILENCE
|
||||||
assert bl[1].severity.level == SeverityLevel.SUSPEND
|
assert bl['example2.org'].severity.level == SeverityLevel.SUSPEND
|
||||||
assert bl[2].severity.level == SeverityLevel.NONE
|
assert bl['example3.org'].severity.level == SeverityLevel.NONE
|
||||||
assert bl[3].severity.level == SeverityLevel.SUSPEND
|
assert bl['example4.org'].severity.level == SeverityLevel.SUSPEND
|
||||||
|
|
||||||
def test_ignore_comments():
|
def test_ignore_comments():
|
||||||
csvdata = """domain,severity,public_comment,private_comment
|
csvdata = """domain,severity,public_comment,private_comment
|
||||||
|
@ -60,18 +64,18 @@ example2.org,suspend,"test 2","ignote me also"
|
||||||
example3.org,noop,"test 3","and me"
|
example3.org,noop,"test 3","and me"
|
||||||
example4.org,suspend,"test 4","also me"
|
example4.org,suspend,"test 4","also me"
|
||||||
"""
|
"""
|
||||||
|
origin = "csvfile"
|
||||||
|
|
||||||
parser = BlocklistParserCSV()
|
parser = BlocklistParserCSV()
|
||||||
bl = parser.parse_blocklist(csvdata)
|
bl = parser.parse_blocklist(csvdata, origin)
|
||||||
|
|
||||||
assert len(bl) == 4
|
assert len(bl) == 4
|
||||||
assert bl[0].domain == 'example.org'
|
assert 'example.org' in bl
|
||||||
assert bl[1].domain == 'example2.org'
|
assert 'example2.org' in bl
|
||||||
assert bl[2].domain == 'example3.org'
|
assert 'example3.org' in bl
|
||||||
assert bl[3].domain == 'example4.org'
|
assert 'example4.org' in bl
|
||||||
|
|
||||||
assert bl[0].public_comment == ''
|
assert bl['example.org'].public_comment == ''
|
||||||
assert bl[0].private_comment == ''
|
assert bl['example.org'].private_comment == ''
|
||||||
|
assert bl['example3.org'].public_comment == ''
|
||||||
assert bl[2].public_comment == ''
|
assert bl['example4.org'].private_comment == ''
|
||||||
assert bl[2].private_comment == ''
|
|
|
@ -1,8 +1,8 @@
|
||||||
"""Tests of the CSV parsing
|
"""Tests of the CSV parsing
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from fediblockhole.blocklist_parser import BlocklistParserJSON, parse_blocklist
|
from fediblockhole.blocklists import BlocklistParserJSON, parse_blocklist
|
||||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
from fediblockhole.const import SeverityLevel
|
||||||
|
|
||||||
datafile = 'data-mastodon.json'
|
datafile = 'data-mastodon.json'
|
||||||
|
|
||||||
|
@ -14,33 +14,32 @@ def test_json_parser():
|
||||||
|
|
||||||
data = load_data()
|
data = load_data()
|
||||||
parser = BlocklistParserJSON()
|
parser = BlocklistParserJSON()
|
||||||
bl = parser.parse_blocklist(data)
|
bl = parser.parse_blocklist(data, 'test_json')
|
||||||
|
|
||||||
assert len(bl) == 10
|
assert len(bl) == 10
|
||||||
assert bl[0].domain == 'example.org'
|
assert 'example.org' in bl
|
||||||
assert bl[1].domain == 'example2.org'
|
assert 'example2.org' in bl
|
||||||
assert bl[2].domain == 'example3.org'
|
assert 'example3.org' in bl
|
||||||
assert bl[3].domain == 'example4.org'
|
assert 'example4.org' in bl
|
||||||
|
|
||||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
assert bl['example.org'].severity.level == SeverityLevel.SUSPEND
|
||||||
assert bl[1].severity.level == SeverityLevel.SILENCE
|
assert bl['example2.org'].severity.level == SeverityLevel.SILENCE
|
||||||
assert bl[2].severity.level == SeverityLevel.SUSPEND
|
assert bl['example3.org'].severity.level == SeverityLevel.SUSPEND
|
||||||
assert bl[3].severity.level == SeverityLevel.NONE
|
assert bl['example4.org'].severity.level == SeverityLevel.NONE
|
||||||
|
|
||||||
def test_ignore_comments():
|
def test_ignore_comments():
|
||||||
|
|
||||||
data = load_data()
|
data = load_data()
|
||||||
parser = BlocklistParserJSON()
|
parser = BlocklistParserJSON()
|
||||||
bl = parser.parse_blocklist(data)
|
bl = parser.parse_blocklist(data, 'test_json')
|
||||||
|
|
||||||
assert len(bl) == 10
|
assert len(bl) == 10
|
||||||
assert bl[0].domain == 'example.org'
|
assert 'example.org' in bl
|
||||||
assert bl[1].domain == 'example2.org'
|
assert 'example2.org' in bl
|
||||||
assert bl[2].domain == 'example3.org'
|
assert 'example3.org' in bl
|
||||||
assert bl[3].domain == 'example4.org'
|
assert 'example4.org' in bl
|
||||||
|
|
||||||
assert bl[0].public_comment == ''
|
assert bl['example.org'].public_comment == ''
|
||||||
assert bl[0].private_comment == ''
|
assert bl['example.org'].private_comment == ''
|
||||||
|
assert bl['example3.org'].public_comment == ''
|
||||||
assert bl[2].public_comment == ''
|
assert bl['example4.org'].private_comment == ''
|
||||||
assert bl[2].private_comment == ''
|
|
|
@ -1,7 +1,7 @@
|
||||||
"""Tests of the Rapidblock CSV parsing
|
"""Tests of the Rapidblock CSV parsing
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from fediblockhole.blocklist_parser import RapidBlockParserCSV, parse_blocklist
|
from fediblockhole.blocklists import RapidBlockParserCSV, parse_blocklist
|
||||||
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
|
||||||
|
|
||||||
csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n"""
|
csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n"""
|
||||||
|
@ -11,13 +11,13 @@ def test_basic_rapidblock():
|
||||||
|
|
||||||
bl = parser.parse_blocklist(csvdata)
|
bl = parser.parse_blocklist(csvdata)
|
||||||
assert len(bl) == 4
|
assert len(bl) == 4
|
||||||
assert bl[0].domain == 'example.org'
|
assert 'example.org' in bl
|
||||||
assert bl[1].domain == 'subdomain.example.org'
|
assert 'subdomain.example.org' in bl
|
||||||
assert bl[2].domain == 'anotherdomain.org'
|
assert 'anotherdomain.org' in bl
|
||||||
assert bl[3].domain == 'domain4.org'
|
assert 'domain4.org' in bl
|
||||||
|
|
||||||
def test_severity_is_suspend():
|
def test_severity_is_suspend():
|
||||||
bl = parser.parse_blocklist(csvdata)
|
bl = parser.parse_blocklist(csvdata)
|
||||||
|
|
||||||
for block in bl:
|
for block in bl.values():
|
||||||
assert block.severity.level == SeverityLevel.SUSPEND
|
assert block.severity.level == SeverityLevel.SUSPEND
|
|
@ -1,6 +1,6 @@
|
||||||
"""Test parsing the RapidBlock JSON format
|
"""Test parsing the RapidBlock JSON format
|
||||||
"""
|
"""
|
||||||
from fediblockhole.blocklist_parser import parse_blocklist
|
from fediblockhole.blocklists import parse_blocklist
|
||||||
|
|
||||||
from fediblockhole.const import SeverityLevel
|
from fediblockhole.const import SeverityLevel
|
||||||
|
|
||||||
|
@ -9,26 +9,26 @@ rapidblockjson = "data-rapidblock.json"
|
||||||
def test_parse_rapidblock_json():
|
def test_parse_rapidblock_json():
|
||||||
with open(rapidblockjson) as fp:
|
with open(rapidblockjson) as fp:
|
||||||
data = fp.read()
|
data = fp.read()
|
||||||
bl = parse_blocklist(data, 'rapidblock.json')
|
bl = parse_blocklist(data, 'pytest', 'rapidblock.json')
|
||||||
|
|
||||||
assert bl[0].domain == '101010.pl'
|
assert '101010.pl' in bl
|
||||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
assert bl['101010.pl'].severity.level == SeverityLevel.SUSPEND
|
||||||
assert bl[0].public_comment == ''
|
assert bl['101010.pl'].public_comment == ''
|
||||||
|
|
||||||
assert bl[10].domain == 'berserker.town'
|
assert 'berserker.town' in bl
|
||||||
assert bl[10].severity.level == SeverityLevel.SUSPEND
|
assert bl['berserker.town'].severity.level == SeverityLevel.SUSPEND
|
||||||
assert bl[10].public_comment == ''
|
assert bl['berserker.town'].public_comment == ''
|
||||||
assert bl[10].private_comment == ''
|
assert bl['berserker.town'].private_comment == ''
|
||||||
|
|
||||||
def test_parse_with_comments():
|
def test_parse_with_comments():
|
||||||
with open(rapidblockjson) as fp:
|
with open(rapidblockjson) as fp:
|
||||||
data = fp.read()
|
data = fp.read()
|
||||||
bl = parse_blocklist(data, 'rapidblock.json', ['domain', 'severity', 'public_comment', 'private_comment'])
|
bl = parse_blocklist(data, 'pytest', 'rapidblock.json', ['domain', 'severity', 'public_comment', 'private_comment'])
|
||||||
|
|
||||||
assert bl[0].domain == '101010.pl'
|
assert '101010.pl' in bl
|
||||||
assert bl[0].severity.level == SeverityLevel.SUSPEND
|
assert bl['101010.pl'].severity.level == SeverityLevel.SUSPEND
|
||||||
assert bl[0].public_comment == 'cryptomining javascript, white supremacy'
|
assert bl['101010.pl'].public_comment == 'cryptomining javascript, white supremacy'
|
||||||
|
|
||||||
assert bl[10].domain == 'berserker.town'
|
assert 'berserker.town' in bl
|
||||||
assert bl[10].severity.level == SeverityLevel.SUSPEND
|
assert bl['berserker.town'].severity.level == SeverityLevel.SUSPEND
|
||||||
assert bl[10].public_comment == 'freeze peach'
|
assert bl['berserker.town'].public_comment == 'freeze peach'
|
Loading…
Reference in New Issue