##############################################################################
# Python From Scratch
# Autor: Nilo Ney Coutinho Menezes
# Editora Novatec (c) 2010-2024
# Site: https://pythonfromscratch.com
#
# File: listing\chapter 11\11.1511 - No Title.py
# Description: No Title
##############################################################################
BANK = """
create table types(id integer primary key autoincrement,
description text);
create table names(id integer primary key autoincrement,
name text);
create table phones(id integer primary key autoincrement,
id_name integer,
number text,
type_id integer);
insert into types(description) values("Cellular");
insert into types(description) values("Landline");
insert into types(description) values("Fax");
insert into types(description) values("Home");
insert into types(description) values("Job");
"""
class DBPhonebook:
def __init__(self, bank):
self.phoneType = DBTelephoneType()
self.bank = bank
new = not os.path.isfile(bank)
self.connection = sqlite3.connect(bank)
self.connection.row_factory = sqlite3.Row
if new:
self.create_bank()
self.load_types()
def load_types(self):
for type in self.connection.execute("select * from types"):
id_ = type["id"]
description = type["description"]
self.phone_type.add(DBTelephoneType(id_, description))
def create_bank(self):
self.connection.executescript(BANK)
def search_name(self, name):
if not isinstance(name, DBName):
raise TypeError("name must be of type DBName")
found = self.connection.execute("""select count(*)
from names where name = ?""",
(name.name,)).fetchone()
if found[0] > 0:
return self.load_by_name(name)
else:
return None
def load_by_id(self, id):
query = self.connection.execute(
"select * from names where id = ?" , (id,))
return self.load(query.fetchone())
def load_by_name(self, name):
query = self.connection.execute(
"select * from names where name = ?" , (name.name,))
return self.load(query.fetchone())
def loads(self, query):
if query is None:
return None
new = DBPhonebookData(DBName(query["name"], query["id"]))
for phone in self.connection.execute(
"select * from telephones where id_name =?", (new.name.id,)):
telnum = DBTelephone(telephone["number"], None,
phone["id"], phone["id_name"])
for type in self.phoneTypes:
if type.id == phone["type_id"]:
telnum.type = type
break
new.telephones.add(telnum)
return new
def list(self):
query = self.connection.execute(
"select * from names order by name")
for registration in query:
yield self.load(record)
def new(self, record):
try:
cur = self.connection.cursor()
cur.execute("insert into names(name) values(?)",
(str(record.name),))
record.name.id = cur.lastrowid
for telephone in record.telephones:
cur.execute("""insert into phones(number,
id_type, id_name) values (?,?,?)""",
(telephone.number, telephone.type.id,
record.name.id))
telephone.id = cur.lastrowid
self.connection.commit()
except Exception:
self.connection.rollback()
raise
finally:
cur.close()
def updates(self, record):
try:
cur = self.connection.cursor()
cur.execute("update names set name=? where id =?",
(str(record.name), record.name.id))
for telephone in record.telephones:
if telephone.id is None:
cur.execute("""insert into phones(number,
id_type, id_name)
values (?,?,?)""",
(telephone.number, telephone.type.id,
record.name.id))
telephone.id = cur.lastrowid
else:
cur.execute("""update telephones set number=?,
id_type=?, id_name=?
where id = ?""",
(telephone.number, telephone.type.id,
record.name.id, telephone.id))
for deleted in record.telefones.deleted:
cur.execute("delete from telephones where id = ?" ,(deleted,))
self.connection.commit()
record.telephones.clean()
except Exception:
self.connection.rollback()
raise
finally:
cur.close()
def delete(self, record):
try:
cur = self.connection.cursor()
cur.execute("delete from telephones where id_name = ?" ,
(record.name.id,))
cur.execute("delete from names where id = ?",
(record.name.id,))
self.connection.commit()
except Exception:
self.connection.rollback()
raise
finally:
cur.close()