Changed default to exclude private_comment field

from exports and imports.
Added commandline and config option to
--include-public-comments
Fixes: #1
This commit is contained in:
Justin Warren 2022-12-20 15:35:51 +11:00
parent 68d79dd253
commit 268f1ef2b0
No known key found for this signature in database
2 changed files with 63 additions and 22 deletions

View File

@ -65,6 +65,7 @@ def sync_blocklists(conf: dict):
if boolkey in row: if boolkey in row:
row[boolkey] = str2bool(row[boolkey]) row[boolkey] = str2bool(row[boolkey])
blocklists[listurl].append(row) blocklists[listurl].append(row)
if conf.save_intermediate: if conf.save_intermediate:
save_intermediate_blocklist(blocklists[listurl], listurl, conf.savedir) save_intermediate_blocklist(blocklists[listurl], listurl, conf.savedir)
@ -76,13 +77,13 @@ def sync_blocklists(conf: dict):
token = blocklist_src['token'] token = blocklist_src['token']
blocklists[domain] = fetch_instance_blocklist(token, domain) blocklists[domain] = fetch_instance_blocklist(token, domain)
if conf.save_intermediate: if conf.save_intermediate:
save_intermediate_blocklist(blocklists[domain], domain, conf.savedir) save_intermediate_blocklist(blocklists[domain], domain, conf.savedir, conf.include_private_comments)
# Merge blocklists into an update dict # Merge blocklists into an update dict
merged = merge_blocklists(blocklists, conf.mergeplan) merged = merge_blocklists(blocklists, conf.mergeplan, conf.include_private_comments)
if conf.blocklist_savefile: if conf.blocklist_savefile:
log.info(f"Saving merged blocklist to {conf.blocklist_savefile}") log.info(f"Saving merged blocklist to {conf.blocklist_savefile}")
save_blocklist_to_file(merged.values(), conf.blocklist_savefile) save_blocklist_to_file(merged.values(), conf.blocklist_savefile, conf.include_private_comments)
# Push the blocklist to destination instances # Push the blocklist to destination instances
if not conf.no_push_instance: if not conf.no_push_instance:
@ -90,14 +91,16 @@ def sync_blocklists(conf: dict):
for dest in conf.blocklist_instance_destinations: for dest in conf.blocklist_instance_destinations:
domain = dest['domain'] domain = dest['domain']
token = dest['token'] token = dest['token']
push_blocklist(token, domain, merged.values(), conf.dryrun) push_blocklist(token, domain, merged.values(), conf.dryrun, conf.include_private_comments)
def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict: def merge_blocklists(blocklists: dict, mergeplan: str='max',
include_private_comments: bool=False) -> dict:
"""Merge fetched remote blocklists into a bulk update """Merge fetched remote blocklists into a bulk update
@param mergeplan: An optional method of merging overlapping block definitions @param mergeplan: An optional method of merging overlapping block definitions
'max' (the default) uses the highest severity block found 'max' (the default) uses the highest severity block found
'min' uses the lowest severity block found 'min' uses the lowest severity block found
@param include_private_comments: Include private comments in merged blocklist. Defaults to False.
""" """
merged = {} merged = {}
@ -107,7 +110,7 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
domain = newblock['domain'] domain = newblock['domain']
if domain in merged: if domain in merged:
log.debug(f"Overlapping block for domain {domain}. Merging...") log.debug(f"Overlapping block for domain {domain}. Merging...")
blockdata = apply_mergeplan(merged[domain], newblock, mergeplan) blockdata = apply_mergeplan(merged[domain], newblock, mergeplan, include_private_comments)
else: else:
# New block # New block
blockdata = { blockdata = {
@ -115,24 +118,29 @@ def merge_blocklists(blocklists: dict, mergeplan: str='max') -> dict:
# Default to Silence if nothing is specified # Default to Silence if nothing is specified
'severity': newblock.get('severity', 'silence'), 'severity': newblock.get('severity', 'silence'),
'public_comment': newblock.get('public_comment', ''), 'public_comment': newblock.get('public_comment', ''),
'private_comment': newblock.get('private_comment', ''),
'obfuscate': newblock.get('obfuscate', True), # default obfuscate to True 'obfuscate': newblock.get('obfuscate', True), # default obfuscate to True
} }
sev = blockdata['severity'] # convenience variable sev = blockdata['severity'] # convenience variable
blockdata['reject_media'] = newblock.get('reject_media', REJECT_MEDIA_DEFAULT[sev]) blockdata['reject_media'] = newblock.get('reject_media', REJECT_MEDIA_DEFAULT[sev])
blockdata['reject_reports'] = newblock.get('reject_reports', REJECT_REPORTS_DEFAULT[sev]) blockdata['reject_reports'] = newblock.get('reject_reports', REJECT_REPORTS_DEFAULT[sev])
if include_private_comments:
blockdata['private_comment']: newblock.get('private_comment', '')
# end if # end if
log.debug(f"blockdata is: {blockdata}") log.debug(f"blockdata is: {blockdata}")
merged[domain] = blockdata merged[domain] = blockdata
# end for # end for
return merged return merged
def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dict: def apply_mergeplan(oldblock: dict, newblock: dict,
mergeplan: str='max',
include_private_comments: bool=False) -> dict:
"""Use a mergeplan to decide how to merge two overlapping block definitions """Use a mergeplan to decide how to merge two overlapping block definitions
@param oldblock: The exist block definition. @param oldblock: The exist block definition.
@param newblock: The new block definition we want to merge in. @param newblock: The new block definition we want to merge in.
@param mergeplan: How to merge. Choices are 'max', the default, and 'min'. @param mergeplan: How to merge. Choices are 'max', the default, and 'min'.
@param include_private_comments: Include private comments in merged blocklist. Defaults to False.
""" """
# Default to the existing block definition # Default to the existing block definition
blockdata = oldblock.copy() blockdata = oldblock.copy()
@ -140,10 +148,17 @@ def apply_mergeplan(oldblock: dict, newblock: dict, mergeplan: str='max') -> dic
# If the public or private comment is different, # If the public or private comment is different,
# append it to the existing comment, joined with a newline # append it to the existing comment, joined with a newline
# unless the comment is None or an empty string # unless the comment is None or an empty string
for key in ['public_comment', 'private_comment']: keylist = ['public_comment']
if oldblock[key] != newblock[key] and newblock[key] not in ['', None]: if include_private_comments:
blockdata[key] = '\n'.join([oldblock[key], newblock[key]]) keylist.append('private_comment')
for key in keylist:
try:
if oldblock[key] != newblock[key] and newblock[key] not in ['', None]:
blockdata[key] = '\n'.join([oldblock[key], newblock[key]])
except KeyError:
log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
continue
# How do we override an earlier block definition? # How do we override an earlier block definition?
if mergeplan in ['max', None]: if mergeplan in ['max', None]:
# Use the highest block level found (the default) # Use the highest block level found (the default)
@ -274,7 +289,9 @@ def add_block(token: str, host: str, blockdata: dict):
if response.status_code != 200: if response.status_code != 200:
raise ValueError(f"Something went wrong: {response.status_code}: {response.content}") raise ValueError(f"Something went wrong: {response.status_code}: {response.content}")
def push_blocklist(token: str, host: str, blocklist: list[dict], dryrun: bool=False): def push_blocklist(token: str, host: str, blocklist: list[dict],
dryrun: bool=False,
include_private_comments: bool=False):
"""Push a blocklist to a remote instance. """Push a blocklist to a remote instance.
Merging the blocklist with the existing list the instance has, Merging the blocklist with the existing list the instance has,
@ -283,6 +300,7 @@ def push_blocklist(token: str, host: str, blocklist: list[dict], dryrun: bool=Fa
@param token: The Bearer token for OAUTH API authentication @param token: The Bearer token for OAUTH API authentication
@param host: The instance host, FQDN or IP @param host: The instance host, FQDN or IP
@param blocklist: A list of block definitions. They must include the domain. @param blocklist: A list of block definitions. They must include the domain.
@param include_private_comments: Include private comments in merged blocklist. Defaults to False.
""" """
log.info(f"Pushing blocklist to host {host} ...") log.info(f"Pushing blocklist to host {host} ...")
# Fetch the existing blocklist from the instance # Fetch the existing blocklist from the instance
@ -300,14 +318,17 @@ def push_blocklist(token: str, host: str, blocklist: list[dict], dryrun: bool=Fa
# Check if anything is actually different and needs updating # Check if anything is actually different and needs updating
change_needed = False change_needed = False
for key in [ keylist = [
'severity', 'severity',
'public_comment', 'public_comment',
'private_comment',
'reject_media', 'reject_media',
'reject_reports', 'reject_reports',
'obfuscate', 'obfuscate',
]: ]
if include_private_comments:
keylist.append('private_comment')
for key in keylist:
try: try:
log.debug(f"Compare {key} '{oldblock[key]}' <> '{newblock[key]}'") log.debug(f"Compare {key} '{oldblock[key]}' <> '{newblock[key]}'")
oldval = oldblock[key] oldval = oldblock[key]
@ -318,7 +339,7 @@ def push_blocklist(token: str, host: str, blocklist: list[dict], dryrun: bool=Fa
break break
except KeyError: except KeyError:
log.debug(f"KeyError comparing {key}") log.debug(f"Key '{key}' missing from block definition so cannot compare. Continuing...")
continue continue
if change_needed: if change_needed:
@ -365,7 +386,10 @@ def load_config(configfile: str):
conf = toml.load(configfile) conf = toml.load(configfile)
return conf return conf
def save_intermediate_blocklist(blocklist: list[dict], source: str, filedir: str): def save_intermediate_blocklist(
blocklist: list[dict], source: str,
filedir: str,
include_private_comments: bool=False):
"""Save a local copy of a blocklist we've downloaded """Save a local copy of a blocklist we've downloaded
""" """
# Invent a filename based on the remote source # Invent a filename based on the remote source
@ -375,13 +399,17 @@ def save_intermediate_blocklist(blocklist: list[dict], source: str, filedir: str
source = source.replace('/','-') source = source.replace('/','-')
filename = f"{source}.csv" filename = f"{source}.csv"
filepath = os.path.join(filedir, filename) filepath = os.path.join(filedir, filename)
save_blocklist_to_file(blocklist, filepath) save_blocklist_to_file(blocklist, filepath, include_private_comments)
def save_blocklist_to_file(blocklist: list[dict], filepath: str): def save_blocklist_to_file(
blocklist: list[dict],
filepath: str,
include_private_comments: bool=False):
"""Save a blocklist we've downloaded from a remote source """Save a blocklist we've downloaded from a remote source
@param blocklist: A dictionary of block definitions, keyed by domain @param blocklist: A dictionary of block definitions, keyed by domain
@param filepath: The path to the file the list should be saved in. @param filepath: The path to the file the list should be saved in.
@param include_private_comments: Include private comments in merged blocklist. Defaults to False.
""" """
try: try:
blocklist = sorted(blocklist, key=lambda x: x['domain']) blocklist = sorted(blocklist, key=lambda x: x['domain'])
@ -389,7 +417,10 @@ def save_blocklist_to_file(blocklist: list[dict], filepath: str):
log.error("Field 'domain' not found in blocklist. Are you sure the URLs are correct?") log.error("Field 'domain' not found in blocklist. Are you sure the URLs are correct?")
log.debug(f"blocklist is: {blocklist}") log.debug(f"blocklist is: {blocklist}")
fieldnames = ['domain', 'severity', 'private_comment', 'public_comment', 'reject_media', 'reject_reports', 'obfuscate'] if include_private_comments:
fieldnames = ['domain', 'severity', 'private_comment', 'public_comment', 'reject_media', 'reject_reports', 'obfuscate']
else:
fieldnames = ['domain', 'severity', 'public_comment', 'reject_media', 'reject_reports', 'obfuscate']
with open(filepath, "w") as fp: with open(filepath, "w") as fp:
writer = csv.DictWriter(fp, fieldnames, extrasaction='ignore') writer = csv.DictWriter(fp, fieldnames, extrasaction='ignore')
writer.writeheader() writer.writeheader()
@ -417,6 +448,9 @@ def augment_args(args):
if not args.savedir: if not args.savedir:
args.savedir = conf.get('savedir', '/tmp') args.savedir = conf.get('savedir', '/tmp')
if not args.include_private_comments:
args.include_private_comments = conf.get('include_private_comments', False)
args.blocklist_url_sources = conf.get('blocklist_url_sources') args.blocklist_url_sources = conf.get('blocklist_url_sources')
args.blocklist_instance_sources = conf.get('blocklist_instance_sources') args.blocklist_instance_sources = conf.get('blocklist_instance_sources')
args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations') args.blocklist_instance_destinations = conf.get('blocklist_instance_destinations')
@ -444,10 +478,11 @@ if __name__ == '__main__':
ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.") ap.add_argument('-S', '--save-intermediate', dest="save_intermediate", action='store_true', help="Save intermediate blocklists we fetch to local files.")
ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.") ap.add_argument('-D', '--savedir', dest="savedir", help="Directory path to save intermediate lists.")
ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], default='max', help="Set mergeplan.") ap.add_argument('-m', '--mergeplan', choices=['min', 'max'], default='max', help="Set mergeplan.")
ap.add_argument('--no-fetch-url', dest='no_fetch_url', action='store_true', help="Don't fetch from URLs, even if configured.") ap.add_argument('--no-fetch-url', dest='no_fetch_url', action='store_true', help="Don't fetch from URLs, even if configured.")
ap.add_argument('--no-fetch-instance', dest='no_fetch_instance', action='store_true', help="Don't fetch from instances, even if configured.") ap.add_argument('--no-fetch-instance', dest='no_fetch_instance', action='store_true', help="Don't fetch from instances, even if configured.")
ap.add_argument('--no-push-instance', dest='no_push_instance', action='store_true', help="Don't push to instances, even if configured.") ap.add_argument('--no-push-instance', dest='no_push_instance', action='store_true', help="Don't push to instances, even if configured.")
ap.add_argument('--include-private-comments', dest='include_private_comments', action='store_true', help="Include private_comment field in exports and imports.")
ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.") ap.add_argument('--loglevel', choices=['debug', 'info', 'warning', 'error', 'critical'], help="Set log output level.")
ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.") ap.add_argument('--dryrun', action='store_true', help="Don't actually push updates, just show what would happen.")

View File

@ -39,3 +39,9 @@ blocklist_instance_destinations = [
# The 'min' mergeplan will use the lightest severity block found for a domain. # The 'min' mergeplan will use the lightest severity block found for a domain.
# mergeplan = 'max' # mergeplan = 'max'
## Include private_comment field when exporting or importing a blocklist.
## By default this isn't exported or imported to keep private comments private, but
## if you're using this tool to back up or maintain your own bulk blocklists you might
## want to include the private comments as well. Use with care.
##
#include_private_comments = false