#!/usr/bin/env python3 """ Script to parse a sitemap.xml file, then look through a NGINX log file for the number of hits for each of the URLs defined in the sitemap, by unique IP. """ import os import re import sys import json import socket import getpass import argparse import subprocess import configparser from datetime import datetime from collections import defaultdict, Counter from itertools import repeat from urllib.parse import urlparse from typing import Dict, List, Tuple, Set import xml.etree.ElementTree as ET import requests VisitDict = Dict[str, Set[str]] MAX_UA_NB = 1000 def parse_args()-> argparse.Namespace: """ Parse arguments of the script """ parser = argparse.ArgumentParser(description='Collect number of daily loading of each page ' 'in the nginx log file.') parser.add_argument("-s", "--sitemap", default="/var/www/html/sitemap.xml", help="Path to the sitemap xml file for the website.") parser.add_argument("-l", "--logfile", default="/var/log/nginx/saxodwarf.fr-access.log.1", help="Path to the log file to analyze") parser.add_argument("-e", "--exclude-crawler", action="store_true", help="If set, uses a crawler-user-agent.json file to exclude requests " "made by bots.") parser.add_argument("-t", "--telegraf-url", help="URL for a telegraf http listener v2") parser.add_argument("-u", "--user", help="Username for the telegraf export") parser.add_argument("-c", "--config-file", help="Configuration file for the URL, the username and password of " "the exporter") return parser.parse_args() def print_visit_dict(title:str, visit_dict: VisitDict)-> None: """ Pretty-print a visit dictionnary Keys are locations, values are list of IPs. """ total_visits=0 print(f'======== {title} ========') for loc, ips in visit_dict.items(): print(f"{loc}: {len(ips)}") total_visits += len(ips) print(f'Total visits for {title}: {total_visits}') class TelegrafExporter(): """ A class to export viti count to a telegraf instance using the http listener v2 input plugin """ def __init__(self, telegraf_url: str, username: str, password: str, source: str): self.telegraf_url = telegraf_url self.username = username self._password = password self.source = source def telegraf_post(self, timestamp:int, create_time: int, title:str, metric:str, count:int)-> requests.Response: """ Post a value to telegraf :param timestamp: timestamp used by influxdb as time field. :param create_time: second of the day at which the data point is exported (to de-duplicate entries generated on the same day). :param title: name of the destination table in influxdb :param location: path for which we register the hit count, used as a tag in influxdb. :param count: hit count for the aforementioned path """ payload = {"name": title, "timestamp": timestamp, "create_time": create_time, "source": self.source, "location": metric, "hits": count} return requests.post(self.telegraf_url, json=payload, auth=(self.username, self._password)) def export_result_to_telegraf(self, page_hits: VisitDict, bot_hits: VisitDict, user_agents: Dict[str, int], methods: Counter[str], timestamp: int) -> None: """ Export the bot_hits and page_hits dictionnaries to telegraf """ # export standard hits now = datetime.now().time() create_time = now.second + 60*now.minute + 3600*now.hour name="blog_client_hit" for location, ips in page_hits.items(): try: response = self.telegraf_post(timestamp, create_time, name, location, len(ips)) response.raise_for_status() except requests.exceptions.RequestException as excpt: print(excpt) sys.exit(1) # export bots hits name="blog_bot_hit" for location, ips in bot_hits.items(): try: response = self.telegraf_post(timestamp, create_time, name, location, len(ips)) response.raise_for_status() except requests.exceptions.RequestException as excpt: print(excpt) sys.exit(1) # export user agent variety name="user_agent_variety" for metric_name, count in user_agents.items(): try: response = self.telegraf_post(timestamp, create_time, name, metric_name, count) response.raise_for_status() except requests.exceptions.RequestException as excpt: print(excpt) sys.exit(1) # export method variety name="method_variety" for metric_name, count in methods.items(): try: response = self.telegraf_post(timestamp, create_time, name, metric_name, count) response.raise_for_status() except requests.exceptions.RequestException as excpt: print(excpt) sys.exit(1) def get_crawler_patterns(exclude_crawler: bool) -> List[re.Pattern[str]]: """ Parse the crawler-user-agent file, and returns a list of compiled regex crawler patterns """ if exclude_crawler: base_path = os.path.dirname(os.path.abspath(__file__)) crawler_path = os.path.join(base_path, "crawler-user-agents.json") if not os.path.exists(crawler_path) or os.path.getsize(crawler_path) == 0: # retrieve the crawler file from github cmd = ["wget", "-O", crawler_path, "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/" "crawler-user-agents.json"] subprocess.run(cmd, check=False) try: with open(crawler_path, 'r', encoding='utf-8') as crawler_file: crawlers = json.load(crawler_file) except (FileNotFoundError, json.JSONDecodeError): print("Could not open and use the crawler user agent file") crawlers = [] else: crawlers = [] # Crawlers patterns are built once and for all for speed crawler_patterns = [re.compile(entry["pattern"]) for entry in crawlers] return crawler_patterns def get_locations(sitemap_path:str) -> List[str]: """ Parse a sitemap file, and return the list of all its locations """ locations = [] tree = ET.parse(sitemap_path) root = tree.getroot() # Get the default XML namespace, needed for tag lookup later match_nsp = re.match(r'{.*}', root.tag) nsp = match_nsp.group(0) if match_nsp else "" for url in root: loc_elmt = url.find(f"{nsp}loc") if loc_elmt is not None: locations.append(str(urlparse(loc_elmt.text).path)) return locations def parse_logfile(logfile_path: str, locations: List[str], crawler_patterns: List[re.Pattern[str]]) -> Tuple[VisitDict, VisitDict, VisitDict, Dict[str, int], Counter[str]]: """ Parse a logfile, and return 4 dicts: page_hits, bot_hits, other_hits and additional_infos """ time_local_fmt = "%d/%b/%Y:%H:%M:%S %z" # Regexes for all the pattern matching # Default format for NGINX log is: # pylint: disable=line-too-long # $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent" log_line_template = (r'^(?P[0-9a-f.:]+) \- .+? \[(?P.*)\] ' '"(?P[A-Z]+) (?P{locations}) .*?" [0-9]{{3}} [0-9]+ ".+?" ' '"(?P.+)"$') #known_page_regex = re.compile(log_line_template.format(locations='|'.join(map(re.escape, # locations)))) log_regex = re.compile(log_line_template.format(locations='.+?')) # Output data structure initialization visit_dict: VisitDict = dict(map(lambda x: (x, set()), locations)) bot_visit_dict: VisitDict = dict(map(lambda x: (x, set()), locations)) other_visit_dict: VisitDict = defaultdict(set) bot_user_agents: Set[str] = set() client_user_agents: Set[str] = set() method_counter: Counter[str] = Counter() # The way to get the timezone data here is not great (not taking into account DST and such) # but it is a fallback default date that should hardly ever be used. last_log_date = datetime.now(datetime.now().astimezone().tzinfo).strftime(time_local_fmt) # Do not parse a log file that has not been edited since more than 24 hours if abs(os.path.getmtime(logfile_path) - datetime.now().timestamp()) >= 24 * 3600: print("Log file is too old, there was no access today.") logfile_path="/dev/null" with open(logfile_path, 'r', encoding='utf-8') as logfile: for line in logfile: match_obj = re.match(log_regex, line) if match_obj: client_ip = match_obj.group("ip_address") location = match_obj.group("location") last_log_date = match_obj.group("time_local") user_agent = match_obj.group("user_agent") method = match_obj.group("method") if method == "GET" and location in locations: # For each line, if it is a GET on a known page, count it if ((not user_agent in bot_user_agents and (len(client_user_agents) + len(bot_user_agents)) < MAX_UA_NB) and (user_agent in client_user_agents or not any(map(re.search, crawler_patterns, repeat(user_agent))))): visit_dict[location].add(client_ip) client_user_agents.add(user_agent) else: bot_visit_dict[location].add(client_ip) bot_user_agents.add(user_agent) else: # Also count lines that are NOT "GET on a known page" in a different dict. # Those other hits can be static site ressources loaded, # in which case we group the hits method_counter[method] += 1 if location.startswith("/isso/"): other_visit_dict["/isso/*"].add(client_ip) elif location.startswith("/assets/css/"): other_visit_dict["/assets/css/*"].add(client_ip) elif location.startswith("/assets/js/"): other_visit_dict["/assets/js/*"].add(client_ip) elif location.startswith("/images/"): other_visit_dict["/images/*"].add(client_ip) else: # for everything else, we store the exact path, but not the query string other_visit_dict[location.split('?')[0]].add(client_ip) today_date = datetime.strptime(last_log_date, time_local_fmt).replace(hour=0, minute=0, second=0, microsecond=0) additional_infos = {"last_log_timestamp": int(today_date.timestamp()), "bot_user_agents_nb": len(bot_user_agents), "client_user_agents_nb": len(client_user_agents)} return visit_dict, bot_visit_dict, other_visit_dict, additional_infos, method_counter def main() -> None: """ Parses the arguments, the crawler file and the sitemap, Then reads the log file line by line, regexes through it to isolate locations and client IP It records the number of unique IP accessing each known pages (from the sitemap), and the number of unique IP accessing each unknown locations. (either ressources being loaded or bot looking for vulnerable website). """ args = parse_args() telegraf_url = "" # Read config file if args.config_file: config = configparser.ConfigParser() config.read(args.config_file) try: username = config["telegraf"]["username"] telegraf_url = config["telegraf"]["url"] _password = config["telegraf"]["password"] except KeyError as excpt: print(f"Error: missing key in configuration file '{args.config_file}': {excpt.args[0]}") sys.exit(1) elif args.telegraf_url: telegraf_url = args.telegraf_url username = args.user if args.user else input("Telegraf username: ") _password = getpass.getpass("Telegraf password: ") # Get parser, get locations and parse the log file crawler_patterns = get_crawler_patterns(args.exclude_crawler) locations = get_locations(args.sitemap) (visit_dict, bot_visit_dict, other_visit_dict, additional_infos, method_counter) = parse_logfile(args.logfile, locations, crawler_patterns) # Generate the report print_visit_dict("Standard visits", visit_dict) print(f"There were {additional_infos['client_user_agents_nb']} unique client user agent(s)") if args.exclude_crawler: print_visit_dict("Bot visits", bot_visit_dict) print(f"There were {additional_infos['bot_user_agents_nb']} unique bot user agent(s)") print_visit_dict("Other visits", other_visit_dict) for method, count in method_counter.items(): print(f"{method}: {count}") if telegraf_url: exporter = TelegrafExporter(telegraf_url=telegraf_url, username=username, password=_password, source=socket.gethostname()) exporter.export_result_to_telegraf(visit_dict, bot_visit_dict, {"bot_user_agents": additional_infos['bot_user_agents_nb'], "client_user_agents": additional_infos['client_user_agents_nb']}, method_counter, additional_infos["last_log_timestamp"]) if __name__ == "__main__": main()