Listing 11 - Page 0: No Title

##############################################################################
# Python From Scratch
# Autor: Nilo Ney Coutinho Menezes
# Editora Novatec (c) 2010-2024
# Site: https://pythonfromscratch.com
#
# File: listing\chapter 11\11.1511 - No Title.py
# Description: No Title
##############################################################################

BANK = """
create table types(id integer primary key autoincrement,
                   description text);
create table names(id integer primary key autoincrement,
                   name text);
create table phones(id integer primary key autoincrement,
                   id_name integer,
                   number text,
                   type_id integer);
insert into types(description) values("Cellular");
insert into types(description) values("Landline");
insert into types(description) values("Fax");
insert into types(description) values("Home");
insert into types(description) values("Job");
"""

class DBPhonebook:
    def __init__(self, bank):
        self.phoneType = DBTelephoneType()
        self.bank = bank
        new = not os.path.isfile(bank)
        self.connection = sqlite3.connect(bank)
        self.connection.row_factory = sqlite3.Row
        if new:
            self.create_bank()
        self.load_types()
    def load_types(self):
        for type in self.connection.execute("select * from types"):
            id_ = type["id"]
            description = type["description"]
            self.phone_type.add(DBTelephoneType(id_, description))
    def create_bank(self):
        self.connection.executescript(BANK)
    def search_name(self, name):
        if not isinstance(name, DBName):
            raise TypeError("name must be of type DBName")
        found = self.connection.execute("""select count(*)
                                         from names where name = ?""",
                                         (name.name,)).fetchone()
        if found[0] > 0:
            return self.load_by_name(name)
        else:
            return None
    def load_by_id(self, id):
        query = self.connection.execute(
                "select * from names where id = ?" , (id,))
        return self.load(query.fetchone())
    def load_by_name(self, name):
        query = self.connection.execute(
                "select * from names where name = ?" , (name.name,))
        return self.load(query.fetchone())
    def loads(self, query):
        if query is None:
            return None
        new = DBPhonebookData(DBName(query["name"], query["id"]))
        for phone in self.connection.execute(
                "select * from telephones where id_name =?", (new.name.id,)):
            telnum = DBTelephone(telephone["number"], None,
                              phone["id"], phone["id_name"])
            for type in self.phoneTypes:
                if type.id == phone["type_id"]:
                    telnum.type = type
                    break
            new.telephones.add(telnum)
        return new
    def list(self):
        query = self.connection.execute(
                "select * from names order by name")
        for registration in query:
            yield self.load(record)
    def new(self, record):
        try:
            cur = self.connection.cursor()
            cur.execute("insert into names(name) values(?)",
                        (str(record.name),))
            record.name.id = cur.lastrowid
            for telephone in record.telephones:
                cur.execute("""insert into phones(number,
                               id_type, id_name) values (?,?,?)""",
                            (telephone.number, telephone.type.id,
                             record.name.id))
                telephone.id = cur.lastrowid
            self.connection.commit()
        except Exception:
            self.connection.rollback()
            raise
        finally:
            cur.close()
    def updates(self, record):
        try:
            cur = self.connection.cursor()
            cur.execute("update names set name=? where id =?",
                (str(record.name), record.name.id))
            for telephone in record.telephones:
                if telephone.id is None:
                    cur.execute("""insert into phones(number,
                                   id_type, id_name)
                                   values (?,?,?)""",
                            (telephone.number, telephone.type.id,
                            record.name.id))
                    telephone.id = cur.lastrowid
                else:
                    cur.execute("""update telephones set number=?,
                                          id_type=?, id_name=?
                                          where id = ?""",
                            (telephone.number, telephone.type.id,
                            record.name.id, telephone.id))
            for deleted in record.telefones.deleted:
                cur.execute("delete from telephones where id = ?" ,(deleted,))
            self.connection.commit()
            record.telephones.clean()
        except Exception:
            self.connection.rollback()
            raise
        finally:
            cur.close()
    def delete(self, record):
        try:
            cur = self.connection.cursor()
            cur.execute("delete from telephones where id_name =  ?" ,
                            (record.name.id,))
            cur.execute("delete from names where id = ?",
                         (record.name.id,))
            self.connection.commit()
        except Exception:
            self.connection.rollback()
            raise
        finally:
            cur.close()
Click here to download the file