# 2.4 Hacks
"""
These imports define the key objects
"""
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
"""
These objects and definitions are used throughout the Jupyter Notebook.
"""
# Flask application object that the SQLAlchemy extension is attached to below.
app = Flask(__name__)

# SQLAlchemy connection URI (path and filename of the SQLite database).
database = 'sqlite:///sqlite.db'

# Apply the whole configuration in a single call rather than key by key.
app.config.update(
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    SQLALCHEMY_DATABASE_URI=database,
    SECRET_KEY='SECRET_KEY',
)

# Create the extension unbound, then bind it to the app.
# init_app belongs in a place where it runs once per project.
db = SQLAlchemy()
db.init_app(app)
""" database dependencies to support sqlite examples """
import json
from sqlalchemy.exc import IntegrityError
''' Tutorial: https://www.sqlalchemy.org/library.html#tutorials, try to get into a Python shell and follow along '''
# Define the Item class to manage actions in the 'Items' table
# -- Object Relational Mapping (ORM) is the key concept of SQLAlchemy
# -- a.) db.Model is like an inner layer of the onion in ORM
# -- b.) Item represents data we want to store, something that is built on db.Model
# -- c.) SQLAlchemy ORM is a layer on top of SQLAlchemy Core, then the SQLAlchemy engine, then SQL
class Item(db.Model):
    """ORM model for one row of the 'Items' table: a named item with a location and a price.

    The column values live in underscore-prefixed attributes and are exposed
    through the ``name``, ``location`` and ``price`` properties, each of which
    has a getter and a setter so that ``update()`` can assign to them.
    """

    __tablename__ = 'Items'  # table name is plural, class name is singular

    # Schema of the Items table.
    id = db.Column(db.Integer, primary_key=True)
    _name = db.Column(db.String(255), unique=True, nullable=False)
    _location = db.Column(db.String(255), unique=False, nullable=False)
    _price = db.Column(db.String(255), unique=False, nullable=False)  # stored as text, e.g. '$6.99'

    def __init__(self, name, location, price):
        """Initialise the instance variables of a new, not yet persisted, Item."""
        self._name = name
        self._location = location
        self._price = price

    @property
    def name(self):
        """The item's name."""
        return self._name

    @name.setter
    def name(self, name):
        self._name = name

    def is_name(self, name):
        """Return True when ``name`` equals this item's name."""
        return self._name == name

    @property
    def location(self):
        """Where the item can be found."""
        return self._location

    # The setter must be defined under the same name as the property;
    # otherwise ``location`` stays read-only and assignment raises AttributeError.
    @location.setter
    def location(self, location):
        self._location = location

    # Legacy alias kept so existing ``item.uid`` reads/writes keep working.
    uid = location

    @property
    def price(self):
        """The item's price."""
        return self._price

    @price.setter
    def price(self, price):
        self._price = price

    def is_price(self, price):
        """Assign ``price`` to this item and return None.

        NOTE: despite the ``is_`` prefix this method has always assigned rather
        than compared; that behaviour is kept for backward compatibility.
        Prefer ``item.price = value`` in new code.
        """
        self._price = price

    def __str__(self):
        """Return the item as a JSON string built from ``read()``."""
        return json.dumps(self.read())

    def create(self):
        """CRUD create: add this item to the table and commit.

        Returns:
            self on success, or None when the commit raises IntegrityError
            (e.g. the unique constraint on the name column is violated).
        """
        try:
            db.session.add(self)  # stage the object for insertion
            db.session.commit()   # SQLAlchemy's unit-of-work pattern requires a manual commit
            return self
        except IntegrityError:
            # Roll the failed transaction back so the session stays usable.
            db.session.rollback()
            return None

    def read(self):
        """CRUD read: return this item as a dictionary with id, name, location and price."""
        return {
            "id": self.id,
            "name": self.name,
            "location": self.location,
            "price": self.price,
        }

    def update(self, name="", location="", price=""):
        """CRUD update: assign every non-empty argument, commit, and return self.

        Empty values (the defaults) leave the corresponding field unchanged.
        """
        if name:
            self.name = name
        if location:
            self.location = location
        if price:
            self.price = price
        db.session.commit()
        return self

    def delete(self):
        """CRUD delete: remove this item from the table and commit. Returns None."""
        db.session.delete(self)
        db.session.commit()
        return None
