Data pipelines are a series of interconnected steps or processes used to extract, transform, and load (ETL) data from various sources into a target destination, such as a database or a data warehouse. They are commonly used in data integration scenarios where data needs to be collected from multiple sources, transformed into a desired format, and loaded into centralized storage for further analysis or processing.
Designing data pipelines with PostgreSQL and Python involves several key steps, including data extraction, transformation, and loading (ETL). PostgreSQL is a powerful open-source relational database management system (RDBMS) that supports complex data handling and retrieval, while Python is a versatile programming language with extensive libraries for data manipulation and integration.
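To make the idea concrete, here is a minimal sketch of an extract-transform-load flow using pandas and SQLAlchemy. The table names ("raw_sales", "monthly_sales") and columns ("order_date", "amount") are hypothetical placeholders, and the connection details assume a local PostgreSQL instance like the one used in the rest of this digest:

import pandas as pd
from sqlalchemy import create_engine

# connect to a local PostgreSQL instance (credentials are placeholders)
engine = create_engine("postgresql+psycopg2://postgres:*******@localhost/postgres")

# Extract: pull raw data from a source table (hypothetical table name)
raw = pd.read_sql("SELECT * FROM raw_sales", engine)

# Transform: clean and aggregate with pandas
monthly = (
    raw.dropna(subset=["amount"])
       .assign(month=lambda d: pd.to_datetime(d["order_date"]).dt.to_period("M").astype(str))
       .groupby("month", as_index=False)["amount"].sum()
)

# Load: write the result to a target table
monthly.to_sql("monthly_sales", engine, if_exists="replace", index=False)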
For the purpose of this data digest, some of the code will be written in Jupyter Notebook and Visual Studio Code to show how to leverage Python to run queries, with key highlights on the following sub-topics:
Instructor: Chimdiadi Catherine Orji
Let’s get started…
This is the link to the PostgreSQL Database that will be used in this data gist.
When working with PostgreSQL in Python, a few libraries are required, including psycopg2, pandas, and SQLAlchemy.
import psycopg2
import pandas as pd
from sqlalchemy import create_engine
Note: To create this connection, leverage the create_engine function imported from sqlalchemy. The user, password, and name of the database are required to successfully create the connection.
connection = create_engine("postgresql+psycopg2://{user}:{pw}@localhost/{db}".format(user="postgres", pw="*******", db="postgres"))
df = pd.read_sql("SELECT * FROM actor", connection)
df.head()
The result of this query will be a DataFrame that looks like this:
dff = pd.read_sql_table("address", connection)
dff.head()
The result of this query will be a DataFrame that looks like this:
Note: For this, you first create a query and then pass it, together with the connection, to the pandas function. This will be used on the same address table.
query = "SELECT * FROM address"
data = pd.read_sql_query(query, connection)
data.head()
The result of this query will be a DataFrame that looks like this:
query = """ SELECT film.film_id,
title,
inventory_id,
store_id
FROM film
LEFT JOIN inventory ON inventory.film_id = film.film_id
WHERE inventory.film_id is NULL"""
data = pd.read_sql_query(query, connection)
data.head()
The result of this query will be a DataFrame that looks like this:
data.to_sql("film_invent", connection, if_exists="fail")
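Note: the if_exists argument controls what happens when the target table already exists: "fail" raises an error, "replace" drops and recreates the table before writing, and "append" adds the new rows to the existing table. For example, reusing the same film_invent table:

data.to_sql("film_invent", connection, if_exists="replace", index=False)   # overwrite the existing table
data.to_sql("film_invent", connection, if_exists="append", index=False)    # add rows to the existing table
# index=False stops pandas from writing the DataFrame index as an extra column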
In Python, "try-except" is a way to handle exceptions or errors that may occur during the execution of a program. Exceptions are events or errors that occur during the normal flow of program execution, and they can disrupt the normal execution of the program.
The "try" block is used to enclose a section of code that may raise an exception. If an exception occurs within the "try" block, the code execution is immediately transferred to the corresponding "except" block that handles that specific exception. The "except" block contains the code that will be executed to handle the exception.
The "finally" block is also optional and can be used with try and except. The code within the finally block is always executed, whether or not an exception occurred. It is typically used for cleanup operations that must be performed, such as closing a file or releasing resources.
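As a quick, stand-alone illustration of the pattern before applying it to the database code:

try:
    result = 10 / 0                      # this raises ZeroDivisionError
except ZeroDivisionError as e:
    print("Error:", e)                   # handle the specific exception
finally:
    print("cleanup runs either way")     # always executed, error or not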
Note: You can copy and paste the code into your VS Code IDE.
import psycopg2
from psycopg2 import Error
try:
    # create connection
    connection = psycopg2.connect(
        host="localhost", user="postgres", password="********", database="postgres"
    )
    # cursor object
    cursor = connection.cursor()
    # drop the table if it exists using the execute() method
    cursor.execute("DROP TABLE IF EXISTS student")
    # query - create table
    query = """CREATE TABLE student(
                   student_id INTEGER PRIMARY KEY,
                   first_name VARCHAR(20),
                   last_name VARCHAR(20),
                   age INTEGER
               )
            """
    # Execute query
    cursor.execute(query)
    # Commit changes
    connection.commit()
    # print a message
    print("Table created successfully")
except Error as e:
    # Rollback changes
    connection.rollback()
    print("Error creating table:", e)
finally:
    connection.close()
    print("connection closed")
try:
    # create connection
    connection = psycopg2.connect(
        host="localhost", user="postgres", password="********", database="postgres")
    # cursor object
    cursor = connection.cursor()
    # query insert
    # create placeholders for the values to insert
    query = """INSERT INTO student(student_id, first_name, last_name, age)
               VALUES(%s, %s, %s, %s)
            """
    # create an insert list
    records = [(1, "Emeke", "Jude", 23),
               (2, "Ayomide", "Emmanuel", 25)]
    # Execute query
    cursor.executemany(query, records)
    # commit changes
    connection.commit()
    # print a message
    print("rows inserted successfully")
except Error as e:
    connection.rollback()
    print(e)
    print("Error inserting rows")
finally:
    connection.close()
    print("connection closed")
Note: Here, I will be creating a function that gets the name of actors and bring out specific information with respect to the name
def getActor(name):
    try:
        connection = psycopg2.connect(
            host="localhost", user="postgres", password="********", database="postgres")
        # cursor object
        cursor = connection.cursor()
        # write query
        # create a placeholder for the name value
        query = "SELECT * FROM actor WHERE first_name = %s"
        # Execute query
        cursor.execute(query, (name,))
        # create record
        record = cursor.fetchall()
        for row in record:
            print("actor_id", row[0])
            print("first_name", row[1])
            print("last_name", row[2])
            print("last_update", row[3], "\n")
    except Error as e:
        print("Error reading table:", e)
    finally:
        connection.close()
        print("connection closed")
getActor("Nick")
The result of this query will look like this:
try:
    # create connection
    connection = psycopg2.connect(
        host="localhost", user="postgres", password="********", database="postgres")
    # cursor object
    cursor = connection.cursor()
    # Execute query
    query = "UPDATE student SET age = 27"
    cursor.execute(query)
    # commit changes
    connection.commit()
    # check for update
    cursor.execute("SELECT * FROM student")
    record = cursor.fetchall()
    for row in record:
        print("student_id", row[0])
        print("first_name", row[1])
        print("last_name", row[2])
        print("age", row[3], "\n")
    # print message
    print("successful update")
except Error as e:
    connection.rollback()
    print("Error updating:", e)
finally:
    connection.close()
    print("connection closed")
Note: This code updates the age from 24 to 27, and the change is reflected in the PostgreSQL database. The result looks like this in the VS Code terminal.
try:
    # create connection
    connection = psycopg2.connect(
        host="localhost", user="postgres", password="********", database="postgres")
    # cursor object
    cursor = connection.cursor()
    # write your query
    query = "DELETE FROM student WHERE student_id = 1"
    # Execute your query
    cursor.execute(query)
    # commit changes
    connection.commit()
    # Check for delete
    cursor.execute("SELECT * FROM student")
    record = cursor.fetchall()
    for row in record:
        print("student_id", row[0])
        print("first_name", row[1])
        print("last_name", row[2])
        print("age", row[3], "\n")
    # print message
    print("deleted successfully")
except Error as e:
    connection.rollback()
    print("Error deleting:", e)
finally:
    connection.close()
    print("connection closed")
Python provides powerful libraries that allow you to interact with PostgreSQL databases and perform various operations like running SQL queries, creating connections, reading tables, saving query results, and performing CRUD (Create, Read, Update, Delete) operations on tables.
With the help of these libraries and tools like Jupyter notebook and Visual Studio Code IDE, you can leverage the flexibility and versatility of Python to efficiently work with PostgreSQL databases in your data analysis or application development tasks. Whether you are a data scientist, data engineer, or software developer, mastering the use of Python for PostgreSQL interactions can greatly enhance your ability to work with databases effectively and efficiently.
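As a closing note, psycopg2 connections can also be used as context managers: the with block commits the transaction if the code inside it succeeds and rolls it back if an exception is raised, which removes the need for explicit commit and rollback calls. A minimal sketch, assuming the same local credentials and the student table created above (the context manager does not close the connection, so close() is still called in finally):

import psycopg2

connection = psycopg2.connect(
    host="localhost", user="postgres", password="********", database="postgres")
try:
    with connection:                          # commits on success, rolls back on error
        with connection.cursor() as cursor:   # cursor is closed automatically
            cursor.execute("SELECT * FROM student")
            for row in cursor.fetchall():
                print(row)
finally:
    connection.close()                        # the with block does not close the connection
    print("connection closed")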
1. Dive into our online data bootcamp! Learn at your own pace with our expert-led virtual programs designed to fit into your schedule. Become a qualified data expert in just 4 months and unlock your potential and land your dream career.
2. Learn more about our Data BootCamp programs by reading the testimonials of our graduates. Click HERE to access the testimonials.
3. You can also sign up for 1:1 personal tutoring with an expert instructor or request other solutions that we provide, which include data research, tech skill training, data products, and application development. Click HERE to begin.
4. Get in touch with us for further assistance from our team via email at servus@resagratia.com or on WhatsApp at +2349042231545.