Add pagination for long messages and type anotation

This commit is contained in:
Hugo 2021-11-14 08:12:54 +01:00
parent 81176a7ece
commit 73ebfa7414
1 changed files with 110 additions and 18 deletions

View File

@ -2,11 +2,13 @@
""" Script to send SMSes using the Free mobile API """ Script to send SMSes using the Free mobile API
""" """
import sys import sys
import time
import logging import logging
import argparse import argparse
import readline import readline
import configparser import configparser
from itertools import count from itertools import count
from typing import List
import requests import requests
@ -14,32 +16,120 @@ import requests
class SMSApiManager: class SMSApiManager:
""" Class to manage ending SMS notification using Free Mobile API. """ Class to manage ending SMS notification using Free Mobile API.
""" """
FREE_API_URL = "https://smsapi.free-mobile.fr/sendmsg" FREE_SMSAPI_URL = "https://smsapi.free-mobile.fr/sendmsg"
MAX_LENGTH = 999 MAX_LENGTH = 999
RETURN_CODE = {200: "Message sent", RETURN_CODE = {200: "Message sent",
400: "Mandatory parameter is missing", 400: "Mandatory parameter is missing",
402: "Too many SMS sent in to few time", 402: "Too many SMS sent in to few time",
403: "Service disabled", 403: "Service disabled",
500: "Server error, please try again later"} 500: "Server error, please try again later"}
def __init__(self, conf_file_path): def __init__(self, conf_file_path: str):
self._pass = "" self._pass = ""
self.user_id = "" self.user_id = ""
self.read_config(conf_file_path) self._read_config(conf_file_path)
def send_sms(self, msg: str): def send_sms(self, msg: str) -> int:
""" Send a SMS to a user using its passphrase and user ID """ Send a SMS to a user using its passphrase and user ID
Return the response object from requests. Return the response object from requests.
If the message is longer than the maximum amount of characters allowed,
it is truncated silently (better get a truncated notification than no notification).
If you want to keep all of the message, use send_paginated_sms().
""" """
# Make sure we do not send more than MAX_LENGTH characters # Make sure we do not send more than MAX_LENGTH characters
# or it will fail.
msg = msg[:self.MAX_LENGTH] msg = msg[:self.MAX_LENGTH]
data = {"user": self.user_id, data = {"user": self.user_id,
"pass": self._pass, "pass": self._pass,
"msg": msg} "msg": msg}
res = requests.get(self.FREE_API_URL, params=data) res = requests.get(self.FREE_SMSAPI_URL, params=data)
logging.info(self.RETURN_CODE[res.status_code]) logging.info(self.RETURN_CODE[res.status_code])
return res return res.status_code
def read_config(self, conf_file_path): def send_paginated_sms(self, msg: str, max_message:int=-1) -> List[int]:
""" Paginate a message that may be too long for the SMS API.
Try to cut the message either at line break, if not possible at
sentence boundary, and if not possible at word boundary, to avoid
cutting a message mid-word.
If max_message is set and > 0, send at most max_message messages.
Otherwise, there is not limit.
"""
paginated_message = self.paginate_message(msg)
return_values = []
for i, chunk in enumerate(paginated_message, start=1):
if i > 0 and i > max_message:
# pylint: disable=logging-fstring-interpolation
logging.info(f"Hitting chunk number limit, {len(paginated_message)-i+1} out "
f"of {len(paginated_message)} not sent.")
break
message = chunk.strip() + f"\n{i}/{len(paginated_message)}"
# pylint: disable=logging-fstring-interpolation
logging.info(f"Sending chunk {i} out of {len(paginated_message)}: len {len(message)}")
return_values.append(self.send_sms(message))
# Little pause to avoid trigerring rate limiting
time.sleep(0.1)
return return_values
def paginate_message(self, msg: str) -> List[str]:
""" Paginate a message that may be too long for the SMS API.
Try to cut the message either at line break, if not possible at
sentence boundary, and if not possible at word boundary, to avoid
cutting a message mid-word.
"""
## Calculation of the place to leave for the page counter ##
# Worst case scenario, that should not happen a lot:
# Smallest line is self.MAX_LENGTH/2 + 1 character
# Pagination algorithm generates twice the amount of chunk that would be needed
# if we were to cut naively.
# So we need to leave enough place to write "\n<max_nb>/<max_nb>" at the end of the
# message
pagination_mark_max_length = len(str((len(msg)/self.MAX_LENGTH)*2)) * 2 + 2
# Check if message is short enough. There is no way to know beforehand the
# number of chunks, so we leave enough space for the worst case scenario:
if len(msg) <= self.MAX_LENGTH - pagination_mark_max_length:
return [msg]
messages = []
# Cut at each line
msg_lines = msg.splitlines(keepends=True)
if len(msg_lines) == 1:
# If there is only one line and it is still too long, cut at sentence end.
msg_sentences = [sentence + '. ' for sentence in msg.split('. ')
if sentence.strip()]
msg_sentences[-1] = msg_sentences[-1][:-len('. ')]
if len(msg_sentences) == 1:
# If there is only one sentence and it is still too long,
# cut at word boundaries.
msg_words = [word + ' ' for word in msg.split() if word.strip()]
msg_words[-1] = msg_words[-1][:-len(' ')]
messages.extend(msg_words)
else:
for sentence in msg_sentences:
messages.extend(self.paginate_message(sentence))
return messages
for line in msg_lines:
messages.extend(self.paginate_message(line))
# Rationalize the "messages" list.
# Message now contains chunks that all fit in a SMS,
# but multiple chunk could fit in a SMS as well.
# We concatenate those chunks.
final_message = []
chunk = ""
i = 0
while i < len(messages):
while (i < len(messages) and
len(chunk+messages[i]) <= self.MAX_LENGTH - pagination_mark_max_length):
chunk += messages[i]
i += 1
final_message.append(chunk)
chunk = ""
return final_message
def _read_config(self, conf_file_path: str) -> None:
""" Read and apply the configuration from a config file. """ Read and apply the configuration from a config file.
That config file must have a 'creds' section, and the following options: That config file must have a 'creds' section, and the following options:
- 'user_id', that contains the user identifier for the API - 'user_id', that contains the user identifier for the API
@ -51,12 +141,13 @@ class SMSApiManager:
self._pass = conf_parser["creds"]["passphrase"] self._pass = conf_parser["creds"]["passphrase"]
def parse_args(): def parse_args() -> argparse.Namespace:
""" Parse the script's command line argument, and return the args object. """ Parse the script's command line argument, and return the args object.
""" """
my_parser = argparse.ArgumentParser(description="Script to send a SMS to a person. " my_parser = argparse.ArgumentParser(description="Script to send a SMS to a person. "
"The message must be at most " "If the message is longer than "
f"{SMSApiManager.MAX_LENGTH} characters long.") f"{SMSApiManager.MAX_LENGTH} characters, "
"it will be split in multiple SMS.")
my_parser.add_argument("-c", "--config", help="Path to the config file.", my_parser.add_argument("-c", "--config", help="Path to the config file.",
default="secrets.ini") default="secrets.ini")
my_parser.add_argument("message", nargs="?", my_parser.add_argument("message", nargs="?",
@ -69,7 +160,7 @@ def parse_args():
return my_parser.parse_args() return my_parser.parse_args()
def rlinput(prompt, prefill=''): def rlinput(prompt: str, prefill: str='') -> str:
""" Prefill the input content with the value of `prefill` """ Prefill the input content with the value of `prefill`
All credits go to https://stackoverflow.com/a/2533142 All credits go to https://stackoverflow.com/a/2533142
""" """
@ -79,13 +170,14 @@ def rlinput(prompt, prefill=''):
finally: finally:
readline.set_startup_hook() readline.set_startup_hook()
def main(): def main() -> None:
""" Instantiate an sms_manager, and send SMS """ Instantiate an sms_manager, and send SMS
""" """
args = parse_args() args = parse_args()
sms_manager = SMSApiManager(args.config) sms_manager = SMSApiManager(args.config)
while args.interactive: while args.interactive:
# Interactive mode is enabled, loop indefinetely until user hits Ctrl+C or Ctrl+D # Interactive mode is enabled, loop indefinetely until user hits Ctrl+C or Ctrl+D
print("Type your sms, and submit an empty line to send it.")
try: try:
message=[] message=[]
for i in count(): for i in count():
@ -95,22 +187,22 @@ def main():
if not message[i]: if not message[i]:
break break
res = sms_manager.send_sms(msg='\n'.join(message)) res = sms_manager.send_sms(msg='\n'.join(message))
if res.status_code != 200: if res != 200:
print("Error while sending the message:") print("Error while sending the message:")
print(f"{sms_manager.RETURN_CODE[res.status_code]}") print(f"{sms_manager.RETURN_CODE[res]}")
except (EOFError, KeyboardInterrupt): except (EOFError, KeyboardInterrupt):
print("\nBye !") print("\nBye !")
break break
else: else:
# Else either get message from command line or stdin. # Else either get message from command line or stdin, and send it paginated.
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
if args.message: if args.message:
# Send the requested message # Send the requested message
sms_manager.send_sms(msg=args.message) sms_manager.send_paginated_sms(msg=args.message)
else: else:
# Read the message from stdin # Read the message from stdin
msg = sys.stdin.read(sms_manager.MAX_LENGTH) msg = sys.stdin.read()
res = sms_manager.send_sms(msg=msg) sms_manager.send_paginated_sms(msg=msg, max_message=5)
if __name__ == "__main__": if __name__ == "__main__":
main() main()