Support a merge threshold level.

Added a Blocklist object.
Refactored tests to match changed code.
This commit is contained in:
Justin Warren 2023-01-17 09:04:34 +11:00
parent ede2918740
commit 70b1ff32ff
No known key found for this signature in database
8 changed files with 165 additions and 101 deletions

View File

@ -11,7 +11,7 @@ import os.path
import sys
import urllib.request as urlr
from .blocklist_parser import parse_blocklist
from .blocklists import Blocklist, parse_blocklist
from .const import DomainBlock, BlockSeverity
from importlib.metadata import version
@ -178,41 +178,71 @@ def fetch_from_instances(blocklists: dict, sources: dict,
save_intermediate_blocklist(blocklists[itemsrc], domain, savedir, export_fields)
return blocklists
def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max', threshold: int=0) -> dict:
    """Merge fetched remote blocklists into a bulk update

    @param blocklists: A list of Blocklist objects, one per source.
        Only the values() of each one are used, and each value must
        have a `domain` attribute.
    @param mergeplan: An optional method of merging overlapping block definitions
        'max' (the default) uses the highest severity block found
        'min' uses the lowest severity block found
    @param threshold: An integer percentage [0-100].
        A domain is only merged into the final list if it is present in
        at least this percentage of the blocklists.
        The default of 0 merges every domain.
    @returns: A dict of DomainBlocks keyed by domain
    """
    num_blocklists = len(blocklists)

    # Collect the blocks for each domain, in the order the blocklists were given
    domain_blocks = {}
    for bl in blocklists:
        for block in bl.values():
            # An asterisk in the domain means it is obfuscated
            # and we can't really use it, so skip it and do the next one
            if '*' in block.domain:
                log.debug(f"Domain '{block.domain}' is obfuscated. Skipping it.")
                continue
            domain_blocks.setdefault(block.domain, []).append(block)

    merged = {}
    for domain, blocks in domain_blocks.items():
        # threshold is a percentage, so compare count/total against
        # threshold/100. Cross-multiplying keeps this in integers.
        if len(blocks) * 100 < threshold * num_blocklists:
            continue
        # Start with the first block found, then merge the others into it
        merged[domain] = blocks[0]
        for block in blocks[1:]:
            merged[domain] = apply_mergeplan(merged[domain], block, mergeplan)
    return merged
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
"""Use a mergeplan to decide how to merge two overlapping block definitions

View File

@ -1,14 +1,42 @@
"""Parse various blocklist data formats
"""
from typing import Iterable
from .const import DomainBlock, BlockSeverity
import csv
import json
from typing import Iterable
from dataclasses import dataclass, field
from .const import DomainBlock, BlockSeverity
import logging
log = logging.getLogger('fediblockhole')
@dataclass
class Blocklist:
    """A collection of DomainBlocks from an origin

    The blocks are held in a dict and the object passes through the
    read-only parts of the dict interface: len(), iteration over the
    keys, lookup by key, `in`, items() and values().
    """
    # Label for the source of the blocks; defaults to None
    origin: str = None
    # The blocks themselves; the parsers in this module key them by domain
    blocks: dict[str, DomainBlock] = field(default_factory=dict)

    def __len__(self):
        """Return the number of blocks held"""
        return len(self.blocks)

    def __class_getitem__(cls, item):
        # Subscripting the class itself (Blocklist[...]) ignores `item`
        # and evaluates to the type of the underlying mapping.
        return dict[str, DomainBlock]

    def __getitem__(self, item):
        """Return the block stored under `item`; raises KeyError if absent"""
        return self.blocks[item]

    def __contains__(self, item):
        # Use the dict's own membership test so that `domain in blocklist`
        # doesn't fall back to scanning every key via __iter__.
        return item in self.blocks

    def __iter__(self):
        """Iterate over the keys of the underlying dict"""
        return iter(self.blocks)

    def items(self):
        return self.blocks.items()

    def values(self):
        return self.blocks.values()
class BlocklistParser(object):
"""
Base class for parsing blocklists
@ -30,7 +58,7 @@ class BlocklistParser(object):
"""
raise NotImplementedError
def parse_blocklist(self, blockdata, origin:str=None) -> Blocklist:
    """Parse an iterable of blocklist items into a Blocklist

    @param blockdata: An Iterable of blocklist items.
        If the parser has a preparse step, the data is passed through it first.
    @param origin: Passed to the Blocklist that gets returned
    @returns: A Blocklist holding the parsed DomainBlocks, keyed by domain
    """
    items = self.preparse(blockdata) if self.preparse else blockdata

    result = Blocklist(origin)
    for raw_item in items:
        parsed = self.parse_item(raw_item)
        # A later item for the same domain replaces the earlier one
        result.blocks[parsed.domain] = parsed
    return result
def parse_item(self, blockitem) -> DomainBlock:
@ -178,6 +207,7 @@ FORMAT_PARSERS = {
# helper function to select the appropriate Parser
def parse_blocklist(
    blockdata,
    origin,
    format="csv",
    import_fields: list=None,
    max_severity: str='suspend'):
    """Parse a blocklist using the parser registered for `format`

    @param blockdata: The blocklist data, handed to the parser as-is
    @param origin: Passed to the parser's parse_blocklist() along with the data
    @param format: A key of FORMAT_PARSERS, selecting the parser to use
    @param import_fields: Passed to the parser's constructor.
        Defaults to ['domain', 'severity'].
    @param max_severity: Passed to the parser's constructor
    @returns: Whatever the selected parser's parse_blocklist() returns
    """
    # Build the default on each call so that one list object is never
    # shared between the parsers created by separate calls
    if import_fields is None:
        import_fields = ['domain', 'severity']
    parser = FORMAT_PARSERS[format](import_fields, max_severity)
    log.debug(f"parsing {format} blocklist with import_fields: {import_fields}...")
    return parser.parse_blocklist(blockdata, origin)

View File

@ -123,7 +123,8 @@ class DomainBlock(object):
reject_media: bool=False,
reject_reports: bool=False,
obfuscate: bool=False,
id: int=None):
id: int=None,
count: int=0):
"""Initialize the DomainBlock
"""
self.domain = domain
@ -134,6 +135,7 @@ class DomainBlock(object):
self.reject_reports = reject_reports
self.obfuscate = obfuscate
self.id = id
self.count = 0
@property
def severity(self):

View File

@ -1,7 +1,7 @@
"""Various mergeplan tests
"""
from fediblockhole.blocklist_parser import parse_blocklist
from fediblockhole.blocklists import parse_blocklist
from fediblockhole import merge_blocklists, merge_comments, apply_mergeplan
from fediblockhole.const import SeverityLevel, DomainBlock
@ -22,20 +22,19 @@ import_fields = [
def load_test_blocklist_data(datafiles):
    """Parse each data file as CSV, using its filename as the origin

    Returns the parsed blocklists as a list, in the order of `datafiles`.
    """
    def load_one(path):
        with open(path) as fp:
            raw = fp.read()
        return parse_blocklist(raw, path, 'csv', import_fields)

    return [load_one(path) for path in datafiles]
def test_mergeplan_max():
"""Test 'max' mergeplan"""
blocklists = load_test_blocklist_data([datafile01, datafile02])
bl = merge_blocklists(blocklists, 'max')
assert len(bl) == 13

View File

@ -1,22 +1,24 @@
"""Tests of the CSV parsing
"""
from fediblockhole.blocklist_parser import BlocklistParserCSV, parse_blocklist
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
from fediblockhole.blocklists import BlocklistParserCSV, parse_blocklist
from fediblockhole.const import SeverityLevel
def test_single_line():
    """A single line of CSV and nothing else gives an empty blocklist"""
    blocklist = BlocklistParserCSV().parse_blocklist("example.org", "csvfile")

    assert len(blocklist) == 0
def test_header_only():
    """A CSV header row with no data rows gives an empty blocklist"""
    header = "domain,severity,public_comment"
    blocklist = BlocklistParserCSV().parse_blocklist(header, "csvfile")

    assert len(blocklist) == 0
def test_2_blocks():
@ -24,12 +26,13 @@ def test_2_blocks():
example.org,silence
example2.org,suspend
"""
origin = "csvfile"
parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata)
bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 2
assert bl[0].domain == 'example.org'
assert 'example.org' in bl
def test_4_blocks():
csvdata = """domain,severity,public_comment
@ -38,20 +41,21 @@ example2.org,suspend,"test 2"
example3.org,noop,"test 3"
example4.org,suspend,"test 4"
"""
origin = "csvfile"
parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata)
bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 4
assert bl[0].domain == 'example.org'
assert bl[1].domain == 'example2.org'
assert bl[2].domain == 'example3.org'
assert bl[3].domain == 'example4.org'
assert 'example.org' in bl
assert 'example2.org' in bl
assert 'example3.org' in bl
assert 'example4.org' in bl
assert bl[0].severity.level == SeverityLevel.SILENCE
assert bl[1].severity.level == SeverityLevel.SUSPEND
assert bl[2].severity.level == SeverityLevel.NONE
assert bl[3].severity.level == SeverityLevel.SUSPEND
assert bl['example.org'].severity.level == SeverityLevel.SILENCE
assert bl['example2.org'].severity.level == SeverityLevel.SUSPEND
assert bl['example3.org'].severity.level == SeverityLevel.NONE
assert bl['example4.org'].severity.level == SeverityLevel.SUSPEND
def test_ignore_comments():
csvdata = """domain,severity,public_comment,private_comment
@ -60,18 +64,18 @@ example2.org,suspend,"test 2","ignote me also"
example3.org,noop,"test 3","and me"
example4.org,suspend,"test 4","also me"
"""
origin = "csvfile"
parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata)
bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 4
assert bl[0].domain == 'example.org'
assert bl[1].domain == 'example2.org'
assert bl[2].domain == 'example3.org'
assert bl[3].domain == 'example4.org'
assert 'example.org' in bl
assert 'example2.org' in bl
assert 'example3.org' in bl
assert 'example4.org' in bl
assert bl[0].public_comment == ''
assert bl[0].private_comment == ''
assert bl[2].public_comment == ''
assert bl[2].private_comment == ''
assert bl['example.org'].public_comment == ''
assert bl['example.org'].private_comment == ''
assert bl['example3.org'].public_comment == ''
assert bl['example4.org'].private_comment == ''

