Compare commits


5 Commits

Author SHA1 Message Date
saxodwarf 4a1c96574b Merge pull request 'Make a lots of improvements' (#2) from first_version into master (Reviewed-on: #2) 2021-09-18 13:47:33 +02:00
Hugo c5be2476a5 Merge branch 'master' into first_version 2021-09-18 13:47:02 +02:00
Hugo 6644749fab huge improvements 2021-09-18 13:40:00 +02:00
Hugo 3e364d5f51 ditch grep and make all filtering in pure python 2021-09-11 17:33:06 +02:00
Hugo 575c7e89db First version of the script 2021-09-04 16:04:53 +02:00
4 changed files with 218 additions and 27 deletions

.gitignore vendored (5 changed lines)

@@ -109,7 +109,7 @@ celerybeat.pid
.env
.venv
env/
venv/
*venv/
ENV/
env.bak/
venv.bak/
@@ -138,3 +138,6 @@ dmypy.json
# Cython debug symbols
cython_debug/
# config
*.conf
crawler-user-agents.json

README.md

@@ -1,3 +1,5 @@
# pages_stats
Simple script to gather daily global statistics for hugo post served
Simple script to gather daily global statistics for Hugo posts served.
The crawler-user-agents.json file comes from [this project](https://github.com/monperrus/crawler-user-agents/).
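
The file is a JSON array of objects whose pattern field is a regular expression for a crawler's user agent; get_page_stats.py compiles each pattern and searches it against whole log lines (see the diff below). A minimal sketch of that matching, assuming the file sits in the working directory; the is_bot helper and the sample log line are illustrative, not part of the repository:

import json
import re

# Compile every "pattern" entry once, as the script does.
with open("crawler-user-agents.json", "r", encoding="utf-8") as crawler_file:
    crawler_patterns = [re.compile(entry["pattern"]) for entry in json.load(crawler_file)]

def is_bot(log_line: str) -> bool:
    """Return True if any crawler pattern matches somewhere in the log line."""
    return any(pattern.search(log_line) for pattern in crawler_patterns)

# Hypothetical NGINX access-log line; Googlebot is one of the upstream patterns.
sample = '66.249.66.1 - - [18/Sep/2021:13:47:33 +0200] "GET /posts/example/ HTTP/1.1" 200 512 "-" "Googlebot/2.1"'
print(is_bot(sample))  # expected: True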

get_page_stats.py Normal file → Executable file (235 changed lines)

@@ -1,62 +1,247 @@
#!/bin/env python3
#!/usr/bin/env python3
""" Script to parse a sitemap.xml file,
then look through a NGINX log file for the number of hits for each of the URLs
defined in the sitemap, by unique IP.
"""
import os
import re
import sys
import json
import socket
import getpass
import argparse
import subprocess
import configparser
from datetime import datetime, time
from collections import defaultdict
from itertools import repeat
from subprocess import run
from urllib.parse import urlparse
from typing import Dict, List, Tuple, Set
import xml.etree.ElementTree as ET
import requests
def parse_args():
    parser = argparse.ArgumentParser(description='Collect number of daily loading of each page in the nginx log file.')
    parser.add_argument("-s", "--sitemap", default="/var/www/my_webapp/www/sitemap.xml",
VisitDict = Dict[str, Set[str]]
def parse_args()-> argparse.Namespace:
    """ Parse arguments of the script
    """
    parser = argparse.ArgumentParser(description='Collect number of daily loading of each page '
                                                 'in the nginx log file.')
    parser.add_argument("-s", "--sitemap", default="/var/www/html/sitemap.xml",
                        help="Path to the sitemap xml file for the website.")
    parser.add_argument("-l", "--logfile", default="/var/log/nginx/saxodwarf.fr-access.log.1",
                        help="Path to the log file to analyze")
    parser.add_argument("-e", "--exclude-crawler", action="store_true",
                        help="If set, uses a crawler-user-agent.json file to exclude requests made by bots.")
                        help="If set, uses a crawler-user-agent.json file to exclude requests "
                             "made by bots.")
    parser.add_argument("-t", "--telegraf-url",
                        help="URL for a telegraf http listener v2")
    parser.add_argument("-u", "--user",
                        help="Username for the telegraf export")
    parser.add_argument("-c", "--config-file",
                        help="Configuration file for the URL, the username and password of "
                             "the exporter")
    return parser.parse_args()
def main():
    """ Parses the arguments, the crawler file and the sitemap,
    then for each locations, uses grep to select the lines containing GET calls for
    the location, and prints the number of unique IP accessing it.
def print_visit_dict(title:str, visit_dict: VisitDict)-> None:
    """ Pretty-print a visit dictionary
    Keys are locations, values are sets of IPs.
    """
    args = parse_args()
    total_visits=0
    print(f'======== {title} ========')
    for loc, ips in visit_dict.items():
        print(f"{loc}: {len(ips)}")
        total_visits += len(ips)
    print(f'Total visits for {title}: {total_visits}')
    if args.exclude_crawler:
class TelegrafExporter():
    """ A class to export visit counts to a telegraf instance using the http listener v2
    input plugin
    """
    def __init__(self, telegraf_url: str, username: str, password: str, source: str):
        self.telegraf_url = telegraf_url
        self.username = username
        self._password = password
        self.source = source
    def telegraf_post(self, timestamp:int, title:str, location:str, count:int)-> requests.Response:
        """ Post a value to telegraf
        """
        payload = {"name": title,
                   "timestamp": timestamp,
                   "source": self.source,
                   "location": location,
                   "hits": count}
        return requests.post(self.telegraf_url,
                             json=payload,
                             auth=(self.username, self._password))
    def export_result_to_telegraf(self, page_hits: VisitDict, bot_hits: VisitDict) -> None:
        """ Export the bot_hits and page_hits dictionaries to telegraf
        """
        # export standard hits
        timestamp = int(datetime.combine(datetime.now().date(), time()).timestamp())
        name="blog_client_hit"
        for location, ips in page_hits.items():
            try:
                response = self.telegraf_post(timestamp,
                                              name,
                                              location,
                                              len(ips))
                response.raise_for_status()
            except requests.exceptions.RequestException as excpt:
                print(excpt)
                sys.exit(1)
        # export bots hits
        name="blog_bot_hit"
        for location, ips in bot_hits.items():
            try:
                response = self.telegraf_post(timestamp,
                                              name,
                                              location,
                                              len(ips))
                response.raise_for_status()
            except requests.exceptions.RequestException as excpt:
                print(excpt)
                sys.exit(1)
def get_crawler_patterns(exclude_crawler: bool) -> List[str]:
    """ Parse the crawler-user-agent file, and returns a list
    of compiled regex crawler patterns
    """
    if exclude_crawler:
        base_path = os.path.dirname(os.path.abspath(__file__))
        crawler_path = os.path.join(base_path, "crawler-user-agents.json")
        if not os.path.exists(crawler_path) or os.path.getsize(crawler_path) == 0:
            # retrieve the crawler file from github
            cmd = ["wget", "-O", crawler_path,
                   "https://raw.githubusercontent.com/monperrus/crawler-user-agents/master/"
                   "crawler-user-agents.json"]
            subprocess.run(cmd, check=False)
        try:
            with open("./crawler-user-agents.json", 'r') as crawler_file:
            with open(crawler_path, 'r', encoding='utf-8') as crawler_file:
                crawlers = json.load(crawler_file)
        except (FileNotFoundError, json.JSONDecodeError):
            print("Could not open the crawler user agent file")
            print("Could not open and use the crawler user agent file")
            crawlers = []
    else:
        crawlers = []
    # Crawlers patterns are built once and for all for speed
    crawler_patterns = [re.compile(entry["pattern"]) for entry in crawlers]
    return crawler_patterns
def get_locations(sitemap_path:str) -> List[str]:
    """ Parse a sitemap file, and return the list of all its locations
    """
    locations = []
    tree = ET.parse(args.sitemap)
    tree = ET.parse(sitemap_path)
    root = tree.getroot()
    # Get the default XML namespace, needed for tag lookup later
    ns = re.match(r'{.*}', root.tag).group(0)
    match_nsp = re.match(r'{.*}', root.tag)
    nsp = match_nsp.group(0) if match_nsp else ""
    for url in root:
        locations.append(urlparse(url.find(f"{ns}loc").text).path)
        loc_elmt = url.find(f"{nsp}loc")
        if loc_elmt is not None:
            locations.append(str(urlparse(loc_elmt.text).path))
    return locations
    for path in locations:
        # Pre-process log file using grep, to keep only interesting lines
        cmd = ["grep", "-e", f'GET {path} ', args.logfile]
        process = run(cmd, capture_output=True, text=True)
        # Silmutaneously keep only unique source IP and exclude crawlers if resquested
        lines = {line.split(' ')[0] for line in process.stdout.splitlines() if not any(map(re.search, crawler_patterns, repeat(line)))}
        print(f"{path}: {len(lines)}")
def parse_logfile(logfile_path: str, locations: List[str],
                  crawler_patterns: List[str]) -> Tuple[VisitDict, VisitDict, VisitDict]:
    """ Parse a logfile, and return 3 dicts:
    page_hits, bot_hits and other_hits
    """
    # Regexes for all the pattern matching
    log_line_template = r'^(?P<ip_address>[0-9a-f.:]+) .*"GET (?P<location>{locations}) .*'
    known_page_regex = re.compile(log_line_template.format(locations='|'.join(map(re.escape,
                                                                                  locations))))
    other_pages_regex = re.compile(log_line_template.format(locations='.+?'))
    # Output data structure initialization
    visit_dict: VisitDict = dict(map(lambda x: (x, set()), locations))
    bot_visit_dict: VisitDict = dict(map(lambda x: (x, set()), locations))
    other_visit_dict: VisitDict = defaultdict(set)
    with open(logfile_path, 'r', encoding='utf-8') as logfile:
        for line in logfile:
            match_obj = re.match(known_page_regex, line)
            if match_obj:
                # For each line, check if it is a GET on a known page, and count those
                client_ip = match_obj.group("ip_address")
                location = match_obj.group("location")
                if not any(map(re.search, crawler_patterns, repeat(line))):
                    visit_dict[location].add(client_ip)
                else:
                    bot_visit_dict[location].add(client_ip)
            else:
                # Also count lines that are NOT GET on a known page in a different dict.
                match_obj = re.match(other_pages_regex, line)
                if match_obj:
                    client_ip = match_obj.group("ip_address")
                    location = match_obj.group("location")
                    # Those other hits are either resources being loaded, in which case we group the hits
                    if location.startswith("/isso/"):
                        other_visit_dict["/isso/*"].add(client_ip)
                    elif location.startswith("/assets/css/"):
                        other_visit_dict["/assets/css/*"].add(client_ip)
                    elif location.startswith("/assets/js/"):
                        other_visit_dict["/assets/js/*"].add(client_ip)
                    elif location.startswith("/images/"):
                        other_visit_dict["/images/*"].add(client_ip)
                    else:
                        # for everything else, we store the exact path
                        other_visit_dict[location.split('?')[0]].add(client_ip)
    return visit_dict, bot_visit_dict, other_visit_dict
def main() -> None:
    """ Parses the arguments, the crawler file and the sitemap,
    Then reads the log file line by line, regexes through it to isolate locations and client IPs.
    It records the number of unique IPs accessing each known page (from the sitemap), and
    the number of unique IPs accessing each unknown location
    (either resources being loaded or bots looking for vulnerable websites).
    """
    args = parse_args()
    telegraf_url = ""
    # Read config file
    if args.config_file:
        config = configparser.ConfigParser()
        config.read(args.config_file)
        try:
            username = config["telegraf"]["username"]
            telegraf_url = config["telegraf"]["url"]
            _password = config["telegraf"]["password"]
        except KeyError as excpt:
            print(f"Error: missing key in configuration file '{args.config_file}': {excpt.args[0]}")
            sys.exit(1)
    elif args.telegraf_url:
        telegraf_url = args.telegraf_url
        username = args.user if args.user else input("Telegraf username: ")
        _password = getpass.getpass("Telegraf password: ")
    # Get crawler patterns, get locations and parse the log file
    crawler_patterns = get_crawler_patterns(args.exclude_crawler)
    locations = get_locations(args.sitemap)
    visit_dict, bot_visit_dict, other_visit_dict = parse_logfile(args.logfile,
                                                                 locations,
                                                                 crawler_patterns)
    # Generate the report
    print_visit_dict("Standard visits", visit_dict)
    if args.exclude_crawler:
        print_visit_dict("Bot visits", bot_visit_dict)
    print_visit_dict("Other visits", other_visit_dict)
    if telegraf_url:
        exporter = TelegrafExporter(telegraf_url=telegraf_url,
                                    username=username,
                                    password=_password,
                                    source=socket.gethostname())
        exporter.export_result_to_telegraf(visit_dict, bot_visit_dict)
if __name__ == "__main__":
    main()
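
The new --config-file option expects an INI file with a [telegraf] section providing url, username and password, the exact keys read in main() above. A minimal sketch of writing such a file with configparser; the endpoint, credentials and file name are placeholders, not values from the repository:

import configparser

# Build the [telegraf] section the script looks up; all values are placeholders.
config = configparser.ConfigParser()
config["telegraf"] = {
    "url": "https://telegraf.example.org:8080/telegraf",  # hypothetical http listener v2 endpoint
    "username": "blog-stats",
    "password": "change-me",
}
with open("pages_stats.conf", "w", encoding="utf-8") as config_file:
    config.write(config_file)

# The file can then be passed to the script with: -c pages_stats.conf

Keeping credentials in a *.conf file also matches the new .gitignore entry above, which keeps such files out of the repository.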

requirements.txt Normal file (1 changed line)

@@ -0,0 +1 @@
requests