"""Database Creation and Testing """
# Builds working data for testing
def initItems():
    """Create the database tables and seed the Items table with four sample rows.

    For each sample a line is printed saying whether it was added or already
    exists. ``Item.create()`` returns None when the row cannot be inserted
    (it catches IntegrityError), so that result is checked explicitly rather
    than relying on an attribute error inside a bare ``except``.
    """
    with app.app_context():
        # Create database and tables
        db.create_all()

        # Tester data for table
        samples = [
            Item(name='Ice Cream', location='Target', price='$6.99'),
            Item(name='Bagels', location='Target', price='$5.00'),
            Item(name='Rice', location='Costco', price='$8.50'),
            Item(name='Pizza', location='Costco', price='$14.00'),
        ]

        for item in samples:
            created = item.create()
            if created is None:
                # create() reported bad or duplicate data
                print(f"You already have {item.name}")
            else:
                print(f"Add the item {created.name}")


initItems()
def find_by_name(name):
    """Return the first Item whose name equals ``name`` (the result of ``.first()``)."""
    with app.app_context():
        # ``name`` on Item is a Python property; the mapped column is ``_name``,
        # so the filter has to use the column attribute.
        return Item.query.filter_by(_name=name).first()
# # Check credentials by finding user and verify password
# def check_credentials(uid, password):
# # query email and return user record
# user = find_by_uid(uid)
# if user == None:
# return False
# if (user.is_password(password)):
# return True
# return False
# #check_credentials("indi", "123qwerty")
def create():
    """Interactively look an item up and, if it is not stored yet, add a new one.

    First asks for a name and prints the matching record if one exists.
    Otherwise prompts for name, location and price, inserts the item through
    the ORM and prints the created record, or an error line when
    ``Item.create()`` returns None (it does so on IntegrityError).
    """
    # Save the user time: stop early when the item already exists.
    name = input("What are you looking for? ")
    existing = find_by_name(name)
    if existing is not None:
        print("Found\n", existing.read())
        return

    # Request the values needed to build a valid object.
    name = input("What do you want to add to your list? ")
    location = input("Enter its location: ")
    price = input("Enter its price: ")
    item = Item(name=name, location=location, price=price)

    # Write the object to the database; read() is called inside the app
    # context because the instance is bound to that context's session.
    with app.app_context():
        created = item.create()
        if created is None:
            print(f"Unknown error name {name}")
        else:
            print("Created\n", created.read())


create()
# Query every Item through SQLAlchemy and convert each one to a dictionary
def read():
    """Return all rows of the Items table as a list of dictionaries from ``Item.read()``."""
    records = []
    with app.app_context():
        for row in Item.query.all():
            records.append(row.read())
    return records


read()
import sqlite3
# Rebinds ``database`` (earlier the SQLAlchemy URI) to the file path that the
# sqlite3 functions below connect to.
database = 'instance/sqlite.db'  # this is location of database


def schema():
    """Print the result rows of ``PRAGMA table_info('Items')``, one per line.

    The cursor and connection are closed even if the query raises.
    """
    # Local import keeps this cell self-contained.
    from contextlib import closing

    # sqlite3's own connection context manager only manages transactions;
    # closing() is what actually releases the connection.
    with closing(sqlite3.connect(database)) as conn, closing(conn.cursor()) as cursor:
        results = cursor.execute("PRAGMA table_info('Items')").fetchall()
        for row in results:
            print(row)


schema()
import sqlite3
def read():
    """Print every row of the Items table, or "Table is empty" when there are none.

    Errors from sqlite3 are not caught here and propagate to the caller; the
    cursor and connection are closed either way.
    """
    # Local import keeps this cell self-contained.
    from contextlib import closing

    with closing(sqlite3.connect(database)) as conn, closing(conn.cursor()) as cursor:
        # Retrieve all rows from the table
        results = cursor.execute('SELECT * FROM Items').fetchall()
        if not results:
            print("Table is empty")
        for row in results:
            print(row)