View File

@ -1,8 +1,8 @@
"""Tests of the CSV parsing
"""
from fediblockhole.blocklist_parser import BlocklistParserJSON, parse_blocklist
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
from fediblockhole.blocklists import BlocklistParserJSON, parse_blocklist
from fediblockhole.const import SeverityLevel
datafile = 'data-mastodon.json'
@ -14,33 +14,32 @@ def test_json_parser():
data = load_data()
parser = BlocklistParserJSON()
bl = parser.parse_blocklist(data)
bl = parser.parse_blocklist(data, 'test_json')
assert len(bl) == 10
assert bl[0].domain == 'example.org'
assert bl[1].domain == 'example2.org'
assert bl[2].domain == 'example3.org'
assert bl[3].domain == 'example4.org'
assert 'example.org' in bl
assert 'example2.org' in bl
assert 'example3.org' in bl
assert 'example4.org' in bl
assert bl[0].severity.level == SeverityLevel.SUSPEND
assert bl[1].severity.level == SeverityLevel.SILENCE
assert bl[2].severity.level == SeverityLevel.SUSPEND
assert bl[3].severity.level == SeverityLevel.NONE
assert bl['example.org'].severity.level == SeverityLevel.SUSPEND
assert bl['example2.org'].severity.level == SeverityLevel.SILENCE
assert bl['example3.org'].severity.level == SeverityLevel.SUSPEND
assert bl['example4.org'].severity.level == SeverityLevel.NONE
def test_ignore_comments():
    """Comments come back empty from a JSON parser built with no arguments"""
    data = load_data()
    blocklist = BlocklistParserJSON().parse_blocklist(data, 'test_json')

    assert len(blocklist) == 10
    for domain in ('example.org', 'example2.org', 'example3.org', 'example4.org'):
        assert domain in blocklist

    first = blocklist['example.org']
    assert first.public_comment == ''
    assert first.private_comment == ''
    assert blocklist['example3.org'].public_comment == ''
    assert blocklist['example4.org'].private_comment == ''

