#!/usr/bin/env python3
""" Script to parse a sitemap.xml file,
then look through an NGINX log file for the number of hits on each of the URLs
defined in the sitemap, counted by unique IP.
"""
import os
import re
import sys
import json
import socket
import getpass
import argparse
import subprocess
import configparser
from datetime import datetime
from collections import defaultdict, Counter
from urllib.parse import urlparse
from typing import Dict, List, Tuple, Set
import xml.etree.ElementTree as ET

import requests
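
# VisitDict maps a URL path to the set of distinct client IPs that hit it.
# MAX_UA_NB caps how many distinct user agents are tracked: once the cap is
# reached, user agents never seen before are no longer classified as clients
# (presumably a safety net to bound memory and regex work on hostile logs).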
VisitDict = Dict[str, Set[str]]
MAX_UA_NB = 1000


def parse_args() -> argparse.Namespace:
    """ Parse the arguments of the script
    """
    parser = argparse.ArgumentParser(description='Count the daily hits on each page '
                                                 'found in the nginx log file.')
    parser.add_argument("-s", "--sitemap", default="/var/www/html/sitemap.xml",
                        help="Path to the sitemap xml file for the website.")
    parser.add_argument("-l", "--logfile", default="/var/log/nginx/saxodwarf.fr-access.log.1",
                        help="Path to the log file to analyze.")
    parser.add_argument("-e", "--exclude-crawler", action="store_true",
                        help="If set, use a crawler-user-agents.json file to exclude requests "
                             "made by bots.")
    parser.add_argument("-t", "--telegraf-url",
                        help="URL for a telegraf http listener v2.")
    parser.add_argument("-u", "--user",
                        help="Username for the telegraf export.")
    parser.add_argument("-c", "--config-file",
                        help="Configuration file holding the URL, the username and the "
                             "password for the exporter.")
    return parser.parse_args()


def print_visit_dict(title: str, visit_dict: VisitDict) -> None:
    """ Pretty-print a visit dictionary
    Keys are locations, values are sets of IPs.
    """
    total_visits = 0
    print(f'======== {title} ========')
    for loc, ips in visit_dict.items():
        print(f"{loc}: {len(ips)}")
        total_visits += len(ips)
    print(f'Total visits for {title}: {total_visits}')


class TelegrafExporter:
    """ A class to export visit counts to a telegraf instance using the
    http listener v2 input plugin
    """
    def __init__(self, telegraf_url: str, username: str, password: str, source: str):
        self.telegraf_url = telegraf_url
        self.username = username
        self._password = password
        self.source = source

    def telegraf_post(self, timestamp: int, create_time: int, title: str,
                      metric: str, count: int) -> requests.Response:
        """ Post a value to telegraf
        :param timestamp: timestamp used by influxdb as time field.
        :param create_time: second of the day at which the data point is exported
                            (to de-duplicate entries generated on the same day).
        :param title: name of the destination table in influxdb
        :param metric: path or metric name for which we register the count,
                       used as a tag in influxdb.
        :param count: hit count for the aforementioned metric
        """
        payload = {"name": title,
                   "timestamp": timestamp,
                   "create_time": create_time,
                   "source": self.source,
                   "location": metric,
                   "hits": count}
        return requests.post(self.telegraf_url,
                             json=payload,
                             auth=(self.username, self._password))
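
    # A sketch of the JSON body the http listener v2 plugin receives from the
    # call above (all values here are illustrative):
    #   {"name": "blog_client_hit", "timestamp": 1700000000, "create_time": 43200,
    #    "source": "myhost", "location": "/index.html", "hits": 42}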

    def _post_series(self, name: str, series: Dict[str, int],
                     timestamp: int, create_time: int) -> None:
        """ Post every (metric, count) pair of a series under the given name,
        exiting on the first failed request.
        """
        for metric_name, count in series.items():
            try:
                response = self.telegraf_post(timestamp,
                                              create_time,
                                              name,
                                              metric_name,
                                              count)
                response.raise_for_status()
            except requests.exceptions.RequestException as excpt:
                print(excpt)
                sys.exit(1)

    def export_result_to_telegraf(self, page_hits: VisitDict,
                                  bot_hits: VisitDict,
                                  user_agents: Dict[str, int],
                                  methods: Counter[str],
                                  timestamp: int) -> None:
        """ Export the page_hits and bot_hits dictionaries, the user agent
        counts and the method counts to telegraf
        """
        now = datetime.now().time()
        # second of the current day, used to de-duplicate same-day exports
        create_time = now.second + 60 * now.minute + 3600 * now.hour

        # export standard hits
        self._post_series("blog_client_hit",
                          {loc: len(ips) for loc, ips in page_hits.items()},
                          timestamp, create_time)
        # export bots hits
        self._post_series("blog_bot_hit",
                          {loc: len(ips) for loc, ips in bot_hits.items()},
                          timestamp, create_time)
        # export user agent variety
        self._post_series("user_agent_variety", user_agents, timestamp, create_time)
        # export method variety
        self._post_series("method_variety", methods, timestamp, create_time)


def get_crawler_patterns(exclude_crawler: bool) -> List[re.Pattern[str]]:
    """ Parse the crawler-user-agents file, and return a list
    of compiled crawler regex patterns
    """
    if exclude_crawler:
        base_path = os.path.dirname(os.path.abspath(__file__))
        crawler_path = os.path.join(base_path, "crawler-user-agents.json")
        if not os.path.exists(crawler_path) or os.path.getsize(crawler_path) == 0:
            # retrieve the crawler file from github
            cmd = ["wget", "-O", crawler_path,
                   "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/"
                   "crawler-user-agents.json"]
            subprocess.run(cmd, check=False)
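        # Each entry of crawler-user-agents.json is expected to be an object
        # carrying at least a "pattern" key holding a regex that matches that
        # crawler's user agent, e.g. {"pattern": "Googlebot", ...}; only the
        # "pattern" key is read below.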
        try:
            with open(crawler_path, 'r', encoding='utf-8') as crawler_file:
                crawlers = json.load(crawler_file)
        except (FileNotFoundError, json.JSONDecodeError):
            print("Could not open and use the crawler user agent file")
            crawlers = []
    else:
        crawlers = []
    # Crawler patterns are compiled once up front for speed
    crawler_patterns = [re.compile(entry["pattern"]) for entry in crawlers]
    return crawler_patterns


def get_locations(sitemap_path: str) -> List[str]:
    """ Parse a sitemap file, and return the list of all its locations
    """
    locations = []
    tree = ET.parse(sitemap_path)
    root = tree.getroot()
    # Get the default XML namespace, needed for tag lookup later
    match_nsp = re.match(r'{.*}', root.tag)
    nsp = match_nsp.group(0) if match_nsp else ""
    for url in root:
        loc_elmt = url.find(f"{nsp}loc")
        if loc_elmt is not None:
            locations.append(str(urlparse(loc_elmt.text).path))
    return locations


def parse_logfile(logfile_path: str, locations: List[str],
                  crawler_patterns: List[re.Pattern[str]]) -> Tuple[VisitDict, VisitDict,
                                                                    VisitDict, Dict[str, int],
                                                                    Counter[str]]:
    """ Parse a logfile, and return five values: the page_hits, bot_hits and
    other_hits visit dicts, an additional_infos dict, and a counter of the
    HTTP methods seen on the remaining requests.
    """
    time_local_fmt = "%d/%b/%Y:%H:%M:%S %z"
    # Regexes for all the pattern matching
    # Default format for NGINX logs is:
    # pylint: disable=line-too-long
    # $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"
    log_line_template = (r'^(?P<ip_address>[0-9a-f.:]+) \- .+? \[(?P<time_local>.*)\] '
                         '"(?P<method>[A-Z]+) (?P<location>{locations}) .*?" [0-9]{{3}} [0-9]+ ".+?" '
                         '"(?P<user_agent>.+)"$')
    log_regex = re.compile(log_line_template.format(locations='.+?'))

    # Output data structure initialization
    visit_dict: VisitDict = {loc: set() for loc in locations}
    bot_visit_dict: VisitDict = {loc: set() for loc in locations}
    other_visit_dict: VisitDict = defaultdict(set)
    bot_user_agents: Set[str] = set()
    client_user_agents: Set[str] = set()
    method_counter: Counter[str] = Counter()
    # The way to get the timezone data here is not great (it does not take DST
    # and such into account), but this is a fallback default date that should
    # hardly ever be used.
    last_log_date = datetime.now(datetime.now().astimezone().tzinfo).strftime(time_local_fmt)

    # Do not parse a log file that has not been modified in more than 24 hours
    if abs(os.path.getmtime(logfile_path) - datetime.now().timestamp()) >= 24 * 3600:
        print("Log file is too old, there was no access today.")
        logfile_path = "/dev/null"
    with open(logfile_path, 'r', encoding='utf-8') as logfile:
        for line in logfile:
            match_obj = log_regex.match(line)
            if match_obj:
                client_ip = match_obj.group("ip_address")
                location = match_obj.group("location")
                last_log_date = match_obj.group("time_local")
                user_agent = match_obj.group("user_agent")
                method = match_obj.group("method")
                if method == "GET" and location in locations:
                    # It is a GET on a known page: count the hit.
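                    # A user agent already seen as a bot stays a bot; otherwise
                    # the hit counts as a client one if there is room left to
                    # track new user agents (MAX_UA_NB) and the user agent is
                    # either already known as a client or matches no crawler
                    # pattern.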
                    if (user_agent not in bot_user_agents and
                            (len(client_user_agents) + len(bot_user_agents)) < MAX_UA_NB and
                            (user_agent in client_user_agents or
                             not any(pattern.search(user_agent)
                                     for pattern in crawler_patterns))):
                        visit_dict[location].add(client_ip)
                        client_user_agents.add(user_agent)
                    else:
                        bot_visit_dict[location].add(client_ip)
                        bot_user_agents.add(user_agent)
                else:
                    # Also count lines that are NOT "GET on a known page" in a
                    # different dict. Those other hits can be static site
                    # resources being loaded, in which case we group the hits.
                    method_counter[method] += 1
                    if location.startswith("/isso/"):
                        other_visit_dict["/isso/*"].add(client_ip)
                    elif location.startswith("/assets/css/"):
                        other_visit_dict["/assets/css/*"].add(client_ip)
                    elif location.startswith("/assets/js/"):
                        other_visit_dict["/assets/js/*"].add(client_ip)
                    elif location.startswith("/images/"):
                        other_visit_dict["/images/*"].add(client_ip)
                    else:
                        # for everything else, store the exact path, without
                        # the query string
                        other_visit_dict[location.split('?')[0]].add(client_ip)
    today_date = datetime.strptime(last_log_date, time_local_fmt).replace(hour=0,
                                                                          minute=0,
                                                                          second=0,
                                                                          microsecond=0)
    additional_infos = {"last_log_timestamp": int(today_date.timestamp()),
                        "bot_user_agents_nb": len(bot_user_agents),
                        "client_user_agents_nb": len(client_user_agents)}

    return visit_dict, bot_visit_dict, other_visit_dict, additional_infos, method_counter


def main() -> None:
    """ Parse the arguments, the crawler file and the sitemap,
    then read the log file line by line, regexing through it to isolate
    locations and client IPs.
    Record the number of unique IPs accessing each known page (from the
    sitemap), and the number of unique IPs accessing each unknown location
    (either resources being loaded, or bots looking for vulnerable websites).
    """
    args = parse_args()
    telegraf_url = ""

    # Read config file
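    # A sketch of the INI layout this expects (all values are placeholders):
    #   [telegraf]
    #   url = https://telegraf.example.org:8080/telegraf
    #   username = exporter
    #   password = secret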
    if args.config_file:
        config = configparser.ConfigParser()
        config.read(args.config_file)
        try:
            username = config["telegraf"]["username"]
            telegraf_url = config["telegraf"]["url"]
            _password = config["telegraf"]["password"]
        except KeyError as excpt:
            print(f"Error: missing key in configuration file '{args.config_file}': {excpt.args[0]}")
            sys.exit(1)
    elif args.telegraf_url:
        telegraf_url = args.telegraf_url
        username = args.user if args.user else input("Telegraf username: ")
        _password = getpass.getpass("Telegraf password: ")

    # Get the crawler patterns and the sitemap locations, then parse the log file
    crawler_patterns = get_crawler_patterns(args.exclude_crawler)
    locations = get_locations(args.sitemap)
    (visit_dict, bot_visit_dict, other_visit_dict,
     additional_infos, method_counter) = parse_logfile(args.logfile,
                                                       locations,
                                                       crawler_patterns)

    # Generate the report
    print_visit_dict("Standard visits", visit_dict)
    print(f"There were {additional_infos['client_user_agents_nb']} unique client user agent(s)")
    if args.exclude_crawler:
        print_visit_dict("Bot visits", bot_visit_dict)
        print(f"There were {additional_infos['bot_user_agents_nb']} unique bot user agent(s)")
    print_visit_dict("Other visits", other_visit_dict)
    for method, count in method_counter.items():
        print(f"{method}: {count}")

    if telegraf_url:
        exporter = TelegrafExporter(telegraf_url=telegraf_url,
                                    username=username,
                                    password=_password,
                                    source=socket.gethostname())
        exporter.export_result_to_telegraf(visit_dict,
                                           bot_visit_dict,
                                           {"bot_user_agents":
                                            additional_infos['bot_user_agents_nb'],
                                            "client_user_agents":
                                            additional_infos['client_user_agents_nb']},
                                           method_counter,
                                           additional_infos["last_log_timestamp"])


if __name__ == "__main__":
    main()