The database operator class
- class dwopt.dbo._Db(eng, **kwargs)[source]
The base database operator class.
See examples for quick-start.
Instantiate the child classes for different databases via one of the ways below:
- The factory function dwopt.db().
- The pre-instantiated objects on package import.
- The relevant child classes.
The child classes and the pre-instantiated objects:

Database    Child class      Pre-instantiated object
Postgre     dwopt.Pg(eng)    dwopt.pg
Sqlite      dwopt.Lt(eng)    dwopt.lt
Oracle      dwopt.Oc(eng)    dwopt.oc
Pre-instantiation uses the default credentials set up beforehand by the user via the dwopt.save_url() function.
- Parameters:
eng (str, or sqlalchemy.engine.Engine) – A sqlalchemy engine url, which combines the user name, password, database name, etc. Alternatively, a database connection engine to be used. Use the dwopt.make_eng() function to make an engine.
kwargs – Additional engine creation arguments.
- eng
Underlying engine. For details see sqlalchemy.engine.Engine.
- Type:
sqlalchemy.engine.Engine
- meta
Underlying metadata. For details see sqlalchemy.schema.MetaData.
- Type:
sqlalchemy.schema.MetaData
Examples
Instantiate and use a Sqlite database operator object via factory:
>>> from dwopt import db
>>> d = db("sqlite://")
>>> d.mtcars()
>>> d.run('select count(1) from mtcars')
   count(1)
0        32
Use the pre-instantiated Sqlite database operator object:
>>> from dwopt import lt
>>> lt.iris()
>>> lt.qry('iris').len()
150
Instantiate and use a Postgre database operator object via the class:
>>> from dwopt import Pg
>>> p = Pg("postgresql://dwopt_tester:1234@localhost/dwopt_test")
>>> p.mtcars(q=1).len()
32
Additional engine creation arguments:

from dwopt import Pg
p = Pg("postgresql://dwopt_tester:1234@localhost/dwopt_test", echo=1)
p.mtcars()
- add_pkey(sch_tbl_nme, pkey)[source]
Make and run an add primary key statement.
Works on postgre and oracle.
- Parameters:
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1.
pkey (str) – Column names in form "col1, col2, …".
Examples
>>> from dwopt import pg
>>> pg.mtcars()
>>> pg.add_pkey('mtcars', 'name')
>>> pg.qry('information_schema.constraint_table_usage').select(
...     'table_name, constraint_name').where(
...     "table_schema = 'public'", "table_name = 'mtcars'").run()
  table_name constraint_name
0     mtcars     mtcars_pkey
- create(sch_tbl_nme, dtypes=None, **kwargs)[source]
Make and run a create table statement.
- Parameters:
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1.
dtypes ({str:str}, optional) – Dictionary of column name to data type mappings.
**kwargs – Convenient way to add mappings. Keyword to argument mappings will be added to the dtypes dictionary.
Notes
Datatypes
Datatypes vary across databases (postgre types, sqlite types, oracle types); common examples below:

Type        Postgre        Sqlite     Oracle
integer     bigint         integer    number
float       float8         real       float
string      varchar(20)    text       varchar2(20)
datetime    timestamp      text       date
date        date           text       date
Other statements
The dtypes mappings also allow other sql statements which are part of a create statement to be added (sqlite other, postgre other, oracle other). For example, a primary key constraint.
Examples
>>> from dwopt import lt
>>> lt.drop('test')
>>> lt.create(
...     'test',
...     {
...         'id': 'integer'
...         ,'score': 'real'
...         ,'amt': 'integer'
...         ,'cat': 'text'
...         ,'time': 'text'
...         ,'constraint df_pk': 'primary key (id)'
...     })
>>> lt.run("select * from test")
Empty DataFrame
Columns: [id, score, amt, cat, time]
Index: []
>>> lt.drop('test2')
>>> lt.create('test2', id='integer', score='real', cat='text')
>>> lt.run("select * from test2")
Empty DataFrame
Columns: [id, score, cat]
Index: []
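The mapping from a dtypes dictionary to a create statement can be sketched with the stdlib sqlite3 module; make_create_sql below is a hypothetical helper for illustration, not the dwopt implementation:

```python
import sqlite3

def make_create_sql(sch_tbl_nme, dtypes):
    """Render a create table statement from a column-to-type mapping.

    Constraint entries such as {'constraint df_pk': 'primary key (id)'}
    simply become extra comma-separated clauses in the column list.
    """
    cols = ",\n    ".join(f"{k} {v}" for k, v in dtypes.items())
    return f"create table {sch_tbl_nme} (\n    {cols}\n)"

sql = make_create_sql(
    "test",
    {"id": "integer", "score": "real", "constraint df_pk": "primary key (id)"},
)
con = sqlite3.connect(":memory:")
con.execute(sql)
# pragma table_info lists only real columns, not the constraint entry
cols = [r[1] for r in con.execute("pragma table_info(test)")]
print(cols)  # ['id', 'score']
```

This illustrates why constraint clauses can ride along in the same dictionary: a create statement is just a comma-separated list of column definitions and table constraints.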
- create_schema(sch_nme)[source]
Make and run a create schema statement.
Works on postgre.
- Parameters:
sch_nme (str) – Schema name.
Examples
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> pg.iris('test.iris', q=1).len()
150
- cwrite(df, sch_tbl_nme)[source]
Create table and insert based on dataframe.
Replaces . with _ in dataframe column names. Data types are inferred based on the dwopt.dbo._Db.create() method notes. Also, date type columns are treated the same as str type columns. For the reversibility issue, see the dwopt.dbo._Db.write() method notes.
- Parameters:
df (pandas.DataFrame) – Payload Dataframe with data to insert.
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1.
Examples
>>> import pandas as pd
>>> from dwopt import lt
>>> tbl = pd.DataFrame({'col1': [1, 2], 'col2': ['a', 'b']})
>>> lt.drop('test')
>>> lt.cwrite(tbl, 'test')
>>> lt.qry('test').run()
   col1 col2
0     1    a
1     2    b
Attempt to write a dataframe into database and query back the same dataframe.
>>> from dwopt import pg
>>> from pandas.testing import assert_frame_equal
>>> df = pg.mtcars(q=1).run().sort_values('name').reset_index(drop=True)
>>> pg.drop('mtcars2')
>>> pg.cwrite(df, 'mtcars2')
>>> df_back = pg.qry('mtcars2').run().sort_values('name').reset_index(drop=True)
>>> assert_frame_equal(df_back, df)
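The column renaming and type inference that cwrite performs can be sketched in plain python; infer_dtypes below is a hypothetical helper based on the create method's sqlite type notes, not the dwopt source:

```python
def infer_dtypes(rows):
    """Map python value types to sqlite types from a list of row dicts,
    mirroring the idea that cwrite infers types from the dataframe.
    Column names have '.' replaced by '_'; unknown types fall back to text,
    consistent with dates being treated the same as str columns.
    """
    type_map = {int: "integer", float: "real", str: "text"}
    first = rows[0]
    return {
        col.replace(".", "_"): type_map.get(type(val), "text")
        for col, val in first.items()
    }

rows = [{"col.1": 1, "col.2": "a"}, {"col.1": 2, "col.2": "b"}]
dtypes = infer_dtypes(rows)
print(dtypes)  # {'col_1': 'integer', 'col_2': 'text'}
```

The inferred dictionary could then feed straight into a create call, which is the convenience cwrite provides over a separate create-then-write.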
- drop(sch_tbl_nme)[source]
Drop the table if it exists.
- Parameters:
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1.
Examples
>>> from dwopt import lt
>>> lt.drop('iris')
>>> lt.iris()
>>> lt.drop('iris')
>>> lt.exist('iris')
False
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> tbl = 'test.iris'
>>> pg.iris(tbl)
>>> pg.exist(tbl)
True
>>> pg.drop(tbl)
>>> pg.exist(tbl)
False
- exist(sch_tbl_nme)[source]
Check if a table exists.
- Parameters:
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1.
- Return type:
bool
Examples
>>> from dwopt import lt
>>> lt.iris()
>>> lt.drop('mtcars')
>>> lt.exist('iris')
True
>>> lt.exist('mtcars')
False
>>> from dwopt import pg as d
>>> d.create_schema('test')
>>> d.iris('test.iris')
>>> d.drop('test.mtcars')
>>> d.exist('test.iris')
True
>>> d.exist('test.mtcars')
False
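On sqlite, an existence check like this can be sketched against the sqlite_master catalog with the stdlib sqlite3 module; this is an illustrative guess at the mechanics, not the dwopt source:

```python
import sqlite3

def exist(con, tbl_nme):
    """Check table existence by counting matching rows in sqlite_master,
    the catalog table that holds one row per table, index, view, etc."""
    cur = con.execute(
        "select count(1) from sqlite_master where type = 'table' and name = :x",
        {"x": tbl_nme},
    )
    return cur.fetchone()[0] > 0

con = sqlite3.connect(":memory:")
con.execute("create table iris (species text)")
print(exist(con, "iris"), exist(con, "mtcars"))  # True False
```

On postgre the analogous catalog would be information_schema.tables, matching the list_tables notes further below.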
- iris(sch_tbl_nme='iris', q=False)[source]
Create the iris test table on the database.
Drops and recreates if it already exists. Sourced from UCI iris.
- Parameters:
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1. Default iris.
q (bool) – Return query object or not. Default False.
- Returns:
Query object with sch_tbl_nme loaded for convenience.
- Return type:
None or dwopt._qry._Qry
Examples
>>> from dwopt import lt
>>> lt.iris()
>>> lt.run('select count(*) from iris')
   count(*)
0       150
>>> from dwopt import lt
>>> lt.iris(q=True).valc('species', 'avg(petal_length)')
   species   n  avg(petal_length)
0  sicolor  50              4.260
1   setosa  50              1.462
2  rginica  50              5.552
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> pg.iris('test.iris', q=1).len()
150
- list_cons()[source]
List all constraints.
Only works for postgre. Uses the postgre information_schema.constraint_table_usage table.
- Return type:
pandas.DataFrame
Examples
>>> from dwopt import pg
>>> pg.mtcars()
>>> pg.add_pkey('mtcars', 'name')
>>> pg.list_cons().loc[
...     lambda x:(x.table_schema == 'public') & (x.table_name == 'mtcars'),
...     ['table_name', 'constraint_name']
... ].reset_index(drop=True)
  table_name constraint_name
0     mtcars     mtcars_pkey
- list_tables(owner)[source]
List all tables on database or specified schema.
- Parameters:
owner (str) – Only applicable for oracle. Name of the schema (owner).
- Return type:
pandas.DataFrame
Notes
Postgre sql used, information_schema.tables:
select table_catalog, table_schema, table_name
    ,is_insertable_into, commit_action
from information_schema.tables
where table_schema not in ('information_schema', 'pg_catalog')
Sqlite sql used, sqlite_schema:
select * from sqlite_master where type ='table' and name NOT LIKE 'sqlite_%'
Oracle sql used, all_tab_columns:
select/*+PARALLEL (4)*/ owner, table_name
    ,max(column_name), min(column_name)
from all_tab_columns
where owner = ':owner'
group by owner, table_name
Examples
>>> from dwopt import lt
>>> lt.iris()
>>> lt.mtcars()
>>> lt.drop('test')
>>> lt.drop('test2')
>>> lt.list_tables().iloc[:,:-2]
    type    name tbl_name
0  table    iris     iris
1  table  mtcars   mtcars
- mtcars(sch_tbl_nme='mtcars', q=False)[source]
Create the mtcars test table on the database.
Drops and recreates if it already exists. Sourced from R mtcars.
- Parameters:
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1. Default mtcars.
q (bool) – Return query object or not. Default False.
- Returns:
Query object with sch_tbl_nme loaded for convenience.
- Return type:
None or dwopt._qry._Qry
Examples
>>> from dwopt import lt
>>> lt.mtcars()
>>> lt.run('select count(*) from mtcars')
   count(*)
0        32
>>> from dwopt import lt
>>> lt.mtcars(q=True).valc('cyl', 'avg(mpg)')
   cyl   n   avg(mpg)
0    8  14  15.100000
1    4  11  26.663636
2    6   7  19.742857
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> pg.mtcars('test.mtcars', q=1).len()
32
- qry(*args, **kwargs)[source]
Make a query object.
- Parameters:
*args – Positional arguments of the dwopt._qry._Qry class.
**kwargs – Keyword arguments of the dwopt._qry._Qry class.
- Return type:
dwopt._qry._Qry
Examples
>>> from dwopt import lt
>>> lt.mtcars()
>>> lt.qry('mtcars').valc('cyl', 'avg(mpg)')
   cyl   n   avg(mpg)
0    8  14  15.100000
1    4  11  26.663636
2    6   7  19.742857
- run(sql=None, args=None, pth=None, mods=None, **kwargs)[source]
Run sql statement.
Features:
Argument binding.
Text replacement.
Reading from sql script file.
- Parameters:
sql (str, optional) – The sql statement to run.
args (dict, or [dict], optional) – Dictionary or list of dictionaries of argument name str to argument data object mappings. These argument data objects are passed via sqlalchemy to the database, to function as data for the argument names. See the notes and the examples section for details.
pth (str, optional) – Path to sql script, ignored if the sql parameter is not None. The script can hold a sql statement, for example a significant piece of table creation statement.
mods (dict, optional) – Dictionary of modification name str to modification str mappings. Replaces modification name in the sql by the respective modification str. See the notes and the examples section for details.
**kwargs – Convenient way to add modification mappings. Keyword to argument mappings will be added to the mods dictionary. The keyword cannot be one of the positional parameter names.
- Returns:
Returns dataframe if the database returns any result. Returns dataframe with column names and zero rows if running query that returns zero rows. Returns None otherwise, typically when running DDL/DML statement.
- Return type:
pandas.DataFrame or None
Notes
The args and the mods parameter
An argument name or a modification name is denoted in the sql by prepending a colon symbol : before a series of alphanumeric or underscore symbols. In addition, the end of the series for a modification name must be followed by a non-alphanumeric symbol or the end of the line. This is to distinguish names such as :var and :var1.
The args parameter binding is recommended where possible, while the mods parameter method of text replacement gives more flexibility when it comes to programmatically generating sql statements.
Examples
Run sql:
>>> from dwopt import lt
>>> lt.iris()
>>> lt.run("select * from iris limit 1")
   sepal_length  sepal_width  petal_length  petal_width species
0           5.1          3.5           1.4          0.2  setosa
Run sql with argument passing:
>>> from dwopt import lt
>>> lt.iris()
>>> lt.run("select count(1) from iris where species = :x",
...     args = {'x':'setosa'})
   count(1)
0        50
Run sql with text modification:
>>> from dwopt import lt
>>> lt.iris()
>>> old = 'iris'
>>> new = 'iris2'
>>> lt.run("drop table if exists :var", var=new)
>>> lt.run("create table :x as select * from :y", mods={'x':new, 'y': old})
>>> lt.run("select count(1) from :tbl", tbl=new)
   count(1)
0       150
Run from sql script:
>>> from dwopt import pg, make_test_tbl
>>> _ = make_test_tbl(pg)
>>> pg.run(pth = "E:/projects/my_sql_script.sql",
...     my_run_dte = '2022-03-03',
...     my_label = '20220303',
...     threshold = 5)
   count
0    137
The above runs the sql stored on E:/projects/my_sql_script.sql as below:

drop table if exists monthly_extract_:my_label;

create table monthly_extract_:my_label as
select * from test
where
    dte = to_date(':my_run_dte','YYYY-MM-DD')
    and score > :threshold;

select count(1) from monthly_extract_:my_label;
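The mechanics of running a modified script from file can be sketched with stdlib modules; the file handling and statement splitting here are assumptions for illustration, not the dwopt implementation:

```python
import os
import sqlite3
import tempfile

# write a small script to disk to stand in for a .sql file
script = "create table t_:lbl (id int);\ninsert into t_:lbl values (1);"
with tempfile.NamedTemporaryFile("w", suffix=".sql", delete=False) as f:
    f.write(script)
    pth = f.name

# read the script, apply the text modification, then run it statement by
# statement (splitting naively on ';')
with open(pth) as f:
    sql = f.read().replace(":lbl", "20220303")
os.unlink(pth)

con = sqlite3.connect(":memory:")
for stmt in filter(None, (s.strip() for s in sql.split(";"))):
    con.execute(stmt)
print(con.execute("select count(1) from t_20220303").fetchone()[0])  # 1
```

Note that naive splitting on ';' breaks on string literals containing semicolons; a real implementation would need proper statement parsing.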
- table_cols(sch_tbl_nme)[source]
Show information of specified table’s columns.
Notes
Postgre sql used, information_schema.columns:
select column_name, data_type from information_schema.columns where table_schema = ':schema_nme' and table_name = ':tbl_nme'
Oracle sql used, all_tab_columns:
select/*+PARALLEL (4)*/ * from all_tab_columns where owner = ':schema_nme' and table_name = ':tbl_nme'
- Parameters:
sch_tbl_nme (str) – Table name in format: schema.table.
- Return type:
pandas.DataFrame
Examples
>>> from dwopt import pg
>>> pg.iris()
>>> pg.table_cols('public.iris')
    column_name          data_type
0  sepal_length               real
1   sepal_width               real
2  petal_length               real
3   petal_width               real
4       species  character varying
- table_sizes()[source]
List sizes of all tables in current schema.
- Return type:
pandas.DataFrame
Notes
Oracle sql used, user_extents:
select/*+PARALLEL (4)*/ tablespace_name, segment_type, segment_name
    ,sum(bytes)/1024/1024 table_size_mb
from user_extents
group by tablespace_name, segment_type, segment_name
- write(df, sch_tbl_nme)[source]
Make and run an insert-many statement.
Pre-processing
Pandas Datetime64 columns are converted into object columns, and the pandas.NaT objects are converted into None. Pandas Float64 columns are converted into object columns, and the pandas.NaN objects are converted into None.
This should follow from a dwopt.dbo._Db.create() call, which sets up the database table with table name, column names, intended data types, and constraints.
- Parameters:
df (pandas.DataFrame) – Payload Dataframe with data to insert.
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1.
Notes
Reversibility
Ideally, a python dataframe written to the database should allow the exact same dataframe to be read back into python. Whether this holds depends on:
The database.
The data and object types on the dataframe.
The data types on the database table.
With the set-up used in the dwopt.make_test_tbl() function, the following results are obtained:
The postgre table is reversible except for row order on select from database. Example fix/strategy for comparison:
df.sort_values('id').reset_index(drop=True)
Sqlite stores date/datetime as text, which causes a str type column to be read back. One strategy is to convert from datetime and NaT to str and None before insertion, and convert back to date and datetime when reading. Example fix/strategy for comparison:
lt.write(
    df.assign(
        time=lambda x: x.time.astype(str).where(~x.time.isna(), None)),
    "test2",
)
tbl = (
    db.qry("test2").run()
    .assign(
        dte=lambda x: x["dte"].apply(
            lambda x: datetime.date.fromisoformat(x) if x else None
        ),
        time=lambda x: pd.to_datetime(x.time),
    )
)
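The same round-trip strategy can be sketched without pandas, using only stdlib sqlite3 and datetime; the table and column names are illustrative:

```python
import datetime
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("create table test2 (dte text, time text)")

# insert: date/datetime become isoformat strings (missing values would be None)
row = (datetime.date(2022, 3, 3), datetime.datetime(2022, 3, 3, 10, 30))
con.execute(
    "insert into test2 values (:d, :t)",
    {"d": row[0].isoformat(), "t": row[1].isoformat(sep=" ")},
)

# read back: text parses back into date/datetime, guarding against None
d_txt, t_txt = con.execute("select dte, time from test2").fetchone()
dte = datetime.date.fromisoformat(d_txt) if d_txt else None
time = datetime.datetime.fromisoformat(t_txt) if t_txt else None
print(dte, time)  # 2022-03-03 2022-03-03 10:30:00
```

The round trip is lossless because isoformat strings preserve full precision, which is the reason the text strategy works for sqlite.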
Oracle has the same issue as postgre. In addition:
Both date and datetime are stored as date format, and are read back as datetime.
Datetime milliseconds are lost on the database.
Dates are stored in dd-MMM-yy format on the database.
Dates passed into varchar2 type columns are stored in dd-MMM-yy format.
Example fix/strategy for comparison:
tbl = db.run("select * from test2 order by id").assign(
    dte=lambda x: x["dte"].apply(lambda x: x.date() if x else None)
)
df2 = df.assign(
    time=lambda x: x["time"].apply(lambda x: x.replace(microsecond=0))
)
Examples
Write dataframe into a table.
>>> import pandas as pd
>>> from dwopt import lt
>>> tbl = pd.DataFrame({'col1': [1, 2], 'col2': ['a', 'b']})
>>> lt.drop('test')
>>> lt.create('test', col1='int', col2='text')
>>> lt.write(tbl,'test')
>>> lt.run('select * from test')
   col1 col2
0     1    a
1     2    b
Attempt to write a dataframe into database and query back the same dataframe.
>>> from dwopt import make_test_tbl
>>> from pandas.testing import assert_frame_equal
>>> pg, df = make_test_tbl('pg')
>>> pg.drop('test')
>>> pg.create(
...     "test",
...     dtypes={
...         "id": "bigint primary key",
...         "score": "float8",
...         "amt": "bigint",
...         "cat": "varchar(20)",
...         "dte": "date",
...         "time": "timestamp"
...     }
... )
>>> pg.write(df, 'test')
>>> df_back = pg.qry('test').run().sort_values('id').reset_index(drop=True)
>>> assert_frame_equal(df_back, df)
- write_nodup(tbl, sch_tbl_nme, pkey, where=None)[source]
Insert without creating duplicates.
Does the below:
1. Make and run a select statement with the optionally provided where clause.
2. If step 1 returns any results and the payload table is non-empty, remove duplicates on the payload table, using the provided primary key columns as the judge of duplication.
3. Make an insert statement on the non-duplicating payload data via the dwopt.dbo._Db.write() method.
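The steps above can be sketched with stdlib sqlite3 and plain dict rows; write_nodup below is a hypothetical re-implementation for illustration, not the dwopt source:

```python
import sqlite3

def write_nodup(con, rows, tbl, pkey):
    """Insert only rows whose primary key tuple is not already on the table."""
    # step 1: select existing key tuples from the target table
    cols = ", ".join(pkey)
    existing = set(con.execute(f"select {cols} from {tbl}").fetchall())
    # step 2: filter the payload down to rows with unseen keys
    fresh = [r for r in rows if tuple(r[k] for k in pkey) not in existing]
    # step 3: insert the non-duplicating remainder
    if fresh:
        keys = list(rows[0])
        ph = ", ".join(f":{k}" for k in keys)
        con.executemany(
            f"insert into {tbl} ({', '.join(keys)}) values ({ph})", fresh
        )

con = sqlite3.connect(":memory:")
con.execute("create table test (col1 int, col2 text)")
write_nodup(con, [{"col1": 1, "col2": "a"}, {"col1": 2, "col2": "b"}], "test", ["col1"])
write_nodup(con, [{"col1": 1, "col2": "a"}, {"col1": 3, "col2": "c"}], "test", ["col1"])
print(con.execute("select * from test order by col1").fetchall())
# [(1, 'a'), (2, 'b'), (3, 'c')]
```

The optional where clause in the real method narrows step 1's select, so only a relevant slice of existing keys needs to be compared.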
- Parameters:
tbl (pandas.DataFrame) – Payload Dataframe with data to insert.
sch_tbl_nme (str) – Table name in form my_schema1.my_table1 or my_table1.
pkey ([str]) – Iterable of column name str.
where (str) – Where clause in str form. The where keyword is not needed.
Examples
>>> import pandas as pd
>>> from dwopt import lt
>>> tbl = pd.DataFrame({'col1': [1, 2], 'col2': ['a', 'b']})
>>> tbl2 = pd.DataFrame({'col1': [1, 3], 'col2': ['a', 'c']})
>>> lt.drop('test')
>>> lt.create('test', col1='int', col2='text')
>>> lt.write(tbl, 'test')
>>> lt.write_nodup(tbl2, 'test', ['col1'], "col1 < 4")
>>> lt.run("select * from test")
   col1 col2
0     1    a
1     2    b
2     3    c