import sqlalchemy as alc
import pandas as pd
import numpy as np
import logging
import re
from dwopt._qry import _Qry
from dwopt.set_up import _make_iris_df, _make_mtcars_df
_logger = logging.getLogger(__name__)
[docs]def db(eng):
"""The :class:`database operator object <dwopt.dbo._Db>` factory.
Args
-----------
eng: str, or sqlalchemy.engine.Engine
A `sqlalchemy engine url <https://docs.sqlalchemy.org/en/14/
core/engines.html#database-urls>`_, which
combines the user name, password, database names, etc.
Alternatively a Database connection engine to be used.
Use the :func:`dwopt.make_eng` function to make engine.
Returns
-------
dwopt.dbo._Db
The relevant database operator object.
Examples
-------------
Produce a sqlite database operator object:
>>> from dwopt import db
>>> d = db("sqlite://")
>>> d.mtcars()
>>> d.run('select count(1) from mtcars')
count(1)
0 32
Produce a postgre database operator object:
>>> from dwopt import db
>>> url = "postgresql://dwopt_tester:1234@localhost/dwopt_test"
>>> db(url).iris(q=True).len()
150
Produce using engine object:
>>> from dwopt import db, make_eng
>>> eng = make_eng("sqlite://")
>>> db(eng).mtcars(q=1).len()
32
Produce an oracle database operator object:
>>> from dwopt import db, Oc
>>> url = "oracle://scott2:tiger@tnsname"
>>> isinstance(db(url), Oc)
True
"""
if isinstance(eng, str):
eng = alc.create_engine(eng)
else:
if not isinstance(eng, alc.engine.Engine):
raise ValueError("Invalid eng, either engine url or engine")
nme = eng.name
if nme == "postgresql":
return Pg(eng)
elif nme == "sqlite":
return Lt(eng)
elif nme == "oracle":
return Oc(eng)
else:
raise ValueError("Invalid engine, either postgre, sqlite, or oracle")
[docs]def Db(eng):
"""Alias for :func:`dwopt.db`"""
return db(eng)
[docs]class _Db:
"""
The base database operator class.
See examples for quick-start.
Instantiate the child classes for different databases via one of below ways:
* The factory function: :func:`dwopt.db`.
* The pre-instantiated objects on package import.
* The relevant child classes.
The child classes and the pre-instantiated objects:
========== =================== ========================
Database Child class Pre-instantiated object
========== =================== ========================
Postgre ``dwopt.Pg(eng)`` ``dwopt.pg``
Sqlite ``dwopt.Lt(eng)`` ``dwopt.lt``
Oracle ``dwopt.Oc(eng)`` ``dwopt.oc``
========== =================== ========================
Pre-instantiation uses the default credentials set-up prior by the user
via the :func:`dwopt.save_url` function.
Args
----------
eng: str, or sqlalchemy.engine.Engine
A `sqlalchemy engine url <https://docs.sqlalchemy.org/en/14/
core/engines.html#database-urls>`_, which
combines the user name, password, database names, etc.
Alternatively a Database connection engine to be used.
Use the :func:`dwopt.make_eng` function to make engine.
Attributes
----------
eng: sqlalchemy.engine.Engine
Underlying engine. Details see
`sqlalchemy.engine.Engine <https://docs.sqlalchemy.org/en/14/core/
connections.html#sqlalchemy.engine.Engine>`_
meta: sqlalchemy.schema.MetaData
Underlying metadata. Details see
`sqlalchemy.schema.MetaData <https://docs.sqlalchemy.org/en/14/core/
metadata.html#sqlalchemy.schema.MetaData>`_
Examples
--------
Instantiate and use a Sqlite database operator object via factory:
>>> from dwopt import db
>>> d = db("sqlite://")
>>> d.mtcars()
>>> d.run('select count(1) from mtcars')
count(1)
0 32
Use the pre-instantiated Sqlite database operator object:
>>> from dwopt import lt
>>> lt.iris()
>>> lt.qry('iris').len()
150
Instantiate and use a Postgre database operator object via the class:
>>> from dwopt import Pg
>>> p = Pg("postgresql://dwopt_tester:1234@localhost/dwopt_test")
>>> p.mtcars(q=1).len()
32
"""
def __init__(self, eng):
if isinstance(eng, str):
self.eng = alc.create_engine(eng)
else:
self.eng = eng
self.meta = alc.MetaData()
_nme = self.eng.name
if _nme == "postgresql":
self._dialect = "pg"
elif _nme == "sqlite":
self._dialect = "lt"
elif _nme == "oracle":
self._dialect = "oc"
def _bind_mods(self, sql, mods=None, **kwargs):
"""Apply modification to sql statement
Examples
-----------
import re
def f(sql, i, j):
return re.sub(f":{i}(?=[^a-zA-Z0-9]|$)", str(j), sql)
f("from tbl_:yr_0304", 'yr', 2017)
f(f("from tbl_:yr_:yr1_0304", 'yr', 2017), 'yr1', 2018)
f("from tbl_:yr_mth_tbl", 'yr_mth', 2017)
"""
if mods is None:
mods = kwargs
else:
mods.update(kwargs)
for i, j in mods.items():
sql = re.sub(f":{i}(?=[^a-zA-Z0-9]|$)", str(j), sql)
_logger.debug(f"replaced :{i} by {j}")
return sql
def _guess_dtype(self, dtype):
"""See :meth:`dwopt.dbo._Db.create`"""
if np.issubdtype(dtype, np.int64):
return alc.Integer
elif np.issubdtype(dtype, np.float64):
return alc.Float
elif np.issubdtype(dtype, np.datetime64):
return alc.DateTime
else:
return alc.String
def _parse_sch_tbl_nme(self, sch_tbl_nme, split=True):
"""Resolve schema dot table name name into lower case components.
Args
------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
split: bool
Split form or not.
Returns
----------
str or (str, str, str)
parsed names, all elements can be None.
Examples
---------
>>> import dwopt
>>> d = dwopt.dbo._Db
>>> f = lambda x:d._parse_sch_tbl_nme(d, x, split=True)
>>> g = lambda x:d._parse_sch_tbl_nme(d, x)
>>> for i in ['ab', 'Ab', 'ab.ab', 'Ab.Ab', 'Ab.Ab.Ab', '', None, 3]:
... print(f"{i = }, {f(i) = }, {g(i) = }")
i = 'ab', f(i) = ('ab', None, 'ab'), g(i) = 'ab'
i = 'Ab', f(i) = ('ab', None, 'ab'), g(i) = 'ab'
i = 'ab.ab', f(i) = ('ab.ab', 'ab', 'ab'), g(i) = 'ab.ab'
i = 'Ab.Ab', f(i) = ('ab.ab', 'ab', 'ab'), g(i) = 'ab.ab'
i = 'Ab.Ab.Ab', f(i) = ('ab.ab.ab', 'ab', 'ab.ab'), g(i) = 'ab.ab.ab'
i = '', f(i) = ('', None, ''), g(i) = ''
i = None, f(i) = (None, None, None), g(i) = None
i = 3, f(i) = (None, None, None), g(i) = None
"""
try:
clean = sch_tbl_nme.lower()
items = clean.split(".")
except AttributeError:
sch = None
tbl_nme = None
full_nme = None
else:
n = len(items)
if n == 1:
sch = None
tbl_nme = items[0]
full_nme = tbl_nme
elif n == 2:
sch = items[0]
tbl_nme = items[1]
full_nme = clean
else:
sch = items[0]
tbl_nme = ".".join(items[1:n])
full_nme = clean
if split:
return full_nme, sch, tbl_nme
else:
return full_nme
def _remove_sch_tbl(self, sch_tbl_nme):
"""Remove sch_tbl from meta.
Args
------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
Examples
-----------
Set-up::
from dwopt import pg
import sqlalchemy as alc
meta = pg.meta
First table entry into meta overwrites second one::
meta.clear()
alc.Table('test', meta, schema='test')
alc.Table('test.test', meta)
meta.tables
meta.clear()
alc.Table('test.test', meta)
alc.Table('test', meta, schema='test')
meta.tables
No schema is entered unless explicitly::
meta.clear()
alc.Table('test.test', meta, schema=None)
meta.clear()
alc.Table('test.test.test', meta)
meta.clear()
alc.Table('test.test', meta, schema='test')
Items removed by key not certain on schema::
meta.clear()
alc.Table('test', meta)
alc.Table('test.test', meta)
alc.Table('test.test.test', meta)
meta.tables['test']
meta.tables['test.test']
meta.tables['test.test.test']
meta.tables
"""
if sch_tbl_nme in self.meta.tables:
self.meta.remove(self.meta.tables[sch_tbl_nme])
def _run(self, sql, args=None):
"""Run sql statement with argument passing"""
with self.eng.begin() as c:
_logger.info(f"running:\n{sql}")
if args is not None:
_logger.info(f"{len(args) = }")
r = c.execute(alc.text(sql), args)
else:
r = c.execute(sql)
_logger.info("done")
if r.returns_rows:
return pd.DataFrame(r.all(), columns=r.keys())
[docs] def add_pkey(self, sch_tbl_nme, pkey):
"""Make and run an add primary key statement.
Work on postgre and oracle.
Args
----------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
pkey : str
columns names in form "col1, col2, ...".
Examples
--------
>>> from dwopt import pg
>>> pg.mtcars()
>>> pg.add_pkey('mtcars', 'name')
>>> pg.qry('information_schema.constraint_table_usage').select(
... 'table_name, constraint_name').where(
... "table_schema = 'public'", "table_name = 'mtcars'").run()
table_name constraint_name
0 mtcars mtcars_pkey
"""
sql = f"alter table {sch_tbl_nme} add primary key ({pkey})"
return self.run(sql)
[docs] def create(self, sch_tbl_nme, dtypes=None, **kwargs):
"""
Make and run a create table statment.
Args
----------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
dtypes : {str:str}, optional
Dictionary of column names to data types mappings.
**kwargs :
Convenient way to add mappings.
Keyword to argument mappings will be added to the dtypes
dictionary.
Notes
-----
**Datatypes**
Datatypes vary across databses
(`postgre type <https://www.postgresql.org/docs/current/
datatype.html>`_,
`sqlite type <https://www.sqlite.org/datatype3.html>`_,
`oracle type <https://docs.oracle.com/en/database/oracle/
oracle-database/21/sqlqr/Data-Types.html>`_),
common example below:
========== =========== ======= ============
Type Postgre Sqlite Oracle
========== =========== ======= ============
integer bigint integer number
float float8 real float
string varchar(20) text varchar2(20)
datetime timestamp text timestamp
date date text date
========== =========== ======= ============
Note `sqlite datetime functions <https://www.sqlite.org/lang_datefunc.html>`_
are supposed to be used to work with datetime data types stored as text.
**Other statements**
The ``dtypes`` mappings also allow other sql statements which are
part of a create statement to be added
(`sqlite other <https://sqlite.org/lang_createtable.html>`_,
`postgre other <https://www.postgresql.org/docs/current/
sql-createtable.html>`_,
`oracle other <https://docs.oracle.com/en/database/oracle/
oracle-database/21/sqlrf/CREATE-TABLE.html>`_).
For example a primary key constraint.
Examples
--------
>>> from dwopt import lt
>>> lt.drop('test')
>>> lt.create(
... 'test',
... {
... 'id': 'integer'
... ,'score': 'real'
... ,'amt': 'integer'
... ,'cat': 'text'
... ,'time': 'text'
... ,'constraint df_pk': 'primary key (id)'
... })
>>> lt.run("select * from test")
Empty DataFrame
Columns: [id, score, amt, cat, time]
Index: []
>>> lt.drop('test2')
>>> lt.create('test2', id='integer', score='real', cat='text')
>>> lt.run("select * from test2")
Empty DataFrame
Columns: [id, score, cat]
Index: []
"""
if dtypes is None:
dtypes = kwargs
else:
dtypes.update(kwargs)
cls = ""
for col, dtype in dtypes.items():
cls += f"\n ,{col} {dtype}"
self.run(f"create table {sch_tbl_nme}(" f"\n {cls[6:]}" "\n)")
[docs] def create_schema(self, sch_nme):
"""Make and run a create schema statement.
Works on postgre.
Args
----------
sch_nme: str
Schema name.
Examples
--------
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> pg.iris('test.iris').len()
150
"""
try:
self.run(f"create schema {sch_nme}")
except Exception as ex:
if "already exists" in str(ex):
pass
else:
raise (ex)
[docs] def cwrite(self, df, sch_tbl_nme):
"""
Create table and insert based on dataframe.
* Replace ``.`` by ``_`` in dataframe column names.
* Data types infered based on the :meth:`dwopt.dbo._Db.create` method notes.
* Datetime and reversibility issue see :meth:`dwopt.dbo._Db.write` method notes.
Args
----------
df : pandas.DataFrame
Payload Dataframe with data to insert.
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
Examples
--------
>>> import pandas as pd
>>> from dwopt import lt
>>> tbl = pd.DataFrame({'col1': [1, 2], 'col2': ['a', 'b']})
>>> lt.drop('test')
>>> lt.cwrite(tbl, 'test')
>>> lt.qry('test').run()
col1 col2
0 1 a
1 2 b
Attempt to write a dataframe into database and query back the same dataframe.
>>> from dwopt import pg
>>> from pandas.testing import assert_frame_equal
>>> df = pg.mtcars(q=1).run().sort_values('name').reset_index(drop=True)
>>> pg.drop('mtcars2')
>>> pg.cwrite(df, 'mtcars2')
>>> df_back = pg.qry('mtcars2').run().sort_values('name').reset_index(drop=True)
>>> assert_frame_equal(df_back, df)
"""
df = df.copy()
df.columns = [_.lower().replace(".", "_") for _ in df.columns]
sch_tbl_nme, sch, tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme)
self._remove_sch_tbl(sch_tbl_nme)
tbl = alc.Table(
tbl_nme,
self.meta,
*[alc.Column(col, self._guess_dtype(df[col].dtype)) for col in df.columns],
schema=sch,
)
_logger.info("creating table via sqlalchemy:")
for col in tbl.columns.items():
_logger.info(f"{col}")
tbl.create(self.eng)
_logger.info("done")
self.write(df, sch_tbl_nme)
[docs] def delete(self):
"""WIP"""
raise NotImplementedError
[docs] def drop(self, sch_tbl_nme):
"""Drop table if exist.
Args
----------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
See also
----------
:meth:`dwopt.dbo._Db.exist`
Examples
--------
>>> from dwopt import lt
>>> lt.drop('iris')
>>> lt.iris()
>>> lt.drop('iris')
>>> lt.list_tables()
Empty DataFrame
Columns: [type, name, tbl_name, rootpage, sql]
Index: []
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> tbl = 'test.iris'
>>> pg.iris(tbl)
>>> pg.exist(tbl)
True
>>> pg.drop(tbl)
>>> pg.exist(tbl)
False
"""
sch_tbl_nme, sch, tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme)
self._remove_sch_tbl(sch_tbl_nme)
with self.eng.connect() as conn:
_logger.info(f"dropping table via sqlalchemy: {sch_tbl_nme}")
alc.Table(tbl_nme, self.meta, schema=sch).drop(conn, checkfirst=True)
_logger.info("done")
[docs] def exist(self, sch_tbl_nme):
"""Check if table exist.
Args
------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
Returns
----------
bool
Examples
---------
>>> from dwopt import lt
>>> lt.iris()
>>> lt.drop('mtcars')
>>> lt.exist('iris')
True
>>> lt.exist('mtcars')
False
>>> from dwopt import pg as d
>>> d.create_schema('test')
>>> d.iris('test.iris')
>>> d.drop('test.mtcars')
>>> d.exist('test.iris')
True
>>> d.exist('test.mtcars')
False
"""
sch_tbl_nme, sch, tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme)
self._remove_sch_tbl(sch_tbl_nme)
try:
_logger.info(f"reflecting table via sqlalchemy: {sch_tbl_nme}")
self.meta.reflect(self.eng, schema=sch, only=[tbl_nme])
_logger.info("done")
return True
except Exception as ex:
if "Could not reflect: requested table(s) not available in Engine" in str(
ex
):
_logger.debug(ex)
return False
else:
raise ex
[docs] def iris(self, sch_tbl_nme="iris", q=False):
"""Create the iris test table on the database.
Drop and recreate if already exist.
Sourced from `UCI iris <https://archive.ics.uci.edu/ml/datasets/Iris/>`_.
args
-------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
Default ``iris``.
q: bool
Return query object or not. Default False.
Returns
-------
None or dwopt._qry._Qry
Query object with sch_tbl_nme loaded for convenience.
Examples
--------
>>> from dwopt import lt
>>> lt.iris()
>>> lt.run('select count(*) from iris')
count(*)
0 150
>>> from dwopt import lt
>>> lt.iris(q=True).valc('species', 'avg(petal_length)')
species n avg(petal_length)
0 sicolor 50 4.260
1 setosa 50 1.462
2 rginica 50 5.552
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> pg.iris('test.iris', q=1).len()
150
"""
sch_tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme, split=False)
self.drop(sch_tbl_nme)
self.cwrite(_make_iris_df(), sch_tbl_nme)
if q:
return self.qry(sch_tbl_nme)
[docs] def list_cons(self):
"""
List all constraints.
Only works for postgre.
Uses the postgre `information_schema.constraint_table_usage
<https://www.postgresql.org/docs/current/infoschema-
constraint-table-usage.html>`_ table.
Returns
-------
pandas.DataFrame
Examples
----------
>>> from dwopt import pg
>>> pg.mtcars()
>>> pg.add_pkey('mtcars', 'name')
>>> pg.list_cons().loc[
... lambda x:(x.table_schema == 'public') & (x.table_name == 'mtcars'),
... ['table_name', 'constraint_name']
... ]
table_name constraint_name
0 mtcars mtcars_pkey
"""
if self._dialect == "pg":
sql = "SELECT * FROM information_schema.constraint_table_usage"
return self.run(sql)
else:
raise NotImplementedError
[docs] def list_tables(self, owner):
"""
List all tables on database or specified schema.
Args
----------
owner : str
Only applicable for oracle. Name of the schema(owner).
Returns
-------
pandas.DataFrame
Notes
-----
Postgre sql used, `information_schema.tables
<https://www.postgresql.org/docs/current/infoschema-tables.html>`_:
.. code-block:: sql
select
table_catalog,table_schema,table_name
,is_insertable_into,commit_action
from information_schema.tables
where table_schema
not in ('information_schema','pg_catalog')
Sqlite sql used, `sqlite_schema <https://www.sqlite.org/schematab.html>`_:
.. code-block:: sql
select * from sqlite_master
where type ='table'
and name NOT LIKE 'sqlite_%'
Oracle sql used, `all_tab_columns
<https://docs.oracle.com/en/database/oracle/oracle-database/21/
refrn/ALL_TAB_COLUMNS.html>`_:
.. code-block:: sql
select/*+PARALLEL (4)*/ owner,table_name
,max(column_name),min(column_name)
from all_tab_columns
where owner = ':owner'
group by owner,table_name
Examples
-----------
>>> from dwopt import lt
>>> lt.iris()
>>> lt.mtcars()
>>> lt.list_tables().iloc[:,:-1]
type name tbl_name rootpage
0 table iris iris 2
1 table mtcars mtcars 5
"""
raise NotImplementedError
[docs] def mtcars(self, sch_tbl_nme="mtcars", q=False):
"""Create the mtcars test table on the database.
Drop and recreate if already exist.
Sourced from `R mtcars <https://www.rdocumentation.org/packages/datasets
/versions/3.6.2/topics/mtcars>`_.
args
-------
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
Default ``mtcars``.
q: bool
Return query object or not. Default False.
Returns
-------
None or dwopt._qry._Qry
Query object with sch_tbl_nme loaded for convenience.
Examples
--------
>>> from dwopt import lt
>>> lt.mtcars()
>>> lt.run('select count(*) from mtcars')
count(*)
0 32
>>> from dwopt import lt
>>> lt.mtcars(q=True).valc('cyl', 'avg(mpg)')
cyl n avg(mpg)
0 8 14 15.100000
1 4 11 26.663636
2 6 7 19.742857
>>> from dwopt import pg
>>> pg.create_schema('test')
>>> pg.mtcars('test.mtcars', q=1).len()
32
"""
sch_tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme, split=False)
self.drop(sch_tbl_nme)
self.cwrite(_make_mtcars_df(), sch_tbl_nme)
if q:
return self.qry(sch_tbl_nme)
[docs] def qry(self, *args, **kwargs):
"""Make a :class:`query object <dwopt._qry._Qry>`.
Args
----------
*args :
Positional arguments of the :class:`dwopt._qry._Qry`>.
**kwargs :
keyword arguments of the :class:`dwopt._qry._Qry`.
Returns
-------
dwopt._qry._Qry
Examples
--------
>>> from dwopt import lt
>>> lt.mtcars()
>>> lt.qry('mtcars').valc('cyl', 'avg(mpg)')
cyl n avg(mpg)
0 8 14 15.100000
1 4 11 26.663636
2 6 7 19.742857
"""
return _Qry(self, *args, **kwargs)
[docs] def run(self, sql=None, args=None, pth=None, mods=None, **kwargs):
"""
Run sql statement.
Features:
* Argument binding.
* Text replacement.
* Reading from sql script file.
Args
----------
sql : str, optional
The sql statement to run.
args : dict, or [dict], optional
Dictionary or list of dictionary of argument name str to argument
data object mappings.
These argument data objects are passed via sqlalchemy to the database,
to function as data for the argument names.
See the notes and the examples section for details.
pth : str, optional
Path to sql script, ignored if the sql parameter is not None.
The script can hold a sql statement, for example a significant piece
of table creation statement.
mods : dict, optional
Dictionary of modification name str to modification str mappings.
Replaces modification name in the sql by the respective
modification str.
See the notes and the examples section for details.
**kwargs :
Convenient way to add modification mappings.
Keyword to argument mappings will be added to the mods dictionary.
The keyword cannot be one of the positional parameter names.
Returns
-------
pandas.DataFrame or None
Returns dataframe if the database returns any result.
Returns dataframe with column names and zero rows if running query
that returns zero rows.
Returns None otherwise, typically when running DDL/DML statement.
Notes
-----
**The args and the mods parameter**
An argument name or a modification name is denoted in the sql by prepending
a colon symbol ``:`` before a series of alphanumeric or underscore symbols.
In addition, the end of the series for the modification name is to be
followed by a non-alphanumeric or a end of line symbol. This is to distinguish
names such as ``:var`` and ``:var1``.
The args parameter binding is recommanded where possible,
while the mods paramter method of text replacement gives
more flexibility when it comes to programatically generate sql statment.
Examples
--------
Run sql:
>>> from dwopt import lt
>>> lt.iris()
>>> lt.run("select * from iris limit 1")
sepal_length sepal_width petal_length petal_width species
0 5.1 3.5 1.4 0.2 setosa
Run sql with argument passing:
>>> from dwopt import lt
>>> lt.iris()
>>> lt.run("select count(1) from iris where species = :x",
... args = {'x':'setosa'})
count(1)
0 50
Run sql with text modification:
>>> from dwopt import lt
>>> lt.iris()
>>> old = 'iris'
>>> new = 'iris2'
>>> lt.run("drop table if exists :var", var=new)
>>> lt.run("create table :x as select * from :y", mods={'x':new, 'y': old})
>>> lt.run("select count(1) from :tbl", tbl=new)
count(1)
0 150
Run from sql script:
>>> from dwopt import pg, make_test_tbl
>>> _ = make_test_tbl(pg)
>>> pg.run(pth = "E:/projects/my_sql_script.sql",
... my_run_date = '2022-03-03',
... my_label = '20220303',
... threshold = 5)
count
0 137
Above runs the sql stored on ``E:/projects/my_sql_script.sql`` as below:
.. code-block:: sql
drop table if exists monthly_extract_:my_label;
create table monthly_extract_:my_label as
select * from test
where
date = to_date(':my_run_date','YYYY-MM-DD')
and score > :threshold;
select count(1) from monthly_extract_:my_label;
"""
if sql is None and pth is not None:
with open(pth) as f:
sql = f.read()
_logger.info(f"sql from:\n{pth}")
if mods is not None or len(kwargs) > 0:
sql = self._bind_mods(sql, mods, **kwargs)
return self._run(sql, args)
[docs] def table_cols(self, sch_tbl_nme):
"""
Show information of specified table's columns.
Notes
-----
Postgre sql used, `information_schema.columns
<https://www.postgresql.org/docs/current/infoschema-columns.html>`_:
.. code-block:: sql
select column_name, data_type
from information_schema.columns
where table_schema = ':schema_nme'
and table_name = ':tbl_nme'
Oracle sql used, `all_tab_columns
<https://docs.oracle.com/en/database/oracle/oracle-database/21/
refrn/ALL_TAB_COLUMNS.html>`_:
.. code-block:: sql
select/*+PARALLEL (4)*/ *
from all_tab_columns
where owner = ':schema_nme'
and table_name = ':tbl_nme'
Parameters
----------
sch_tbl_nme : str
Table name in format: `schema.table`.
Returns
-------
pandas.DataFrame
Examples
-----------
>>> from dwopt import pg
>>> pg.iris()
>>> pg.table_cols('public.iris')
column_name data_type
0 sepal_length real
1 sepal_width real
2 petal_length real
3 petal_width real
4 species character varying
"""
raise NotImplementedError
[docs] def table_sizes(self):
"""
List sizes of all tables in current schema.
Returns
-------
pandas.DataFrame
Notes
-----
Oracle sql used, `user_extents
<https://docs.oracle.com/en/database/oracle/oracle-database/21/refrn/
USER_EXTENTS.html>`_:
.. code-block:: sql
select/*+PARALLEL (4)*/
tablespace_name,segment_type,segment_name
,sum(bytes)/1024/1024 table_size_mb
from user_extents
group by tablespace_name,segment_type,segment_name
"""
raise NotImplementedError
[docs] def update(self):
"""WIP"""
raise NotImplementedError
[docs] def write(self, df, sch_tbl_nme):
"""
Make and run a insert many statement.
This should follow from a :meth:`dwopt.dbo._Db.create` call which sets up
the database table with table name, column names, intended data types,
and constraints.
Args
----------
df: pandas.DataFrame
Payload Dataframe with data to insert.
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
Notes
-----
**Datetime**
Pandas Datetime64 columns are converted into object columns, and the
``pandas.NaT`` objects are converted into ``None`` before insertion.
For sqlite tables, datetime columns should be manually converted to str
and None before insertion.
**Reversibility**
Ideally python dataframe written to database should allow a exact same
dataframe to be read back into python. Whether this is true depends on the
database, data and object types on the dataframe,
and data types on the database table.
With the set up used in the :func:`dwopt.make_test_tbl` function, we have
following results (See the examples and the test function relevant for
the :meth:`dwopt.dbo._Db.write_nodup` method):
* Postgre example has reversibility except for row ordering and auto generated
pandas dataframe index. These can be strightened as below.
.. code-block:: python
df.sort_values('id').reset_index(drop=True)
* Sqlite stores datetime datatypes as text, this causes a str type column to
be read back. One strategy is to convert from datatime and NaT to
str and None before insertion, and convert back to date and datetime
when read back. Use ``datetime`` and ``pandas`` package for this.
.. code-block:: python
lt.write(
df.assign(
time=lambda x: x.time.astype(str).where(~x.time.isna(), None)),
"test2",
)
tbl = (
db.qry("test2").run()
.assign(
date=lambda x: x["date"].apply(
lambda x: datetime.date.fromisoformat(x) if x else None
),
time=lambda x: pd.to_datetime(x.time),
)
)
* Oracle is similiar to postgre.
Examples
--------
Write dataframe into a table.
>>> import pandas as pd
>>> from dwopt import lt
>>> tbl = pd.DataFrame({'col1': [1, 2], 'col2': ['a', 'b']})
>>> lt.drop('test')
>>> lt.create('test', col1='int', col2='text')
>>> lt.write(tbl,'test')
>>> lt.run('select * from test')
col1 col2
0 1 a
1 2 b
Attempt to write a dataframe into database and query back the same dataframe.
>>> from dwopt import make_test_tbl
>>> from pandas.testing import assert_frame_equal
>>> pg, df = make_test_tbl('pg')
>>> pg.drop('test')
>>> pg.create(
>>> "test",
>>> dtypes={
>>> "id": "bigint primary key",
>>> "score": "float8",
>>> "amt": "bigint",
>>> "cat": "varchar(20)",
>>> "date":"date",
>>> "time":"timestamp"
>>> }
>>> )
>>> pg.write(df, 'test')
>>> df_back = pg.qry('test').run().sort_values('id').reset_index(drop=True)
>>> assert_frame_equal(df_back, df)
"""
L = len(df)
sch_tbl_nme, sch, tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme)
if L == 0:
return
df = df.copy()
cols = df.columns.tolist()
for col in cols:
if np.issubdtype(df[col].dtype, np.datetime64):
df[col] = df[col].astype(object).where(~df[col].isna(), None)
self._remove_sch_tbl(sch_tbl_nme)
tbl = alc.Table(
tbl_nme, self.meta, *[alc.Column(col) for col in cols], schema=sch
)
_logger.info(f"running:\n{tbl.insert()}")
_ = df.to_dict("records")
_logger.info(f"args len={L}, e.g.\n{_[0]}")
with self.eng.connect() as conn:
conn.execute(
tbl.insert(),
_,
)
_logger.info("done")
[docs] def write_nodup(self, tbl, sch_tbl_nme, pkey, where=None):
"""Insert without creating duplicates.
Does below:
1. Make and run a select statement with optionally provided
where clause.
2. If step 1 returns any results and the payload table in non-empty
, remove duplicates on the payload table, using the provided primary
key columns as judge of duplication.
3. Make insert statement on the non-duplicating payload data via the
:meth:`dwopt.dbo._Db.write` method.
Args
----------
tbl: pandas.DataFrame
Payload Dataframe with data to insert.
sch_tbl_nme: str
Table name in form ``my_schema1.my_table1`` or ``my_table1``.
pkey: [str]
Iterable of column name str.
where: str
where clause in str form. The ``where`` keyword is not needed.
See also
--------
:meth:`dwopt.dbo._Db.write`
Examples
--------
>>> import pandas as pd
>>> from dwopt import lt
>>> tbl = pd.DataFrame({'col1': [1, 2], 'col2': ['a', 'b']})
>>> tbl2 = pd.DataFrame({'col1': [1, 3], 'col2': ['a', 'c']})
>>> lt.drop('test')
>>> lt.create('test', col1='int', col2='text')
>>> lt.write(tbl, 'test')
>>> lt.write_nodup(tbl2, 'test', ['col1'], "col1 < 4")
>>> lt.run("select * from test")
col1 col2
0 1 a
1 2 b
2 3 c
"""
cols = ",".join(pkey)
where_cls = f"\nwhere {where}" if where else ""
sch_tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme, split=False)
db_tbl = self.run(f"select {cols} from {sch_tbl_nme} {where_cls}")
l_tbl = len(tbl)
l_db_tbl = len(db_tbl)
if l_tbl > 0 and l_db_tbl > 0:
dedup_tbl = (
tbl.merge(
db_tbl, how="left", on=pkey, validate="one_to_one", indicator=True
)
.loc[lambda x: x._merge == "left_only", :]
.drop(columns="_merge")
)
else:
dedup_tbl = tbl
_logger.debug(
f"write nodup: {l_tbl = }, {l_db_tbl = }" f", {len(dedup_tbl) = }"
)
self.write(dedup_tbl, sch_tbl_nme)
class Pg(_Db):
def _guess_dtype(self, dtype):
if np.issubdtype(dtype, np.int64):
return alc.dialects.postgresql.BIGINT
elif np.issubdtype(dtype, np.float64):
return alc.Float(8)
else:
return super(type(self), self)._guess_dtype(dtype)
def list_cons(self):
sql = "SELECT * FROM information_schema.constraint_table_usage"
return self.run(sql)
def list_tables(self):
sql = (
"select table_catalog,table_schema,table_name"
"\n ,is_insertable_into,commit_action"
"\nfrom information_schema.tables"
"\nwhere table_schema"
"\n not in ('information_schema','pg_catalog')"
)
return self.run(sql)
def table_cols(self, sch_tbl_nme):
sch_tbl_nme, sch, tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme)
sql = (
"select column_name, data_type from information_schema.columns"
f"\nwhere table_schema = '{sch}' "
f"\nand table_name = '{tbl_nme}'"
)
return self.run(sql)
class Lt(_Db):
def _guess_dtype(self, dtype):
if np.issubdtype(dtype, np.float64):
return alc.REAL
elif np.issubdtype(dtype, np.datetime64):
return alc.String
else:
return super(type(self), self)._guess_dtype(dtype)
def list_tables(self):
sql = (
"select * from sqlite_master "
"\nwhere type ='table' "
"\nand name NOT LIKE 'sqlite_%' "
)
return self.run(sql)
class Oc(_Db):
def _guess_dtype(self, dtype):
if np.issubdtype(dtype, np.int64):
return alc.dialects.oracle.NUMBER
else:
return super(type(self), self)._guess_dtype(dtype)
def list_tables(self, owner):
sql = (
"select/*+PARALLEL (4)*/ owner,table_name"
"\n ,max(column_name),min(column_name)"
"\nfrom all_tab_columns"
f"\nwhere owner = '{owner.upper()}'"
"\ngroup by owner,table_name"
)
return self.run(sql)
def table_cols(self, sch_tbl_nme):
sch, tbl_nme = self._parse_sch_tbl_nme(sch_tbl_nme)
sql = (
"select/*+PARALLEL (4)*/ *"
"\nfrom all_tab_columns"
f"\nwhere owner = '{sch.upper()}'"
f"\nand table_name = '{tbl_nme.upper()}'"
)
return self.run(sql)
def table_sizes(self):
sql = (
"select/*+PARALLEL (4)*/"
"\n tablespace_name,segment_type,segment_name"
"\n ,sum(bytes)/1024/1024 table_size_mb"
"\nfrom user_extents"
"\ngroup by tablespace_name,segment_type,segment_name"
)
return self.run(sql)