Python

This is a walkthrough showing you how to connect Live Link 365's SMS-sending API to a flight booking app. This sample creates the model of a flight's booking information and sends an SMS notification to a phone number.

This is not a production-ready application. Please take your time to enhance it for production so that it meets your specific business requirements.

Steps

Code

keyboard_arrow_down
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
# _Step_1
class CustomerDetail:
    def __init__(self, customerName, customerId, mobileNumber, products=None, queries=None):
        self.customerName = customerName
        self.customerId = customerId
        self.mobileNumber = mobileNumber #Phone numbers have to be in E.164 format.
        if products is None:
            self.products = []
        if queries is None:
            self.queries = []

    def __str__(self):
        return "CustomerDetail [name=" + self.customerName + ", id=" + self.customerId + ", mobileNumber=" + self.mobileNumber + "]"
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
# _Step_2
class BankSalesPerson:
    def __init__(self, bankPersonId, bankPersonName, department, mobileNumber):
        self.bankPersonId = bankPersonId
        self.bankPersonName = bankPersonName
        self.department = department
        self.mobileNumber = mobileNumber #Phone numbers have to be in E.164 format.

    def __str__(self):
        return "BankSalesPerson [id=" + self.bankPersonId + ", name=" + self.bankPersonName + ", department=" + self.department + ", mobileNumber="\
               + self.mobileNumber + "]"
1
2
3
4
5
6
7
8
# _Step_3
class CustomerProduct:
    def __init__(self, customerId, product):
        self.customerId = customerId
        self.product = product

    def toString(self):
        return "CustomerProduct [id=" + self.customerId + ", product=" + str(self.product) + "]"
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
# _Step_4
class CustomerQuery:
    def __init__(self, customerId, queryType, queryContent):
        self.customerId = customerId
        self.queryType = queryType
        self.queryContent = queryContent

    def __str__(self):
        return "CustomerQuery [customerId=" + self.customerId + ", queryType=" + self.queryType + ", queryContent=" + self.queryContent \
                + "]"
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import json
import csv
import sys
from datetime import datetime, timedelta
import time
sys.path.append("../customerService")
from sendSMS import MessageHandler
from CustomerDetail import CustomerDetail
from BankSalesPerson import BankSalesPerson
from CustomerQuery import CustomerQuery
from CustomerProduct import CustomerProduct
import configparser

class Bank:
    def __init__(self):
        self.customerMap = {}
        self.bankSalesPersonMap = {}
        config = configparser.ConfigParser()
        config.read('../resources/config.ini')

        base_url = config['API']['base_url']
        app_key = config['API']['app_key']
        app_secret = config['API']['app_secret']
        self.messageHandler = MessageHandler(base_url, app_key, app_secret)

    # _Step_5
    def loadDetails(self):
        # loading customer list
        with open('../resources/CustomerList.csv', 'rt') as csvfile:
            reader = csv.reader(csvfile, delimiter='|')
            for row in reader:
                products = row[3:]
                id = row[0]
                self.customerMap[id] = CustomerDetail(row[1], id, row[2])
                for product in products:
                    self.customerMap[id].products.append(CustomerProduct(id, product))

        # loading customer queries
        with open('../resources/CustomerQuery.csv', 'rt') as csvQueryFile:
            reader = csv.DictReader(csvQueryFile, delimiter='|', fieldnames=("customerID","type","content"))
            for row in reader:
                customerID = row['customerID']
                if customerID in self.customerMap:
                    self.customerMap[customerID].queries.append(CustomerQuery(customerID, row['type'], row['content']))

        # loading bank sales person list
        with open('../resources/BankSalesPersonList.csv', 'rt') as csvBankFile:
            reader = csv.DictReader(csvBankFile, delimiter='|', fieldnames=("id","name","department","phone"))
            for row in reader:
                id = row['id']
                self.bankSalesPersonMap[id] = BankSalesPerson(id, row['name'], row['department'], row['phone'])

    def sendAdvertisements(self):
        for key, customer in self.customerMap.items():
            self.creditCardAdvertisment(customer)

    def updateComplaints(self):
        for key, customer in self.customerMap.items():
            self.updateComplaint(customer)

    # _Step_6
    def creditCardAdvertisment(self, customer):
        if any(item.product == "CREDIT CARD" for item in customer.products) and not any(item.product == "PREMIUM CARD" for item in customer.products):
            SMSContent = "Hi " + customer.customerName + ", We are offering a new Premium Credit card for you. Upgrade your card to Premium Card with lots of Benefits"
            self.messageHandler.sendSMS(SMSContent, customer.mobileNumber)

    # _Step_7
    def updateComplaint(self, customer):
        SMSContent = ''
        for query in customer.queries:
            if query.queryType == 'COMPLAINT':
                person = self.getBankPerson(query.queryType)
                SMSContent = "Hi " + customer.customerName + " on your complaint " + query.queryContent + " Mr/Ms " + person.bankPersonName + " will contact you Mobile number: " + person.mobileNumber
            elif query.queryType == 'QUERY':
                person = self.getBankPerson(query.queryType)
                SMSContent = "Hi " + customer.customerName + " on your query " + query.queryContent + " Please connect to our sales person Mr/Ms " + person.bankPersonName + " on " + person.mobileNumber + " Or our sales person will connect with you in next 3 working days."
            elif query.queryType == 'CRITICAL':
                person = self.getBankPerson(query.queryType)
                SMSContent = "Hi " + customer.customerName + " on your issue " + query.queryContent + " an appointment with manager is being fixed on " + self.getDateAndTime() + ""
            self.messageHandler.sendSMS(SMSContent, customer.mobileNumber)

    def getDateAndTime(self):
        sysDate = datetime.utcnow()
        days = 1
        if sysDate.weekday() > 5:
            days = 3
        new_time = sysDate + timedelta(days=days)
        return new_time.strftime('%a %b %d %Y')

    def getBankPerson(self, type):
        if type == 'COMPLAINT':
            return self.bankSalesPersonMap['3']
        elif type == 'QUERY':
            return self.bankSalesPersonMap['2']
        elif type == 'CRITICAL':
            return self.bankSalesPersonMap['1']
