Compare commits
2 Commits
39bad8e549
...
7215ff673c
| Author | SHA1 | Date |
|---|---|---|
|
|
7215ff673c | |
|
|
73ebfa7414 |
128
send_sms.py
128
send_sms.py
|
|
@ -2,11 +2,13 @@
|
|||
""" Script to send SMSes using the Free mobile API
|
||||
"""
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import argparse
|
||||
import readline
|
||||
import configparser
|
||||
from itertools import count
|
||||
from typing import List
|
||||
|
||||
import requests
|
||||
|
||||
|
|
@ -14,32 +16,120 @@ import requests
|
|||
class SMSApiManager:
|
||||
""" Class to manage ending SMS notification using Free Mobile API.
|
||||
"""
|
||||
FREE_API_URL = "https://smsapi.free-mobile.fr/sendmsg"
|
||||
FREE_SMSAPI_URL = "https://smsapi.free-mobile.fr/sendmsg"
|
||||
MAX_LENGTH = 999
|
||||
RETURN_CODE = {200: "Message sent",
|
||||
400: "Mandatory parameter is missing",
|
||||
402: "Too many SMS sent in to few time",
|
||||
403: "Service disabled",
|
||||
500: "Server error, please try again later"}
|
||||
def __init__(self, conf_file_path):
|
||||
def __init__(self, conf_file_path: str):
|
||||
self._pass = ""
|
||||
self.user_id = ""
|
||||
self.read_config(conf_file_path)
|
||||
self._read_config(conf_file_path)
|
||||
|
||||
def send_sms(self, msg: str):
|
||||
def send_sms(self, msg: str) -> int:
|
||||
""" Send a SMS to a user using its passphrase and user ID
|
||||
Return the response object from requests.
|
||||
|
||||
If the message is longer than the maximum amount of characters allowed,
|
||||
it is truncated silently (better get a truncated notification than no notification).
|
||||
If you want to keep all of the message, use send_paginated_sms().
|
||||
"""
|
||||
# Make sure we do not send more than MAX_LENGTH characters
|
||||
# or it will fail.
|
||||
msg = msg[:self.MAX_LENGTH]
|
||||
data = {"user": self.user_id,
|
||||
"pass": self._pass,
|
||||
"msg": msg}
|
||||
res = requests.get(self.FREE_API_URL, params=data)
|
||||
res = requests.get(self.FREE_SMSAPI_URL, params=data)
|
||||
logging.info(self.RETURN_CODE[res.status_code])
|
||||
return res
|
||||
return res.status_code
|
||||
|
||||
def read_config(self, conf_file_path):
|
||||
def send_paginated_sms(self, msg: str, max_message:int=-1) -> List[int]:
|
||||
""" Paginate a message that may be too long for the SMS API.
|
||||
Try to cut the message either at line break, if not possible at
|
||||
sentence boundary, and if not possible at word boundary, to avoid
|
||||
cutting a message mid-word.
|
||||
|
||||
If max_message is set and > 0, send at most max_message messages.
|
||||
Otherwise, there is not limit.
|
||||
"""
|
||||
paginated_message = self.paginate_message(msg)
|
||||
return_values = []
|
||||
for i, chunk in enumerate(paginated_message, start=1):
|
||||
if i > 0 and i > max_message:
|
||||
# pylint: disable=logging-fstring-interpolation
|
||||
logging.info(f"Hitting chunk number limit, {len(paginated_message)-i+1} out "
|
||||
f"of {len(paginated_message)} not sent.")
|
||||
break
|
||||
message = chunk.strip() + f"\n{i}/{len(paginated_message)}"
|
||||
# pylint: disable=logging-fstring-interpolation
|
||||
logging.info(f"Sending chunk {i} out of {len(paginated_message)}: len {len(message)}")
|
||||
return_values.append(self.send_sms(message))
|
||||
# Little pause to avoid trigerring rate limiting
|
||||
time.sleep(0.1)
|
||||
return return_values
|
||||
|
||||
def paginate_message(self, msg: str) -> List[str]:
|
||||
""" Paginate a message that may be too long for the SMS API.
|
||||
Try to cut the message either at line break, if not possible at
|
||||
sentence boundary, and if not possible at word boundary, to avoid
|
||||
cutting a message mid-word.
|
||||
"""
|
||||
## Calculation of the place to leave for the page counter ##
|
||||
# Worst case scenario, that should not happen a lot:
|
||||
# Smallest line is self.MAX_LENGTH/2 + 1 character
|
||||
# Pagination algorithm generates twice the amount of chunk that would be needed
|
||||
# if we were to cut naively.
|
||||
# So we need to leave enough place to write "\n<max_nb>/<max_nb>" at the end of the
|
||||
# message
|
||||
pagination_mark_max_length = len(str((len(msg)/self.MAX_LENGTH)*2)) * 2 + 2
|
||||
|
||||
# Check if message is short enough. There is no way to know beforehand the
|
||||
# number of chunks, so we leave enough space for the worst case scenario:
|
||||
if len(msg) <= self.MAX_LENGTH - pagination_mark_max_length:
|
||||
return [msg]
|
||||
|
||||
messages = []
|
||||
# Cut at each line
|
||||
msg_lines = msg.splitlines(keepends=True)
|
||||
if len(msg_lines) == 1:
|
||||
# If there is only one line and it is still too long, cut at sentence end.
|
||||
msg_sentences = [sentence + '. ' for sentence in msg.split('. ')
|
||||
if sentence.strip()]
|
||||
msg_sentences[-1] = msg_sentences[-1][:-len('. ')]
|
||||
if len(msg_sentences) == 1:
|
||||
# If there is only one sentence and it is still too long,
|
||||
# cut at word boundaries.
|
||||
msg_words = [word + ' ' for word in msg.split() if word.strip()]
|
||||
msg_words[-1] = msg_words[-1][:-len(' ')]
|
||||
messages.extend(msg_words)
|
||||
else:
|
||||
for sentence in msg_sentences:
|
||||
messages.extend(self.paginate_message(sentence))
|
||||
return messages
|
||||
|
||||
for line in msg_lines:
|
||||
messages.extend(self.paginate_message(line))
|
||||
# Rationalize the "messages" list.
|
||||
# Message now contains chunks that all fit in a SMS,
|
||||
# but multiple chunk could fit in a SMS as well.
|
||||
# We concatenate those chunks.
|
||||
final_message = []
|
||||
chunk = ""
|
||||
i = 0
|
||||
while i < len(messages):
|
||||
while (i < len(messages) and
|
||||
len(chunk+messages[i]) <= self.MAX_LENGTH - pagination_mark_max_length):
|
||||
chunk += messages[i]
|
||||
i += 1
|
||||
final_message.append(chunk)
|
||||
chunk = ""
|
||||
return final_message
|
||||
|
||||
|
||||
def _read_config(self, conf_file_path: str) -> None:
|
||||
""" Read and apply the configuration from a config file.
|
||||
That config file must have a 'creds' section, and the following options:
|
||||
- 'user_id', that contains the user identifier for the API
|
||||
|
|
@ -51,12 +141,13 @@ class SMSApiManager:
|
|||
self._pass = conf_parser["creds"]["passphrase"]
|
||||
|
||||
|
||||
def parse_args():
|
||||
def parse_args() -> argparse.Namespace:
|
||||
""" Parse the script's command line argument, and return the args object.
|
||||
"""
|
||||
my_parser = argparse.ArgumentParser(description="Script to send a SMS to a person. "
|
||||
"The message must be at most "
|
||||
f"{SMSApiManager.MAX_LENGTH} characters long.")
|
||||
"If the message is longer than "
|
||||
f"{SMSApiManager.MAX_LENGTH} characters, "
|
||||
"it will be split in multiple SMS.")
|
||||
my_parser.add_argument("-c", "--config", help="Path to the config file.",
|
||||
default="secrets.ini")
|
||||
my_parser.add_argument("message", nargs="?",
|
||||
|
|
@ -69,7 +160,7 @@ def parse_args():
|
|||
return my_parser.parse_args()
|
||||
|
||||
|
||||
def rlinput(prompt, prefill=''):
|
||||
def rlinput(prompt: str, prefill: str='') -> str:
|
||||
""" Prefill the input content with the value of `prefill`
|
||||
All credits go to https://stackoverflow.com/a/2533142
|
||||
"""
|
||||
|
|
@ -79,13 +170,14 @@ def rlinput(prompt, prefill=''):
|
|||
finally:
|
||||
readline.set_startup_hook()
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
""" Instantiate an sms_manager, and send SMS
|
||||
"""
|
||||
args = parse_args()
|
||||
sms_manager = SMSApiManager(args.config)
|
||||
while args.interactive:
|
||||
# Interactive mode is enabled, loop indefinetely until user hits Ctrl+C or Ctrl+D
|
||||
print("Type your sms, and submit an empty line to send it.")
|
||||
try:
|
||||
message=[]
|
||||
for i in count():
|
||||
|
|
@ -95,22 +187,22 @@ def main():
|
|||
if not message[i]:
|
||||
break
|
||||
res = sms_manager.send_sms(msg='\n'.join(message))
|
||||
if res.status_code != 200:
|
||||
if res != 200:
|
||||
print("Error while sending the message:")
|
||||
print(f"{sms_manager.RETURN_CODE[res.status_code]}")
|
||||
print(f"{sms_manager.RETURN_CODE[res]}")
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print("\nBye !")
|
||||
break
|
||||
else:
|
||||
# Else either get message from command line or stdin.
|
||||
# Else either get message from command line or stdin, and send it paginated.
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
if args.message:
|
||||
# Send the requested message
|
||||
sms_manager.send_sms(msg=args.message)
|
||||
sms_manager.send_paginated_sms(msg=args.message)
|
||||
else:
|
||||
# Read the message from stdin
|
||||
msg = sys.stdin.read(sms_manager.MAX_LENGTH)
|
||||
res = sms_manager.send_sms(msg=msg)
|
||||
msg = sys.stdin.read()
|
||||
sms_manager.send_paginated_sms(msg=msg, max_message=5)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
Loading…
Reference in New Issue