read()
import sqlite3
def create():
    """Prompt for an item's name, location and price and insert it into the Items table.

    Prints a confirmation after the insert is committed, or the sqlite3 error
    if the insert or commit fails. The cursor and connection are always closed.
    """
    # Local import keeps this cell self-contained.
    from contextlib import closing

    name = input("What do you want to buy? ")
    location = input("Enter its location: ")
    price = input("Enter its price: ")

    with closing(sqlite3.connect(database)) as conn, closing(conn.cursor()) as cursor:
        try:
            # Parameterized statement: the values are bound, never interpolated.
            cursor.execute(
                "INSERT INTO Items (_name, _location, _price) VALUES (?, ?, ?)",
                (name, location, price),
            )
            conn.commit()
        except sqlite3.Error as error:
            print("Error while executing the INSERT:", error)
        else:
            print(f"You added the item {name}")


create()
import sqlite3
def update():
    """Prompt for an item name plus a new location and price, and update that row.

    An empty location or price is stored as the placeholder '---'. The two
    columns are updated by separate statements, and each one reports its own
    outcome: "not found" when no row matched, otherwise a confirmation that
    carries the status message for that specific field. sqlite3 errors are
    printed; the cursor and connection are always closed.
    """
    # Local import keeps this cell self-contained.
    from contextlib import closing

    name = input("Enter the item you want to update: ")
    location = input("Enter updated location: ")
    price = input('Enter updated price: ')

    # Each field keeps its own message so that the price branch can no longer
    # overwrite the text reported for the location.
    if not location:
        location_message = "No location entered"
        location = '---'
    else:
        location_message = "Location updated"

    if not price:
        price_message = 'No price entered'
        price = '---'
    else:
        price_message = 'Price updated'

    # (statement, new value, field label used in the report, status message)
    changes = (
        ("UPDATE Items SET _location = ? WHERE _name = ?", location, "location", location_message),
        ("UPDATE Items SET _price = ? WHERE _name = ?", price, "price", price_message),
    )

    with closing(sqlite3.connect(database)) as conn, closing(conn.cursor()) as cursor:
        for statement, value, label, message in changes:
            try:
                cursor.execute(statement, (value, name))
                if cursor.rowcount == 0:
                    # No row has this name
                    print(f"{name} was not found in the table")
                else:
                    # Commit first so success is only reported once it is durable.
                    conn.commit()
                    print(f"The {label} for {name} has been changed {message}")
            except sqlite3.Error as error:
                print("Error while executing the UPDATE:", error)
#update()
import sqlite3
def delete():
    """Prompt for an item name and delete the matching row from the Items table.

    Prints whether the item was deleted or not found, or the sqlite3 error if
    the statement or commit fails. The cursor and connection are always closed.
    """
    # Local import keeps this cell self-contained.
    from contextlib import closing

    name = input("Enter the name of the item you want to delete")

    with closing(sqlite3.connect(database)) as conn, closing(conn.cursor()) as cursor:
        try:
            cursor.execute("DELETE FROM Items WHERE _name = ?", (name,))
            if cursor.rowcount == 0:
                # No row has this name
                print(f"The item {name} was not found in the table")
            else:
                # Commit first so success is only reported once it is durable.
                conn.commit()
                print(f"The item {name} was deleted")
        except sqlite3.Error as error:
            print("Error while executing the DELETE:", error)
#delete()
def menu():
    """Repeatedly prompt for a CRUD/schema operation and run it until the input is empty.

    Accepts c, r, u, d or s (case-insensitive) and calls ``create``, ``read``,
    ``update``, ``delete`` or ``schema`` respectively. Any other non-empty
    input prints a hint and prompts again. Returns None on empty input.

    The menu repeats with a loop rather than by calling itself, so a long
    session cannot exhaust the recursion limit.
    """
    while True:
        operation = input("Enter: (C)reate (R)ead (U)pdate or (D)elete or (S)chema")
        if len(operation) == 0:  # empty input leaves the menu
            return

        choice = operation.lower()
        # The functions are looked up only when chosen, so an unrelated
        # missing definition does not break the other options.
        if choice == 'c':
            create()
        elif choice == 'r':
            read()
        elif choice == 'u':
            update()
        elif choice == 'd':
            delete()
        elif choice == 's':
            schema()
        else:
            print("Please enter c, r, u, d, or s")
# Start the menu; any error is reported with a hint to run all notebook cells first.
try:
    menu()  # start menu
except Exception:
    # Exception rather than a bare except, so Ctrl-C (KeyboardInterrupt) and
    # SystemExit still propagate instead of being reported as a setup problem.
    print("Perform Jupyter 'Run All' prior to starting menu")