View File

@ -1,7 +1,7 @@
"""Tests of the Rapidblock CSV parsing
"""
from fediblockhole.blocklist_parser import RapidBlockParserCSV, parse_blocklist
from fediblockhole.blocklists import RapidBlockParserCSV, parse_blocklist
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n"""
@ -11,13 +11,13 @@ def test_basic_rapidblock():
bl = parser.parse_blocklist(csvdata)
assert len(bl) == 4
assert bl[0].domain == 'example.org'
assert bl[1].domain == 'subdomain.example.org'
assert bl[2].domain == 'anotherdomain.org'
assert bl[3].domain == 'domain4.org'
assert 'example.org' in bl
assert 'subdomain.example.org' in bl
assert 'anotherdomain.org' in bl
assert 'domain4.org' in bl
def test_severity_is_suspend():
    """Every block parsed from the RapidBlock CSV data has suspend severity"""
    parsed = parser.parse_blocklist(csvdata)
    levels = [block.severity.level for block in parsed.values()]

    assert all(level == SeverityLevel.SUSPEND for level in levels)

View File

@ -1,6 +1,6 @@
"""Test parsing the RapidBlock JSON format
"""
from fediblockhole.blocklist_parser import parse_blocklist
from fediblockhole.blocklists import parse_blocklist
from fediblockhole.const import SeverityLevel
@ -9,26 +9,26 @@ rapidblockjson = "data-rapidblock.json"
def test_parse_rapidblock_json():
    """With the default import fields, blocks are suspends with empty comments"""
    with open(rapidblockjson) as fp:
        data = fp.read()
    blocklist = parse_blocklist(data, 'pytest', 'rapidblock.json')

    for domain in ('101010.pl', 'berserker.town'):
        assert domain in blocklist
        block = blocklist[domain]
        assert block.severity.level == SeverityLevel.SUSPEND
        assert block.public_comment == ''

    assert blocklist['berserker.town'].private_comment == ''
def test_parse_with_comments():
    """Public comments are kept when the comment fields are imported"""
    fields = ['domain', 'severity', 'public_comment', 'private_comment']
    expected_comments = {
        '101010.pl': 'cryptomining javascript, white supremacy',
        'berserker.town': 'freeze peach',
    }

    with open(rapidblockjson) as fp:
        data = fp.read()
    blocklist = parse_blocklist(data, 'pytest', 'rapidblock.json', fields)

    for domain, comment in expected_comments.items():
        assert domain in blocklist
        block = blocklist[domain]
        assert block.severity.level == SeverityLevel.SUSPEND
        assert block.public_comment == comment