1
2
3
4
5
6
7
8
# _Step_8
from Bank import Bank

if __name__ == '__main__':
    bank = Bank()
    bank.loadDetails()
    bank.sendAdvertisements()
    bank.updateComplaints()
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import base64
import json
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode

class MessageHandler():
    def __init__(self, base_url, app_key, app_secret):
        self.base_url = base_url
        self.app_key = app_key
        self.app_secret = app_secret

    def getAccessToken(self, key, secret):
        """
        This Function is used to retrieve oAuth2 access_tokens.
        This conforms to oAuth2 Spec client_credentials grant_type.
        This Function is not doing any validation regarding expiration of tokens.
        """
        token_url = self.base_url + '/oauth/token'
        credentials = "%s:%s" % (key, secret)
        encode_credential = base64.b64encode(credentials.encode('utf-8')).decode('utf-8').replace("\n", "")
        header_params = {
            "Authorization": ("Basic %s" % encode_credential),
            "Content-Type": "application/x-www-form-urlencoded",
            "Accept": "application/json"
        }
        param = {
            'grant_type': 'client_credentials'
        }

        token_data = urlencode(param)
        token_request = Request(url=token_url, headers=header_params, data=token_data.encode('ascii'))

        try:
            token_response = urlopen(token_request).read()
        except HTTPError as error:
            print ("Error " + error.reason)
        else:
            return json.loads(token_response.decode('UTF-8'))['access_token']

    # _Step_9
    def sendSMS(self, sms, number):
        token_url = self.base_url + '/v2/sms'
        header_params = {
            "Content-Type": "application/json",
            "Authorization": ("Bearer %s" % self.getAccessToken(self.app_key, self.app_secret))
        } 

        param = { 
            "message": sms,
            "origin": "", 
            "destination": number #Phone numbers have to be in E.164 format.
        }
        token_data = json.dumps(param).encode('ascii')
        token_request = Request(url=token_url, headers=header_params, data=token_data)
        # Exception handle example
        try:
            token_response = urlopen(token_request)
            print (token_response)
        except HTTPError as e:
            return 'HTTP Request error with status %i %s' % (e.code, e.reason)
        except URLError as e:
            return 'URL Request error with status %i %s' % (e.code, e.reason)
        else:
            return 'SMS Sent Successfully' #with OrderId:' #% json.loads(sms_response.read().decode('UTF-8'))['orderId']