Merge pull request #1 from eigenmagic/main

Bring in all the updates
This commit is contained in:
Shawn Grigson 2023-08-18 11:18:56 -05:00 committed by GitHub
commit 525dc876e0
27 changed files with 1005 additions and 210 deletions


@ -6,7 +6,36 @@ This project uses [Semantic Versioning] and generally follows the conventions of
## [Unreleased]
- Planning to add allowlist thresholds as noted in #28

## [v0.4.4] - 2023-07-09
### Added
- Added citation for creators of #Fediblock (a64875b)
- Added parser for Mastodon 4.1 blocklist CSV format (9f95f14)
- Added container support (76d5b61)
### Fixed
- Use __future__.annotations so type hints work with Python < 3.9 (8265639)
- test util no longer tries to load default config file if conf tomldata is empty. (2da57b2)
## [v0.4.3] - 2023-02-13
### Added
- Added Mastodon public API parser type because #33 (9fe9342)
- Added ability to set scheme when talking to instances (9fe9342)
- Added tests of comment merging. (fb3a7ec)
- Added blocklist thresholds. (bb1d89e)
- Added logging to help debug threshold-based merging. (b67ff0c)
- Added extra documentation on configuring thresholds. (6c72af8)
- Updated documentation to reflect Mastodon v4.1.0 changes to the application scopes screen. (b92dd21)
### Changed
- Dropped minimum Python version to 3.6 (df3c16f)
- Don't merge comments if new comment is empty. (b8aa11e)
- Tweaked comment merging to pass tests. (fb3a7ec)
## [v0.4.2] - 2023-01-19


@ -6,15 +6,19 @@ The broad design goal for FediBlockHole is to support pulling in a list of
blocklists from a set of trusted sources, merge them into a combined blocklist,
and then push that merged list to a set of managed instances.

Mastodon admins can choose who they think maintain quality lists and subscribe
to them, helping to distribute the load for maintaining blocklists among a
community of people. Control ultimately rests with the admins themselves so they
can outsource as much, or as little, of the effort to others as they deem
appropriate.
Inspired by the way PiHole works for maintaining a set of blocklists of adtech
domains. Builds on the work of
[@CaribenxMarciaX@scholar.social](https://scholar.social/@CaribenxMarciaX) and
[@gingerrroot@kitty.town](https://kitty.town/@gingerrroot) who started the
#Fediblock hashtag and did a lot of advocacy around it, often at great personal
cost.
## Features

### Blocklist Sources
@ -41,6 +45,8 @@ appropriate.
- Provides (hopefully) sensible defaults to minimise first-time setup.
- Global and fine-grained configuration options available for those complex situations that crop up sometimes.
- Allowlists to override blocks in blocklists to ensure you never block instances you want to keep.
- Blocklist thresholds if you want to only block when an instance shows up in multiple blocklists.
## Installing
@ -79,17 +85,16 @@ admin to add a new Application at
`https://<instance-domain>/settings/applications/` and then tell you the access
token.
The application needs the `admin:read:domain_blocks` OAuth scope. You can allow
full `admin:read` access, but be aware that this authorizes someone to read all
the data in the instance. That's asking a lot of a remote instance admin who
just wants to share domain_blocks with you.

The `admin:read:domain_blocks` scope is available as of Mastodon v4.1.0, but for
earlier versions admins will need to use the manual method described below.

You can update the scope for your application in the database directly like
this:
```
UPDATE oauth_applications as app
@ -134,8 +139,12 @@ chmod o-r <configfile>
```
You can also grant full `admin:write` scope to the application, but if you'd
prefer to keep things more tightly secured, limit the scope to
`admin:read:domain_blocks`.

Again, this scope is only available in the application config screen as of
Mastodon v4.1.0. If your instance is on an earlier version, you'll need to use
SQL to set the scopes in the database and then regenerate the token:
```
UPDATE oauth_applications as app
@ -192,6 +201,7 @@ Supported formats are currently:
- Comma-Separated Values (CSV)
- JSON
- Mastodon v4.1 flavoured CSV
- RapidBlock CSV
- RapidBlock JSON
@ -209,6 +219,17 @@ A CSV format blocklist must contain a header row with at least a `domain` and `s
Optional fields, as listed above, may also be included.
#### Mastodon v4.1 CSV format
As of v4.1.0, Mastodon can export domain blocks as a CSV file. However, in their
infinite wisdom, the Mastodon devs decided that field names should begin with a
`#` character in the header, unlike the field names in the JSON output via the
API… or in pretty much any other CSV file anywhere else.
Setting the format to `mastodon_csv` will strip off the `#` character when
parsing and FediBlockHole can then use Mastodon v4.1 CSV blocklists like any
other CSV formatted blocklist.
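
For example, a hypothetical v4.1 export and the matching source entry might
look like this (the file name and exact columns here are illustrative, not
from this repo):

```
#domain,#severity,#reject_media,#reject_reports,#obfuscate
bad.example.org,suspend,true,true,false
```

```
blocklist_url_sources = [
  { url = 'file:///path/to/mastodon-export.csv', format = 'mastodon_csv' },
]
```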
#### JSON format

JSON is also supported. It uses the same format as the JSON returned from the Mastodon API.

chart/.helmignore Normal file

@ -0,0 +1,34 @@
# A helm chart's templates and default values can be packaged into a .tgz file.
# When doing that, not everything should be bundled into the .tgz file. This
# file describes what to not bundle.
#
# Manually added by us
# --------------------
#
# Boilerplate .helmignore from `helm create mastodon`
# ---------------------------------------------------
#
# Patterns to ignore when building packages.
# This supports shell glob matching, relative path matching, and
# negation (prefixed with !). Only one pattern per line.
.DS_Store
# Common VCS dirs
.git/
.gitignore
.bzr/
.bzrignore
.hg/
.hgignore
.svn/
# Common backup files
*.swp
*.bak
*.tmp
*.orig
*~
# Various IDEs
.project
.idea/
*.tmproj
.vscode/

chart/Chart.yaml Normal file

@ -0,0 +1,23 @@
apiVersion: v2
name: fediblockhole
description: FediBlockHole is a tool for keeping a Mastodon instance blocklist synchronised with remote lists.
# A chart can be either an 'application' or a 'library' chart.
#
# Application charts are a collection of templates that can be packaged into versioned archives
# to be deployed.
#
# Library charts provide useful utilities or functions for the chart developer. They're included as
# a dependency of application charts to inject those utilities and functions into the rendering
# pipeline. Library charts do not define any templates and therefore cannot be deployed.
type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 1.1.0
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
appVersion: 0.4.2


@ -0,0 +1,67 @@
# List of instances to read blocklists from.
# If the instance makes its blocklist public, no authorization token is needed.
# Otherwise, `token` is a Bearer token authorised to read domain_blocks.
# If `admin` = True, use the more detailed admin API, which requires a token with a
# higher level of authorization.
# If `import_fields` are provided, only import these fields from the instance.
# Overrides the global `import_fields` setting.
blocklist_instance_sources = [
# { domain = 'public.blocklist'}, # an instance with a public list of domain_blocks
# { domain = 'jorts.horse', token = '<a_different_token>' }, # user accessible block list
# { domain = 'eigenmagic.net', token = '<a_token_with_read_auth>', admin = true }, # admin access required
]
# List of URLs to read csv blocklists from
# Format tells the parser which format to use when parsing the blocklist
# max_severity tells the parser to override any severities that are higher than this value
# import_fields tells the parser to only import that set of fields from a specific source
blocklist_url_sources = [
# { url = 'file:///path/to/fediblockhole/samples/demo-blocklist-01.csv', format = 'csv' },
{ url = 'https://raw.githubusercontent.com/eigenmagic/fediblockhole/main/samples/demo-blocklist-01.csv', format = 'csv' },
]
## These global allowlists override blocks from blocklists
# These are the same format and structure as blocklists, but they take precedence
allowlist_url_sources = [
{ url = 'https://raw.githubusercontent.com/eigenmagic/fediblockhole/main/samples/demo-allowlist-01.csv', format = 'csv' },
{ url = 'https://raw.githubusercontent.com/eigenmagic/fediblockhole/main/samples/demo-allowlist-02.csv', format = 'csv' },
]
# List of instances to write blocklist to
blocklist_instance_destinations = [
# { domain = 'eigenmagic.net', token = '<read_write_token>', max_followed_severity = 'silence'},
]
## Store a local copy of the remote blocklists after we fetch them
#save_intermediate = true
## Directory to store the local blocklist copies
# savedir = '/tmp'
## File to save the fully merged blocklist into
# blocklist_savefile = '/tmp/merged_blocklist.csv'
## Don't push blocklist to instances, even if they're defined above
# no_push_instance = false
## Don't fetch blocklists from URLs, even if they're defined above
# no_fetch_url = false
## Don't fetch blocklists from instances, even if they're defined above
# no_fetch_instance = false
## Set the mergeplan to use when dealing with overlaps between blocklists
# The default 'max' mergeplan will use the harshest severity block found for a domain.
# The 'min' mergeplan will use the lightest severity block found for a domain.
# mergeplan = 'max'
## Set which fields we import
## 'domain' and 'severity' are always imported, these are additional
##
import_fields = ['public_comment', 'reject_media', 'reject_reports', 'obfuscate']
## Set which fields we export
## 'domain' and 'severity' are always exported, these are additional
##
export_fields = ['public_comment']


@ -0,0 +1,70 @@
{{/* vim: set filetype=mustache: */}}
{{/*
Expand the name of the chart.
*/}}
{{- define "fediblockhole.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
If release name contains chart name it will be used as a full name.
*/}}
{{- define "fediblockhole.fullname" -}}
{{- if .Values.fullnameOverride }}
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- if contains $name .Release.Name }}
{{- .Release.Name | trunc 63 | trimSuffix "-" }}
{{- else }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}
{{- end }}
{{- end }}
{{/*
Create chart name and version as used by the chart label.
*/}}
{{- define "fediblockhole.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Common labels
*/}}
{{- define "fediblockhole.labels" -}}
helm.sh/chart: {{ include "fediblockhole.chart" . }}
{{ include "fediblockhole.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end }}
{{/*
Selector labels
*/}}
{{- define "fediblockhole.selectorLabels" -}}
app.kubernetes.io/name: {{ include "fediblockhole.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end }}
{{/*
Rolling pod annotations
*/}}
{{- define "fediblockhole.rollingPodAnnotations" -}}
rollme: {{ .Release.Revision | quote }}
checksum/config-configmap: {{ include ( print $.Template.BasePath "/configmap-conf-toml.yaml" ) . | sha256sum | quote }}
{{- end }}
{{/*
Create the default conf file path and filename
*/}}
{{- define "fediblockhole.conf_file_path" -}}
{{- default "/etc/default/" .Values.fediblockhole.conf_file.path }}
{{- end }}
{{- define "fediblockhole.conf_file_filename" -}}
{{- default "fediblockhole.conf.toml" .Values.fediblockhole.conf_file.filename }}
{{- end }}
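
The `conf_file_path` and `conf_file_filename` helpers above fall back to
`/etc/default/` and `fediblockhole.conf.toml` when no overrides are set. A
sketch of overriding both via values (the filename here is illustrative; note
the trailing slash, since the templates concatenate path and filename
directly):

```
fediblockhole:
  conf_file:
    path: "/etc/fediblockhole/"
    filename: "myconf.toml"
```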


@ -0,0 +1,8 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "fediblockhole.fullname" . }}-conf-toml
  labels:
    {{- include "fediblockhole.labels" . | nindent 4 }}
data:
  {{ (.Files.Glob "fediblockhole.conf.toml").AsConfig | nindent 4 }}


@ -0,0 +1,68 @@
{{ if .Values.fediblockhole.cron.sync.enabled -}}
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ include "fediblockhole.fullname" . }}-sync
  labels:
    {{- include "fediblockhole.labels" . | nindent 4 }}
spec:
  schedule: {{ .Values.fediblockhole.cron.sync.schedule }}
  failedJobsHistoryLimit: {{ .Values.fediblockhole.cron.sync.failedJobsHistoryLimit }}
  successfulJobsHistoryLimit: {{ .Values.fediblockhole.cron.sync.successfulJobsHistoryLimit }}
  jobTemplate:
    spec:
      template:
        metadata:
          name: {{ include "fediblockhole.fullname" . }}-sync
          {{- with .Values.jobAnnotations }}
          annotations:
            {{- toYaml . | nindent 12 }}
          {{- end }}
        spec:
          restartPolicy: OnFailure
          containers:
            - name: {{ include "fediblockhole.fullname" . }}-sync
              image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
              imagePullPolicy: {{ .Values.image.pullPolicy }}
              command:
                - fediblock-sync
                - -c
                - "{{- include "fediblockhole.conf_file_path" . -}}{{- include "fediblockhole.conf_file_filename" . -}}"
              volumeMounts:
                - name: config
                  mountPath: "{{- include "fediblockhole.conf_file_path" . -}}{{- include "fediblockhole.conf_file_filename" . -}}"
                  subPath: "{{- include "fediblockhole.conf_file_filename" . -}}"
                {{ if .Values.fediblockhole.allow_file.filename }}
                - name: allowfile
                  mountPath: "{{- include "fediblockhole.conf_file_path" . -}}{{- .Values.fediblockhole.allow_file.filename -}}"
                  subPath: "{{- .Values.fediblockhole.allow_file.filename -}}"
                {{ end }}
                {{ if .Values.fediblockhole.block_file.filename }}
                - name: blockfile
                  mountPath: "{{- include "fediblockhole.conf_file_path" . -}}{{- .Values.fediblockhole.block_file.filename -}}"
                  subPath: "{{- .Values.fediblockhole.block_file.filename -}}"
                {{ end }}
          volumes:
            - name: config
              configMap:
                name: {{ include "fediblockhole.fullname" . }}-conf-toml
                items:
                  - key: {{ include "fediblockhole.conf_file_filename" . | quote }}
                    path: {{ include "fediblockhole.conf_file_filename" . | quote }}
            {{ if .Values.fediblockhole.allow_file.filename }}
            - name: allowfile
              configMap:
                name: {{ include "fediblockhole.fullname" . }}-allow-csv
                items:
                  - key: {{ .Values.fediblockhole.allow_file.filename | quote }}
                    path: {{ .Values.fediblockhole.allow_file.filename | quote }}
            {{ end }}
            {{ if .Values.fediblockhole.block_file.filename }}
            - name: blockfile
              configMap:
                name: {{ include "fediblockhole.fullname" . }}-block-csv
                items:
                  - key: {{ .Values.fediblockhole.block_file.filename | quote }}
                    path: {{ .Values.fediblockhole.block_file.filename | quote }}
            {{ end }}
{{- end }}

chart/values.yaml Normal file

@ -0,0 +1,77 @@
image:
  repository: ghcr.io/cunningpike/fediblockhole
  # https://github.com/cunningpike/fediblockhole/pkgs/container/fediblockhole/versions
  #
  # alternatively, use `latest` for the latest release or `edge` for the image
  # built from the most recent commit
  #
  # tag: latest
  tag: ""
  # use `Always` when using `latest` tag
  pullPolicy: IfNotPresent

fediblockhole:
  # location of the configuration file. Default is /etc/default/fediblockhole.conf.toml
  conf_file:
    path: ""
    filename: ""
  # Location of a local allowlist file. It is recommended that this file should at a
  # minimum contain the web_domain of your own instance.
  allow_file:
    # Optionally, set the name of the file. This should match the data key in the
    # associated ConfigMap
    filename: ""
  # Location of a local blocklist file.
  block_file:
    # Optionally, set the name of the file. This should match the data key in the
    # associated ConfigMap
    filename: ""
  cron:
    # -- run `fediblock-sync` every hour
    sync:
      # @ignored
      enabled: false
      # @ignored
      schedule: "0 * * * *"
      failedJobsHistoryLimit: 1
      successfulJobsHistoryLimit: 3

# if you manually change the UID/GID environment variables, ensure these values
# match:
podSecurityContext:
  runAsUser: 991
  runAsGroup: 991
  fsGroup: 991

# @ignored
securityContext: {}

# -- Kubernetes manages pods for jobs and pods for deployments differently, so you might
# need to apply different annotations to the two different sets of pods. The annotations
# set with podAnnotations will be added to all deployment-managed pods.
podAnnotations: {}

# -- The annotations set with jobAnnotations will be added to all job pods.
jobAnnotations: {}

# -- Default resources for all Deployments and jobs unless overwritten
resources: {}
# We usually recommend not to specify default resources and to leave this as a conscious
# choice for the user. This also increases chances charts run on environments with little
# resources, such as Minikube. If you do want to specify resources, uncomment the following
# lines, adjust them as necessary, and remove the curly braces after 'resources:'.
# limits:
#   cpu: 100m
#   memory: 128Mi
# requests:
#   cpu: 100m
#   memory: 128Mi

# @ignored
nodeSelector: {}

# @ignored
tolerations: []

# -- Affinity for all pods unless overwritten
affinity: {}
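
A minimal install sketch for this chart (the release name and `--set` flag are
assumptions; the sync job is disabled by default in the values above):

```
helm install fediblockhole ./chart \
  --set fediblockhole.cron.sync.enabled=true
```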

container/.dockerignore Normal file

@ -0,0 +1,6 @@
Dockerfile
#README.md
*.pyc
*.pyo
*.pyd
__pycache__

container/Dockerfile Normal file

@ -0,0 +1,14 @@
# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:slim
# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
# Install production dependencies.
RUN pip install fediblockhole
USER 1001
# Set the command on start to fediblock-sync.
ENTRYPOINT ["fediblock-sync"]
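
A build-and-run sketch (the image tag and mounted config path are assumptions;
`-c` and the `/etc/default/` default match the CronJob template and values
file above):

```
docker build -t fediblockhole container/
docker run --rm \
  -v "$(pwd)/fediblockhole.conf.toml:/etc/default/fediblockhole.conf.toml" \
  fediblockhole -c /etc/default/fediblockhole.conf.toml
```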


@ -56,6 +56,24 @@ blocklist_instance_destinations = [
# The 'min' mergeplan will use the lightest severity block found for a domain.
# mergeplan = 'max'
## Optional threshold-based merging.
# Only merge in domain blocks if the domain is mentioned in
# at least `threshold` blocklists.
# `merge_threshold` is an integer, with a default value of 0.
# The `merge_threshold_type` can be `count` or `pct`.
# If `count` type is selected, the threshold is reached when the domain
# is mentioned in at least `merge_threshold` blocklists. The default value
# of 0 means that every block in every list will be merged in.
# If `pct` type is selected, `merge_threshold` is interpreted as a percentage,
# i.e. if `merge_threshold` = 20, blocks will only be merged in if the domain
# is present in at least 20% of blocklists.
# Percentage calculated as number_of_mentions / total_number_of_blocklists.
# The percentage method is more flexible, but also more complicated, so take care
# when using it.
#
# merge_threshold_type = 'count'
# merge_threshold = 0
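## For example, with 10 source blocklists, a `pct` threshold of 20 means a
## domain must appear in at least 2 of the 10 lists (2 / 10 * 100 = 20)
## before its block is merged in:
#
# merge_threshold_type = 'pct'
# merge_threshold = 20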
## Set which fields we import
## 'domain' and 'severity' are always imported, these are additional
##


@ -1,10 +1,10 @@
[project]
name = "fediblockhole"
version = "0.4.4"
description = "Federated blocklist management for Mastodon"
readme = "README.md"
license = {file = "LICENSE"}
requires-python = ">=3.6"
keywords = ["mastodon", "fediblock"]
authors = [
    {name = "Justin Warren"}, {email = "justin@eigenmagic.com"}
@ -17,6 +17,10 @@ classifiers = [
    "Natural Language :: English",
    "Programming Language :: Python :: 3",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.8",
    "Programming Language :: Python :: 3.7",
    "Programming Language :: Python :: 3.6",
]
dependencies = [
    "requests",


@ -1,3 +1,4 @@
"domain","severity","private_comment","public_comment","reject_media","reject_reports","obfuscate"
"eigenmagic.net","noop","Never block me","Only the domain field matters for allowlists",False,False,False
"example.org","noop","Never block me either","The severity is ignored in allowlists as are all other fields",False,False,False
"demo01.example.org","noop","Never block me either","But you can use them to leave yourself or others notes on why the item is here",False,False,False



@ -1,6 +1,6 @@
"""A tool for managing federated Mastodon blocklists
"""
from __future__ import annotations

import argparse
import toml
import csv
@ -11,7 +11,7 @@ import os.path
import sys
import urllib.request as urlr

from .blocklists import Blocklist, parse_blocklist
from .const import DomainBlock, BlockSeverity

from importlib.metadata import version
@ -59,19 +59,19 @@ def sync_blocklists(conf: argparse.Namespace):
    # Add extra export fields if defined in config
    export_fields.extend(conf.export_fields)

    blocklists = []
    # Fetch blocklists from URLs
    if not conf.no_fetch_url:
        blocklists.extend(fetch_from_urls(conf.blocklist_url_sources,
            import_fields, conf.save_intermediate, conf.savedir, export_fields))

    # Fetch blocklists from remote instances
    if not conf.no_fetch_instance:
        blocklists.extend(fetch_from_instances(conf.blocklist_instance_sources,
            import_fields, conf.save_intermediate, conf.savedir, export_fields))

    # Merge blocklists into an update dict
    merged = merge_blocklists(blocklists, conf.mergeplan, conf.merge_threshold, conf.merge_threshold_type)

    # Remove items listed in allowlists, if any
    allowlists = fetch_allowlists(conf)
@ -80,48 +80,48 @@ def sync_blocklists(conf: argparse.Namespace):
    # Save the final mergelist, if requested
    if conf.blocklist_savefile:
        log.info(f"Saving merged blocklist to {conf.blocklist_savefile}")
        save_blocklist_to_file(merged, conf.blocklist_savefile, export_fields)

    # Push the blocklist to destination instances
    if not conf.no_push_instance:
        log.info("Pushing domain blocks to instances...")
        for dest in conf.blocklist_instance_destinations:
            target = dest['domain']
            token = dest['token']
            scheme = dest.get('scheme', 'https')
            max_followed_severity = BlockSeverity(dest.get('max_followed_severity', 'silence'))
            push_blocklist(token, target, merged, conf.dryrun, import_fields, max_followed_severity, scheme)
def apply_allowlists(merged: Blocklist, conf: argparse.Namespace, allowlists: dict):
    """Apply allowlists
    """
    # Apply allows specified on the commandline
    for domain in conf.allow_domains:
        log.info(f"'{domain}' allowed by commandline, removing any blocks...")
        if domain in merged.blocks:
            del merged.blocks[domain]

    # Apply allows from URLs lists
    log.info("Removing domains from URL allowlists...")
    for alist in allowlists:
        log.debug(f"Processing allows from '{alist.origin}'...")
        for allowed in alist.blocks.values():
            domain = allowed.domain
            log.debug(f"Removing allowlisted domain '{domain}' from merged list.")
            if domain in merged.blocks:
                del merged.blocks[domain]

    return merged
def fetch_allowlists(conf: argparse.Namespace) -> Blocklist:
    """
    """
    if conf.allowlist_url_sources:
        allowlists = fetch_from_urls(conf.allowlist_url_sources, ALLOWLIST_IMPORT_FIELDS, conf.save_intermediate, conf.savedir)
        return allowlists
    return Blocklist()
def fetch_from_urls(url_sources: dict,
                    import_fields: list=IMPORT_FIELDS,
                    save_intermediate: bool=False,
                    savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
@ -131,7 +131,7 @@ def fetch_from_urls(blocklists: dict, url_sources: dict,
    @returns: A dict of blocklists, same as input, but (possibly) modified
    """
    log.info("Fetching domain blocks from URLs...")

    blocklists = []
    for item in url_sources:
        url = item['url']
        # If import fields are provided, they override the global ones passed in
@ -144,14 +144,14 @@ def fetch_from_urls(blocklists: dict, url_sources: dict,
        listformat = item.get('format', 'csv')
        with urlr.urlopen(url) as fp:
            rawdata = fp.read(URL_BLOCKLIST_MAXSIZE).decode('utf-8')
            bl = parse_blocklist(rawdata, url, listformat, import_fields, max_severity)
            blocklists.append(bl)

            if save_intermediate:
                save_intermediate_blocklist(bl, savedir, export_fields)

    return blocklists
def fetch_from_instances(sources: dict,
                    import_fields: list=IMPORT_FIELDS,
                    save_intermediate: bool=False,
                    savedir: str=None, export_fields: list=EXPORT_FIELDS) -> dict:
@ -161,12 +161,13 @@ def fetch_from_instances(blocklists: dict, sources: dict,
    @returns: A dict of blocklists, same as input, but (possibly) modified
    """
    log.info("Fetching domain blocks from instances...")

    blocklists = []
    for item in sources:
        domain = item['domain']
        admin = item.get('admin', False)
        token = item.get('token', None)
        scheme = item.get('scheme', 'https')
        # itemsrc = f"{scheme}://{domain}/api"

        # If import fields are provided, they override the global ones passed in
        source_import_fields = item.get('import_fields', None)
@ -174,45 +175,69 @@ def fetch_from_instances(blocklists: dict, sources: dict,
            # Ensure we always use the default fields
            import_fields = IMPORT_FIELDS.extend(source_import_fields)

        bl = fetch_instance_blocklist(domain, token, admin, import_fields, scheme)
        blocklists.append(bl)

        if save_intermediate:
            save_intermediate_blocklist(bl, savedir, export_fields)

    return blocklists
def merge_blocklists(blocklists: list[Blocklist], mergeplan: str='max',
                     threshold: int=0,
                     threshold_type: str='count') -> Blocklist:
    """Merge fetched remote blocklists into a bulk update
    @param blocklists: A dict of lists of DomainBlocks, keyed by source.
        Each value is a list of DomainBlocks
    @param mergeplan: An optional method of merging overlapping block definitions
        'max' (the default) uses the highest severity block found
        'min' uses the lowest severity block found
    @param threshold: An integer used in the threshold mechanism.
        If a domain is not present in this number/pct or more of the blocklists,
        it will not get merged into the final list.
    @param threshold_type: choice of ['count', 'pct']
        If `count`, threshold is met if block is present in `threshold`
        or more blocklists.
        If `pct`, threshold is met if block is present in
        count_of_mentions / number_of_blocklists.
    @param returns: A dict of DomainBlocks keyed by domain
    """
    merged = Blocklist('fediblockhole.merge_blocklists')

    num_blocklists = len(blocklists)

    # Create a domain keyed list of blocks for each domain
    domain_blocks = {}

    for bl in blocklists:
        for block in bl.values():
            if '*' in block.domain:
                log.debug(f"Domain '{block.domain}' is obfuscated. Skipping it.")
                continue
            elif block.domain in domain_blocks:
                domain_blocks[block.domain].append(block)
            else:
                domain_blocks[block.domain] = [block,]

    # Only merge items if `threshold` is met or exceeded
    for domain in domain_blocks:
        if threshold_type == 'count':
            domain_threshold_level = len(domain_blocks[domain])
        elif threshold_type == 'pct':
            domain_threshold_level = len(domain_blocks[domain]) / num_blocklists * 100
            # log.debug(f"domain threshold level: {domain_threshold_level}")
        else:
            raise ValueError(f"Unsupported threshold type '{threshold_type}'. Supported values are: 'count', 'pct'")

        log.debug(f"Checking if {domain_threshold_level} >= {threshold} for {domain}")
        if domain_threshold_level >= threshold:
            # Add first block in the list to merged
            block = domain_blocks[domain][0]
            log.debug(f"Yes. Merging block: {block}")

            # Merge the others with this record
            for newblock in domain_blocks[domain][1:]:
                block = apply_mergeplan(block, newblock, mergeplan)

            merged.blocks[block.domain] = block
        # end if
    # end for
    return merged
def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str='max') -> dict:
@ -239,10 +264,10 @@ def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str
    # How do we override an earlier block definition?
    if mergeplan in ['max', None]:
        # Use the highest block level found (the default)
        # log.debug(f"Using 'max' mergeplan.")

        if newblock.severity > oldblock.severity:
            # log.debug(f"New block severity is higher. Using that.")
            blockdata['severity'] = newblock.severity

        # For 'reject_media', 'reject_reports', and 'obfuscate' if
@ -271,7 +296,7 @@ def apply_mergeplan(oldblock: DomainBlock, newblock: DomainBlock, mergeplan: str
    else:
        raise NotImplementedError(f"Mergeplan '{mergeplan}' not implemented.")

    # log.debug(f"Block severity set to {blockdata['severity']}")

    return DomainBlock(**blockdata)
@ -357,17 +382,19 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
    url = f"{scheme}://{host}{api_path}"

    blockdata = []
    link = True

    while link:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        if response.status_code != 200:
            log.error(f"Cannot fetch remote blocklist: {response.content}")
            raise ValueError("Unable to fetch domain block list: %s", response)

        # Each block of returned data is a JSON list of dicts
        # so we parse them and append them to the fetched list
        # of JSON data we need to parse.
        blockdata.extend(json.loads(response.content.decode('utf-8')))
        # Parse the link header to find the next url to fetch
        # This is a weird and janky way of doing pagination but
        # hey nothing we can do about it we just have to deal
@ -385,6 +412,8 @@ def fetch_instance_blocklist(host: str, token: str=None, admin: bool=False,
            urlstring, rel = next.split('; ')
            url = urlstring.strip('<').rstrip('>')

    blocklist = parse_blocklist(blockdata, url, parse_format, import_fields)

    return blocklist

def delete_block(token: str, host: str, id: int, scheme: str='https'):
@ -474,13 +503,9 @@ def update_known_block(token: str, host: str, block: DomainBlock, scheme: str='h
    """Update an existing domain block with information in blockdict"""
    api_path = "/api/v1/admin/domain_blocks/"

    id = block.id
    blockdata = block._asdict()
    del blockdata['id']

    url = f"{scheme}://{host}{api_path}{id}"
@ -514,7 +539,7 @@ def add_block(token: str, host: str, blockdata: DomainBlock, scheme: str='https'
        raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")

def push_blocklist(token: str, host: str, blocklist: list[DomainBlock],
                    dryrun: bool=False,
                    import_fields: list=['domain', 'severity'],
                    max_followed_severity:BlockSeverity=BlockSeverity('silence'),
@ -522,8 +547,7 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
    ):
    """Push a blocklist to a remote instance.

    Updates existing entries if they exist, creates new blocks if they don't.

    @param token: The Bearer token for OAUTH API authentication
    @param host: The instance host, FQDN or IP
@ -538,15 +562,16 @@ def push_blocklist(token: str, host: str, blocklist: list[dict],
    serverblocks = fetch_instance_blocklist(host, token, True, import_fields, scheme)

    # # Convert serverblocks to a dictionary keyed by domain name
    # knownblocks = {row.domain: row for row in serverblocks}

    for newblock in blocklist.values():
        log.debug(f"Processing block: {newblock}")
        if newblock.domain in serverblocks:
            log.debug(f"Block already exists for {newblock.domain}, checking for differences...")
            oldblock = serverblocks[newblock.domain]

            change_needed = is_change_needed(oldblock, newblock, import_fields)

            # Is the severity changing?
@ -605,15 +630,14 @@ def load_config(configfile: str):
    conf = toml.load(configfile)
    return conf

def save_intermediate_blocklist(blocklist: Blocklist, filedir: str,
    export_fields: list=['domain','severity']):
    """Save a local copy of a blocklist we've downloaded
    """
    # Invent a filename based on the remote source
    # If the source was a URL, convert it to something less messy
    # If the source was a remote domain, just use the name of the domain
    source = blocklist.origin
    log.debug(f"Saving intermediate blocklist from {source}")
    source = source.replace('/','-')
    filename = f"{source}.csv"
@ -621,7 +645,7 @@ def save_intermediate_blocklist(
    save_blocklist_to_file(blocklist, filepath, export_fields)

def save_blocklist_to_file(
    blocklist: Blocklist,
    filepath: str,
    export_fields: list=['domain','severity']):
    """Save a blocklist we've downloaded from a remote source
@ -631,18 +655,22 @@ def save_blocklist_to_file(
    @param export_fields: Which fields to include in the export.
    """
    try:
        sorted_list = sorted(blocklist.blocks.items())
    except KeyError:
        log.error("Field 'domain' not found in blocklist.")
        log.debug(f"blocklist is: {sorted_list}")
    except AttributeError:
        log.error("Attribute error!")
        import pdb
        pdb.set_trace()

    log.debug(f"export fields: {export_fields}")

    with open(filepath, "w") as fp:
        writer = csv.DictWriter(fp, export_fields, extrasaction='ignore')
        writer.writeheader()
        for key, value in sorted_list:
            writer.writerow(value)
def augment_args(args, tomldata: str=None):
    """Augment commandline arguments with config file parameters
@ -682,6 +710,12 @@ def augment_args(args, tomldata: str=None):
    if not args.mergeplan:
        args.mergeplan = conf.get('mergeplan', 'max')

    if not args.merge_threshold:
        args.merge_threshold = conf.get('merge_threshold', 0)

    if not args.merge_threshold_type:
        args.merge_threshold_type = conf.get('merge_threshold_type', 'count')

    args.blocklist_url_sources = conf.get('blocklist_url_sources', [])
    args.blocklist_instance_sources = conf.get('blocklist_instance_sources', [])
    args.allowlist_url_sources = conf.get('allowlist_url_sources', [])
@ -703,6 +737,8 @@ def setup_argparse():
    ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
    ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
    ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], help="Set mergeplan.")
    ap.add_argument('--merge-threshold', type=int, help="Merge threshold value")
    ap.add_argument('--merge-threshold-type', choices=['count', 'pct'], help="Type of merge threshold to use.")
    ap.add_argument('-I', '--import-field', dest='import_fields', action='append', help="Extra blocklist fields to import.")
    ap.add_argument('-E', '--export-field', dest='export_fields', action='append', help="Extra blocklist fields to export.")


@ -1,19 +1,48 @@
"""Parse various blocklist data formats
"""
from __future__ import annotations

import csv
import json
from typing import Iterable
from dataclasses import dataclass, field

from .const import DomainBlock, BlockSeverity

import logging
log = logging.getLogger('fediblockhole')
@dataclass
class Blocklist:
    """ A Blocklist object

    A Blocklist is a list of DomainBlocks from an origin
    """
    origin: str = None
    blocks: dict[str, DomainBlock] = field(default_factory=dict)

    def __len__(self):
        return len(self.blocks)

    def __class_getitem__(cls, item):
        return dict[str, DomainBlock]

    def __getitem__(self, item):
        return self.blocks[item]

    def __iter__(self):
        return self.blocks.__iter__()

    def items(self):
        return self.blocks.items()

    def values(self):
        return self.blocks.values()
class BlocklistParser(object):
    """
    Base class for parsing blocklists
    """
    do_preparse = False

    def __init__(self, import_fields: list=['domain', 'severity'],
                    max_severity: str='suspend'):
@ -30,17 +59,18 @@
        """
        raise NotImplementedError

    def parse_blocklist(self, blockdata, origin:str=None) -> Blocklist:
        """Parse an iterable of blocklist items
        @param blocklist: An Iterable of blocklist items
        @returns: A dict of DomainBlocks, keyed by domain
        """
        if self.do_preparse:
            blockdata = self.preparse(blockdata)

        parsed_list = Blocklist(origin)
        for blockitem in blockdata:
            block = self.parse_item(blockitem)
            parsed_list.blocks[block.domain] = block
        return parsed_list

    def parse_item(self, blockitem) -> DomainBlock:
@ -53,12 +83,13 @@ class BlocklistParser(object):
class BlocklistParserJSON(BlocklistParser):
    """Parse a JSON formatted blocklist"""
    do_preparse = True

    def preparse(self, blockdata) -> Iterable:
        """Parse the blockdata as JSON if needed"""
        if type(blockdata) == type(''):
            return json.loads(blockdata)
        return blockdata

    def parse_item(self, blockitem: dict) -> DomainBlock:
        # Remove fields we don't want to import
@ -102,7 +133,7 @@ class BlocklistParserCSV(BlocklistParser):
    The parser expects the CSV data to include a header with the field names.
    """
    do_preparse = True

    def preparse(self, blockdata) -> Iterable:
        """Use a csv.DictReader to create an iterable from the blockdata
@ -130,6 +161,24 @@ class BlocklistParserCSV(BlocklistParser):
            block.severity = self.max_severity

        return block
class BlocklistParserMastodonCSV(BlocklistParserCSV):
    """ Parse Mastodon CSV formatted blocklists

    The Mastodon v4.1.x domain block CSV export prefixes its
    field names with a '#' character because reasons?
    """
    do_preparse = True

    def parse_item(self, blockitem: dict) -> DomainBlock:
        """Build a new blockitem dict with new un-#ed keys
        """
        newdict = {}
        for key in blockitem:
            newkey = key.lstrip('#')
            newdict[newkey] = blockitem[key]

        return super().parse_item(newdict)
class RapidBlockParserCSV(BlocklistParserCSV):
    """ Parse RapidBlock CSV blocklists
@ -193,6 +242,7 @@ def str2bool(boolstring: str) -> bool:
FORMAT_PARSERS = {
    'csv': BlocklistParserCSV,
    'mastodon_csv': BlocklistParserMastodonCSV,
    'json': BlocklistParserJSON,
    'mastodon_api_public': BlocklistParserMastodonAPIPublic,
    'rapidblock.csv': RapidBlockParserCSV,
@ -202,11 +252,13 @@ FORMAT_PARSERS = {
# helper function to select the appropriate Parser
def parse_blocklist(
    blockdata,
    origin,
    format="csv",
    import_fields: list=['domain', 'severity'],
    max_severity: str='suspend'):
    """Parse a blocklist in the given format
    """
    log.debug(f"parsing {format} blocklist with import_fields: {import_fields}...")

    parser = FORMAT_PARSERS[format](import_fields, max_severity)
    return parser.parse_blocklist(blockdata, origin)


@ -1,5 +1,6 @@
""" Constant objects used by FediBlockHole
"""
from __future__ import annotations

import enum
from typing import NamedTuple, Optional, TypedDict
from dataclasses import dataclass


@ -7,5 +7,6 @@ def shim_argparse(testargv: list=[], tomldata: str=None):
    """
    ap = setup_argparse()
    args = ap.parse_args(testargv)
    if tomldata is not None:
        args = augment_args(args, tomldata)
    return args


@ -4,6 +4,7 @@ import pytest
from util import shim_argparse
from fediblockhole.const import DomainBlock
from fediblockhole.blocklists import Blocklist
from fediblockhole import fetch_allowlists, apply_allowlists

def test_cmdline_allow_removes_domain():
@ -11,17 +12,13 @@
    """
    conf = shim_argparse(['-A', 'removeme.org'])

    merged = Blocklist('test_allowlist.merged', {
        'example.org': DomainBlock('example.org'),
        'example2.org': DomainBlock('example2.org'),
        'removeme.org': DomainBlock('removeme.org'),
        'keepblockingme.org': DomainBlock('keepblockingme.org'),
    })

    merged = apply_allowlists(merged, conf, {})

    with pytest.raises(KeyError):
@ -32,16 +29,18 @@ def test_allowlist_removes_domain():
    """
    conf = shim_argparse()

    merged = Blocklist('test_allowlist.merged', {
        'example.org': DomainBlock('example.org'),
        'example2.org': DomainBlock('example2.org'),
        'removeme.org': DomainBlock('removeme.org'),
        'keepblockingme.org': DomainBlock('keepblockingme.org'),
    })

    allowlists = [
        Blocklist('test_allowlist', {
            'removeme.org': DomainBlock('removeme.org', 'noop'),
        })
    ]

    merged = apply_allowlists(merged, conf, allowlists)
@ -53,19 +52,19 @@ def test_allowlist_removes_tld():
    """
    conf = shim_argparse()

    merged = Blocklist('test_allowlist.merged', {
        '.cf': DomainBlock('.cf'),
        'example.org': DomainBlock('example.org'),
        '.tk': DomainBlock('.tk'),
        'keepblockingme.org': DomainBlock('keepblockingme.org'),
    })

    allowlists = [
        Blocklist('test_allowlist.list1', {
            '.cf': DomainBlock('.cf', 'noop'),
            '.tk': DomainBlock('.tk', 'noop'),
        })
    ]

    merged = apply_allowlists(merged, conf, allowlists)


@ -49,3 +49,33 @@ allowlist_url_sources = [ { url='file:///path/to/allowlist', format='csv'} ]
        'url': 'file:///path/to/allowlist',
        'format': 'csv',
    }]
def test_set_merge_thresold_default():
tomldata = """
"""
args = shim_argparse([], tomldata)
assert args.mergeplan == 'max'
assert args.merge_threshold_type == 'count'
def test_set_merge_thresold_count():
tomldata = """# Add a merge threshold
merge_threshold_type = 'count'
merge_threshold = 2
"""
args = shim_argparse([], tomldata)
assert args.mergeplan == 'max'
assert args.merge_threshold_type == 'count'
assert args.merge_threshold == 2
def test_set_merge_thresold_pct():
tomldata = """# Add a merge threshold
merge_threshold_type = 'pct'
merge_threshold = 35
"""
args = shim_argparse([], tomldata)
assert args.mergeplan == 'max'
assert args.merge_threshold_type == 'pct'
assert args.merge_threshold == 35
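
The first test implies the defaults that apply when the TOML is empty: mergeplan 'max' and threshold type 'count'. A rough sketch of that defaulting step, using the stdlib tomllib purely as a stand-in for whatever TOML reader the project actually uses:

import tomllib  # assumption: Python 3.11+; the project may use another reader

def augment_args_sketch(args, tomldata=None):
    conf = tomllib.loads(tomldata) if tomldata else {}
    args.mergeplan = conf.get('mergeplan', 'max')
    args.merge_threshold = conf.get('merge_threshold', 0)
    args.merge_threshold_type = conf.get('merge_threshold_type', 'count')
    return args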

View File

@@ -0,0 +1,153 @@
"""Test merge with thresholds
"""
from fediblockhole.blocklists import Blocklist, parse_blocklist
from fediblockhole import merge_blocklists, apply_mergeplan
from fediblockhole.const import SeverityLevel, DomainBlock

datafile01 = "data-suspends-01.csv"
datafile02 = "data-silences-01.csv"
datafile03 = "data-noop-01.csv"

import_fields = [
    'domain',
    'severity',
    'public_comment',
    'private_comment',
    'reject_media',
    'reject_reports',
    'obfuscate'
]

def load_test_blocklist_data(datafiles):
    blocklists = []
    for df in datafiles:
        with open(df) as fp:
            data = fp.read()
            bl = parse_blocklist(data, df, 'csv', import_fields)
            blocklists.append(bl)
    return blocklists

def test_mergeplan_count_2():
    """Only merge a block if present in 2 or more lists
    """
    bl_1 = Blocklist('test01', {
        'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True),
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_2 = Blocklist('test2', {
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_3 = Blocklist('test3', {
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
    })

    ml = merge_blocklists([bl_1, bl_2, bl_3], 'max', threshold=2)

    assert 'onemention.example.org' not in ml
    assert 'twomention.example.org' in ml
    assert 'threemention.example.org' in ml

def test_mergeplan_count_3():
    """Only merge a block if present in 3 or more lists
    """
    bl_1 = Blocklist('test01', {
        'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True),
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_2 = Blocklist('test2', {
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_3 = Blocklist('test3', {
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
    })

    ml = merge_blocklists([bl_1, bl_2, bl_3], 'max', threshold=3)

    assert 'onemention.example.org' not in ml
    assert 'twomention.example.org' not in ml
    assert 'threemention.example.org' in ml

def test_mergeplan_pct_30():
    """Only merge a block if present in at least 30% of lists
    """
    bl_1 = Blocklist('test01', {
        'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True),
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_2 = Blocklist('test2', {
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_3 = Blocklist('test3', {
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_4 = Blocklist('test4', {
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })

    ml = merge_blocklists([bl_1, bl_2, bl_3, bl_4], 'max', threshold=30, threshold_type='pct')

    assert 'onemention.example.org' not in ml
    assert 'twomention.example.org' in ml
    assert 'threemention.example.org' in ml
    assert 'fourmention.example.org' in ml

def test_mergeplan_pct_55():
    """Only merge a block if present in at least 55% of lists
    """
    bl_1 = Blocklist('test01', {
        'onemention.example.org': DomainBlock('onemention.example.org', 'suspend', '', '', True, True, True),
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_2 = Blocklist('test2', {
        'twomention.example.org': DomainBlock('twomention.example.org', 'suspend', '', '', True, True, True),
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_3 = Blocklist('test3', {
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })
    bl_4 = Blocklist('test4', {
        'threemention.example.org': DomainBlock('threemention.example.org', 'suspend', '', '', True, True, True),
        'fourmention.example.org': DomainBlock('fourmention.example.org', 'suspend', '', '', True, True, True),
    })

    ml = merge_blocklists([bl_1, bl_2, bl_3, bl_4], 'max', threshold=55, threshold_type='pct')

    assert 'onemention.example.org' not in ml
    assert 'twomention.example.org' not in ml
    assert 'threemention.example.org' in ml
    assert 'fourmention.example.org' in ml
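
Read together, these four tests fix the threshold semantics: a 'count' threshold keeps a block mentioned by at least N lists, while a 'pct' threshold keeps one mentioned by at least N percent of them. A minimal sketch of just the counting rule, independent of how merge_blocklists combines severities and comments:

def passes_threshold(domain, blocklists, threshold=0, threshold_type='count'):
    # How many source lists mention this domain at all?
    mentions = sum(1 for bl in blocklists if domain in bl)
    if threshold_type == 'pct':
        return mentions / len(blocklists) * 100 >= threshold
    return mentions >= threshold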

View File

@ -1,7 +1,7 @@
"""Various mergeplan tests """Various mergeplan tests
""" """
from fediblockhole.blocklist_parser import parse_blocklist from fediblockhole.blocklists import parse_blocklist
from fediblockhole import merge_blocklists, merge_comments, apply_mergeplan from fediblockhole import merge_blocklists, merge_comments, apply_mergeplan
from fediblockhole.const import SeverityLevel, DomainBlock from fediblockhole.const import SeverityLevel, DomainBlock
@ -22,20 +22,19 @@ import_fields = [
def load_test_blocklist_data(datafiles): def load_test_blocklist_data(datafiles):
blocklists = {} blocklists = []
for df in datafiles: for df in datafiles:
with open(df) as fp: with open(df) as fp:
data = fp.read() data = fp.read()
bl = parse_blocklist(data, 'csv', import_fields) bl = parse_blocklist(data, df, 'csv', import_fields)
blocklists[df] = bl blocklists.append(bl)
return blocklists return blocklists
def test_mergeplan_max(): def test_mergeplan_max():
"""Test 'max' mergeplan""" """Test 'max' mergeplan"""
blocklists = load_test_blocklist_data([datafile01, datafile02]) blocklists = load_test_blocklist_data([datafile01, datafile02])
bl = merge_blocklists(blocklists, 'max') bl = merge_blocklists(blocklists, 'max')
assert len(bl) == 13 assert len(bl) == 13
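
The 'max' mergeplan keeps the harshest action seen for a domain across all source lists. Assuming BlockSeverity instances compare in severity order (suspend above silence above noop), which this diff does not itself confirm, the per-domain decision reduces to:

def merge_block_max(existing, candidate):
    # Keep whichever block carries the harsher severity (ordering assumed).
    return candidate if candidate.severity > existing.severity else existing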

View File

@ -1,22 +1,24 @@
"""Tests of the CSV parsing """Tests of the CSV parsing
""" """
from fediblockhole.blocklist_parser import BlocklistParserCSV, parse_blocklist from fediblockhole.blocklists import BlocklistParserCSV, parse_blocklist
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel from fediblockhole.const import SeverityLevel
def test_single_line(): def test_single_line():
csvdata = "example.org" csvdata = "example.org"
origin = "csvfile"
parser = BlocklistParserCSV() parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata) bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 0 assert len(bl) == 0
def test_header_only(): def test_header_only():
csvdata = "domain,severity,public_comment" csvdata = "domain,severity,public_comment"
origin = "csvfile"
parser = BlocklistParserCSV() parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata) bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 0 assert len(bl) == 0
def test_2_blocks(): def test_2_blocks():
@ -24,12 +26,13 @@ def test_2_blocks():
example.org,silence example.org,silence
example2.org,suspend example2.org,suspend
""" """
origin = "csvfile"
parser = BlocklistParserCSV() parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata) bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 2 assert len(bl) == 2
assert bl[0].domain == 'example.org' assert 'example.org' in bl
def test_4_blocks(): def test_4_blocks():
csvdata = """domain,severity,public_comment csvdata = """domain,severity,public_comment
@ -38,20 +41,21 @@ example2.org,suspend,"test 2"
example3.org,noop,"test 3" example3.org,noop,"test 3"
example4.org,suspend,"test 4" example4.org,suspend,"test 4"
""" """
origin = "csvfile"
parser = BlocklistParserCSV() parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata) bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 4 assert len(bl) == 4
assert bl[0].domain == 'example.org' assert 'example.org' in bl
assert bl[1].domain == 'example2.org' assert 'example2.org' in bl
assert bl[2].domain == 'example3.org' assert 'example3.org' in bl
assert bl[3].domain == 'example4.org' assert 'example4.org' in bl
assert bl[0].severity.level == SeverityLevel.SILENCE assert bl['example.org'].severity.level == SeverityLevel.SILENCE
assert bl[1].severity.level == SeverityLevel.SUSPEND assert bl['example2.org'].severity.level == SeverityLevel.SUSPEND
assert bl[2].severity.level == SeverityLevel.NONE assert bl['example3.org'].severity.level == SeverityLevel.NONE
assert bl[3].severity.level == SeverityLevel.SUSPEND assert bl['example4.org'].severity.level == SeverityLevel.SUSPEND
def test_ignore_comments(): def test_ignore_comments():
csvdata = """domain,severity,public_comment,private_comment csvdata = """domain,severity,public_comment,private_comment
@ -60,18 +64,18 @@ example2.org,suspend,"test 2","ignote me also"
example3.org,noop,"test 3","and me" example3.org,noop,"test 3","and me"
example4.org,suspend,"test 4","also me" example4.org,suspend,"test 4","also me"
""" """
origin = "csvfile"
parser = BlocklistParserCSV() parser = BlocklistParserCSV()
bl = parser.parse_blocklist(csvdata) bl = parser.parse_blocklist(csvdata, origin)
assert len(bl) == 4 assert len(bl) == 4
assert bl[0].domain == 'example.org' assert 'example.org' in bl
assert bl[1].domain == 'example2.org' assert 'example2.org' in bl
assert bl[2].domain == 'example3.org' assert 'example3.org' in bl
assert bl[3].domain == 'example4.org' assert 'example4.org' in bl
assert bl[0].public_comment == '' assert bl['example.org'].public_comment == ''
assert bl[0].private_comment == '' assert bl['example.org'].private_comment == ''
assert bl['example3.org'].public_comment == ''
assert bl[2].public_comment == '' assert bl['example4.org'].private_comment == ''
assert bl[2].private_comment == ''
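
The pattern running through this file is the new dict-like Blocklist: parsers now key DomainBlock entries by domain rather than returning a list. A quick usage sketch assembled from the assertions above:

from fediblockhole.blocklists import BlocklistParserCSV

parser = BlocklistParserCSV()
bl = parser.parse_blocklist("domain,severity\nexample.org,silence\n", "csvfile")
if 'example.org' in bl:
    print(bl['example.org'].severity.level)   # SeverityLevel.SILENCE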

View File

@@ -0,0 +1,81 @@
"""Tests of the CSV parsing
"""
from fediblockhole.blocklists import BlocklistParserMastodonCSV
from fediblockhole.const import SeverityLevel

def test_single_line():
    csvdata = "example.org"
    origin = "csvfile"

    parser = BlocklistParserMastodonCSV()
    bl = parser.parse_blocklist(csvdata, origin)
    assert len(bl) == 0

def test_header_only():
    csvdata = "#domain,#severity,#public_comment"
    origin = "csvfile"

    parser = BlocklistParserMastodonCSV()
    bl = parser.parse_blocklist(csvdata, origin)
    assert len(bl) == 0

def test_2_blocks():
    csvdata = """domain,severity
example.org,silence
example2.org,suspend
"""
    origin = "csvfile"

    parser = BlocklistParserMastodonCSV()
    bl = parser.parse_blocklist(csvdata, origin)
    assert len(bl) == 2
    assert 'example.org' in bl

def test_4_blocks():
    csvdata = """domain,severity,public_comment
example.org,silence,"test 1"
example2.org,suspend,"test 2"
example3.org,noop,"test 3"
example4.org,suspend,"test 4"
"""
    origin = "csvfile"

    parser = BlocklistParserMastodonCSV()
    bl = parser.parse_blocklist(csvdata, origin)
    assert len(bl) == 4
    assert 'example.org' in bl
    assert 'example2.org' in bl
    assert 'example3.org' in bl
    assert 'example4.org' in bl

    assert bl['example.org'].severity.level == SeverityLevel.SILENCE
    assert bl['example2.org'].severity.level == SeverityLevel.SUSPEND
    assert bl['example3.org'].severity.level == SeverityLevel.NONE
    assert bl['example4.org'].severity.level == SeverityLevel.SUSPEND

def test_ignore_comments():
    csvdata = """domain,severity,public_comment,private_comment
example.org,silence,"test 1","ignore me"
example2.org,suspend,"test 2","ignote me also"
example3.org,noop,"test 3","and me"
example4.org,suspend,"test 4","also me"
"""
    origin = "csvfile"

    parser = BlocklistParserMastodonCSV()
    bl = parser.parse_blocklist(csvdata, origin)
    assert len(bl) == 4
    assert 'example.org' in bl
    assert 'example2.org' in bl
    assert 'example3.org' in bl
    assert 'example4.org' in bl

    assert bl['example.org'].public_comment == ''
    assert bl['example.org'].private_comment == ''
    assert bl['example3.org'].public_comment == ''
    assert bl['example4.org'].private_comment == ''
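
This new test file covers the blocklist CSV that Mastodon 4.1's admin interface exports, which prefixes header names with '#'. A hedged example of that shape; the data row here is my own illustration rather than one of the project's fixtures:

from fediblockhole.blocklists import BlocklistParserMastodonCSV
from fediblockhole.const import SeverityLevel

csvdata = "#domain,#severity\nexample.org,suspend\n"
bl = BlocklistParserMastodonCSV().parse_blocklist(csvdata, "csvfile")
assert bl['example.org'].severity.level == SeverityLevel.SUSPEND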

View File

@ -1,8 +1,8 @@
"""Tests of the CSV parsing """Tests of the CSV parsing
""" """
from fediblockhole.blocklist_parser import BlocklistParserJSON, parse_blocklist from fediblockhole.blocklists import BlocklistParserJSON, parse_blocklist
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel from fediblockhole.const import SeverityLevel
datafile = 'data-mastodon.json' datafile = 'data-mastodon.json'
@ -14,33 +14,32 @@ def test_json_parser():
data = load_data() data = load_data()
parser = BlocklistParserJSON() parser = BlocklistParserJSON()
bl = parser.parse_blocklist(data) bl = parser.parse_blocklist(data, 'test_json')
assert len(bl) == 10 assert len(bl) == 10
assert bl[0].domain == 'example.org' assert 'example.org' in bl
assert bl[1].domain == 'example2.org' assert 'example2.org' in bl
assert bl[2].domain == 'example3.org' assert 'example3.org' in bl
assert bl[3].domain == 'example4.org' assert 'example4.org' in bl
assert bl[0].severity.level == SeverityLevel.SUSPEND assert bl['example.org'].severity.level == SeverityLevel.SUSPEND
assert bl[1].severity.level == SeverityLevel.SILENCE assert bl['example2.org'].severity.level == SeverityLevel.SILENCE
assert bl[2].severity.level == SeverityLevel.SUSPEND assert bl['example3.org'].severity.level == SeverityLevel.SUSPEND
assert bl[3].severity.level == SeverityLevel.NONE assert bl['example4.org'].severity.level == SeverityLevel.NONE
def test_ignore_comments(): def test_ignore_comments():
data = load_data() data = load_data()
parser = BlocklistParserJSON() parser = BlocklistParserJSON()
bl = parser.parse_blocklist(data) bl = parser.parse_blocklist(data, 'test_json')
assert len(bl) == 10 assert len(bl) == 10
assert bl[0].domain == 'example.org' assert 'example.org' in bl
assert bl[1].domain == 'example2.org' assert 'example2.org' in bl
assert bl[2].domain == 'example3.org' assert 'example3.org' in bl
assert bl[3].domain == 'example4.org' assert 'example4.org' in bl
assert bl[0].public_comment == '' assert bl['example.org'].public_comment == ''
assert bl[0].private_comment == '' assert bl['example.org'].private_comment == ''
assert bl['example3.org'].public_comment == ''
assert bl[2].public_comment == '' assert bl['example4.org'].private_comment == ''
assert bl[2].private_comment == ''
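
The fixture data-mastodon.json is not shown in this diff, so its exact shape is an assumption; if each entry carries at least 'domain' and 'severity' keys, as Mastodon's domain_blocks output does, usage would look like:

import json
from fediblockhole.blocklists import BlocklistParserJSON
from fediblockhole.const import SeverityLevel

# Field names assumed from Mastodon's domain_blocks format, not the fixture.
data = json.dumps([{"domain": "example.org", "severity": "suspend"}])
bl = BlocklistParserJSON().parse_blocklist(data, 'test_json')
assert bl['example.org'].severity.level == SeverityLevel.SUSPEND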

View File

@ -1,7 +1,7 @@
"""Tests of the Rapidblock CSV parsing """Tests of the Rapidblock CSV parsing
""" """
from fediblockhole.blocklist_parser import RapidBlockParserCSV, parse_blocklist from fediblockhole.blocklists import RapidBlockParserCSV, parse_blocklist
from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel from fediblockhole.const import DomainBlock, BlockSeverity, SeverityLevel
csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n""" csvdata = """example.org\r\nsubdomain.example.org\r\nanotherdomain.org\r\ndomain4.org\r\n"""
@ -11,13 +11,13 @@ def test_basic_rapidblock():
bl = parser.parse_blocklist(csvdata) bl = parser.parse_blocklist(csvdata)
assert len(bl) == 4 assert len(bl) == 4
assert bl[0].domain == 'example.org' assert 'example.org' in bl
assert bl[1].domain == 'subdomain.example.org' assert 'subdomain.example.org' in bl
assert bl[2].domain == 'anotherdomain.org' assert 'anotherdomain.org' in bl
assert bl[3].domain == 'domain4.org' assert 'domain4.org' in bl
def test_severity_is_suspend(): def test_severity_is_suspend():
bl = parser.parse_blocklist(csvdata) bl = parser.parse_blocklist(csvdata)
for block in bl: for block in bl.values():
assert block.severity.level == SeverityLevel.SUSPEND assert block.severity.level == SeverityLevel.SUSPEND
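
RapidBlock's CSV flavour is just a CRLF-separated list of bare domains, and the tests show every parsed entry defaults to a 'suspend' block. A tiny sketch reusing only what the tests above establish:

from fediblockhole.blocklists import RapidBlockParserCSV
from fediblockhole.const import SeverityLevel

bl = RapidBlockParserCSV().parse_blocklist("example.org\r\nanotherdomain.org\r\n")
assert len(bl) == 2
for block in bl.values():
    assert block.severity.level == SeverityLevel.SUSPEND   # default for this format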

View File

@ -1,6 +1,6 @@
"""Test parsing the RapidBlock JSON format """Test parsing the RapidBlock JSON format
""" """
from fediblockhole.blocklist_parser import parse_blocklist from fediblockhole.blocklists import parse_blocklist
from fediblockhole.const import SeverityLevel from fediblockhole.const import SeverityLevel
@ -9,26 +9,26 @@ rapidblockjson = "data-rapidblock.json"
def test_parse_rapidblock_json(): def test_parse_rapidblock_json():
with open(rapidblockjson) as fp: with open(rapidblockjson) as fp:
data = fp.read() data = fp.read()
bl = parse_blocklist(data, 'rapidblock.json') bl = parse_blocklist(data, 'pytest', 'rapidblock.json')
assert bl[0].domain == '101010.pl' assert '101010.pl' in bl
assert bl[0].severity.level == SeverityLevel.SUSPEND assert bl['101010.pl'].severity.level == SeverityLevel.SUSPEND
assert bl[0].public_comment == '' assert bl['101010.pl'].public_comment == ''
assert bl[10].domain == 'berserker.town' assert 'berserker.town' in bl
assert bl[10].severity.level == SeverityLevel.SUSPEND assert bl['berserker.town'].severity.level == SeverityLevel.SUSPEND
assert bl[10].public_comment == '' assert bl['berserker.town'].public_comment == ''
assert bl[10].private_comment == '' assert bl['berserker.town'].private_comment == ''
def test_parse_with_comments(): def test_parse_with_comments():
with open(rapidblockjson) as fp: with open(rapidblockjson) as fp:
data = fp.read() data = fp.read()
bl = parse_blocklist(data, 'rapidblock.json', ['domain', 'severity', 'public_comment', 'private_comment']) bl = parse_blocklist(data, 'pytest', 'rapidblock.json', ['domain', 'severity', 'public_comment', 'private_comment'])
assert bl[0].domain == '101010.pl' assert '101010.pl' in bl
assert bl[0].severity.level == SeverityLevel.SUSPEND assert bl['101010.pl'].severity.level == SeverityLevel.SUSPEND
assert bl[0].public_comment == 'cryptomining javascript, white supremacy' assert bl['101010.pl'].public_comment == 'cryptomining javascript, white supremacy'
assert bl[10].domain == 'berserker.town' assert 'berserker.town' in bl
assert bl[10].severity.level == SeverityLevel.SUSPEND assert bl['berserker.town'].severity.level == SeverityLevel.SUSPEND
assert bl[10].public_comment == 'freeze peach' assert bl['berserker.town'].public_comment == 'freeze peach'
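
Across every parser touched by this commit, parse_blocklist has grown an origin label between the data and the format name, with the import-field list still optional. The call shape, lifted directly from the test above:

from fediblockhole.blocklists import parse_blocklist

with open("data-rapidblock.json") as fp:
    bl = parse_blocklist(fp.read(), 'pytest', 'rapidblock.json',
                         ['domain', 'severity', 'public_comment', 'private_comment'])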