"""Interface for interacting with a SQLite database containing geochemical data.
This module provides a class, GeochemDB, for interacting with a SQLite database, as well as helper functions for structuring and processing data from the database.
"""
import sqlite3
import numpy as np
import pandas as pd
from thefuzz import process
class GeochemDB:
    """
    Interface for interacting with a SQLite database containing
    geochemical data.

    Assumes a SQLite database with the schema described in the package
    documentation (tables: Samples, Aliquots, Analyses, Measurements).
    """
def __init__(self, database_path):
    """
    Open a connection to the SQLite database at ``database_path``.

    Parameters
    ----------
    database_path : str
        Path to the SQLite database file.

    Attributes
    ----------
    _database_path : str
        Stored copy of the database path.
    con : sqlite3.Connection
        Open connection to the database.
    cursor : sqlite3.Cursor
        Cursor on the open connection.
    """
    self._database_path = database_path
    # keep a single connection/cursor pair open for the object's lifetime
    self.con = sqlite3.connect(database_path)
    self.cursor = self.con.cursor()
def __del__(self):
    """
    Destructor: close the SQLite connection.

    Guards with getattr so that tearing down a partially constructed
    instance (e.g. when sqlite3.connect raised inside __init__ before
    self.con was assigned) does not raise AttributeError from the
    destructor.
    """
    con = getattr(self, 'con', None)
    if con is not None:
        con.close()
def matchrows_strings(self, table, names, column, score_threshold=98):
    """
    Fuzzy-match the given names against the values of one column of a
    table.

    Parameters
    ----------
    table : str
        name of the table whose rows to match against.
    names : arraylike
        names to look for in the table.
    column : str
        name of the column in the table to match on.
    score_threshold : float
        minimum thefuzz score for a candidate to count as a match.

    Returns
    -------
    idx : array (bool)
        length len(names); True where the corresponding name was
        matched in the table.
    row_match_dict : dict
        maps each matched input name to the closest table value whose
        score met the threshold.
    """
    table_df = pd.read_sql_query(f'SELECT * from {table}', self.con)
    # accept scalars as well as sequences
    names = np.atleast_1d(names)
    idx = np.zeros(len(names), dtype=bool)
    # nothing to match against in an empty table
    if table_df.empty:
        return idx, {}
    candidates = table_df[column].values
    row_match_dict = {}
    for ii, name in enumerate(names):
        # best fuzzy match and its 0-100 similarity score
        best, score = process.extractOne(name, candidates)
        if score >= score_threshold:
            row_match_dict[name] = best
            idx[ii] = True
    return idx, row_match_dict
def matchrows(self, table, values, columns):
    """
    Exactly match rows in a table against provided values.

    Parameters
    ----------
    table : str
        name of the table to match rows into.
    values : arraylike
        array of values to match; each row is one candidate record. A
        1-d input is treated as a single column.
    columns : arraylike
        names of columns in table that contain values; must have same
        length as second dimension of values.

    Returns
    -------
    idx : array (bool)
        logical indices of length values.shape[0]; True for each row of
        values that exactly matches some row in the table.
    """
    columns = np.atleast_1d(columns)
    # accept any arraylike, not just ndarray (a plain list previously
    # raised AttributeError on .ndim)
    values = np.asarray(values)
    # make values a column vector if necessary
    if values.ndim == 1:
        values = values.reshape(-1, 1)
    assert len(columns) == values.shape[1], \
        'values.shape[1] should be same as number of comparison columns.'
    # NOTE(review): table name is interpolated directly; table names
    # cannot be parameterized in SQL, so callers must pass trusted names
    table_df = pd.read_sql_query(f'SELECT * from {table}', self.con)
    table_arr = table_df[columns].values
    # broadcast-compare every candidate row against every table row:
    # a candidate matches if all of its columns equal those of at
    # least one table row
    idx = (values[:, None] == table_arr).all(2).any(1)
    return idx
def matchcolumns(self, table, df_cols, score_threshold=96.0):
    """
    Fuzzy-match DataFrame column names to the columns of a database
    table.

    Parameters
    ----------
    table : str
        Table whose columns to match against.
    df_cols : arraylike
        Column names to match to the table's columns.
    score_threshold : float
        minimum thefuzz score for a candidate to count as a match.

    Returns
    -------
    col_match_dict : dict
        keys are entries of df_cols that matched; values are the
        corresponding table column names.
    """
    # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk)
    res = self.cursor.execute(f'PRAGMA table_info("{table}")')
    columns_info = res.fetchall()
    cols_sq_df = pd.DataFrame(columns=['id', 'name', 'type',
                                       'c1', 'c2', 'c3'],
                              data=columns_info)
    sq_cols = cols_sq_df['name'].values
    col_match_dict = {}
    for col in df_cols:
        best, score = process.extractOne(col, sq_cols)
        if score >= score_threshold:
            col_match_dict[col] = best
    return col_match_dict
def matchsamples_df(self, df, score_threshold=96.0):
    """
    Match samples in a DataFrame with a 'sample' column to existing
    samples in the database, renaming them to the database spelling.

    Parameters
    ----------
    df : pandas.DataFrame
        DataFrame with a 'sample' column.
    score_threshold : float
        minimum thefuzz score for a sample name to count as matched.

    Returns
    -------
    df_matched : pandas.DataFrame
        copy of df restricted to rows whose sample matched, with the
        'sample' column rewritten to the database sample names.
    """
    samples_unique = df['sample'].unique()
    # fuzzy-match the parsed sample names against the Samples table
    _, sample_match_dict = self.matchrows_strings(
        'Samples',
        samples_unique,
        'name',
        score_threshold=score_threshold)
    sample_names_matched = list(sample_match_dict.keys())
    # report any sample names that found no database counterpart
    samples_not_matched = set(samples_unique) - set(sample_names_matched)
    if len(samples_not_matched) > 0:
        print(f'Sample names not matched:\n{samples_not_matched}')
    # keep only rows whose sample was matched
    keep = df['sample'].isin(sample_names_matched).values
    df = df.iloc[keep].copy()
    # rewrite sample names to their database spelling
    df['sample'] = df['sample'].replace(sample_match_dict)
    return df.copy()
def insert_rows(self, table, columns, values):
    """
    Insert rows into a table.

    Parameters
    ----------
    table : str
        name of table in which to insert rows.
    columns : arraylike
        columns in table to insert new values for.
    values : list
        list of tuples, one per row; each tuple must have one value per
        entry of columns.

    Returns
    -------
    None.
    """
    assert len(columns) == len(values[0]),\
        'Must have value for each column.'
    # build "col1, col2, ..." and the matching "?, ?, ..." placeholder
    # list with str.join instead of quadratic string concatenation
    cols_str = ', '.join(columns)
    vals_str = ', '.join(['?'] * len(columns))
    # row values are bound as parameters; table/column names cannot be
    # parameterized and are interpolated directly
    sql = f'INSERT INTO {table} ({cols_str}) VALUES ({vals_str})'
    self.cursor.executemany(sql, values)
    self.con.commit()
def update_rows(self, table,
                match_columns, match_values,
                update_columns, update_values):
    """
    Update columns of rows in a table, selecting rows by exact match on
    other columns.

    Parameters
    ----------
    table : str
        Name of table to update rows in.
    match_columns : arraylike
        Columns to do matching on.
    match_values : list
        List of tuples of values to match rows on in match_columns.
        Length of each tuple must equal len(match_columns).
    update_columns : arraylike
        Columns for which to update values.
    update_values : list
        List of tuples with values to update in update_columns.
        Length of each tuple must equal len(update_columns).

    Returns
    -------
    None.
    """
    assert len(match_columns) == len(match_values[0]),\
        'Must have match_value for each match_column.'
    assert len(update_columns) == len(update_values[0]),\
        'Must have update_value for each update_column.'
    assert len(update_values) == len(match_values), \
        'update_values and match_values must be same length.'
    # SET and WHERE clauses via join instead of manual concatenation
    set_str = ', '.join([f'{col} = ?' for col in update_columns])
    where_str = ' AND '.join([f'{col} = ?' for col in match_columns])
    sql = f'UPDATE {table} SET {set_str} WHERE {where_str}'
    # parameter order must match placeholder order: SET values first,
    # then WHERE values; tuple() casts tolerate list inputs too
    values = [tuple(update_value) + tuple(match_value)
              for update_value, match_value
              in zip(update_values, match_values)]
    self.cursor.executemany(sql, values)
    self.con.commit()
    return
def measurements_update(self, df_measurements):
    """
    Update existing spot measurements in the Measurements table.

    Only rows whose (analysis, quantity) pair already exists in the
    table are updated; nothing is inserted (no aliquots, analyses,
    samples, or new measurements are added).

    Parameters
    ----------
    df_measurements : pandas.DataFrame
        Ideally generated by iolite_tools.measurements2sql(). Must
        minimally have the columns: analysis, quantity, mean,
        measurement_unit, uncertainty, uncertainty_unit,
        reference_material.

    Returns
    -------
    None.
    """
    cols_meas = ['analysis', 'quantity', 'mean', 'measurement_unit',
                 'uncertainty', 'uncertainty_unit', 'reference_material']
    assert set(cols_meas) <= set(list(df_measurements)), \
        'Missing columns in df_measurements.'
    cols_match = ['analysis', 'quantity']
    cols_update = ['mean', 'measurement_unit', 'uncertainty',
                   'uncertainty_unit', 'reference_material']
    # which (analysis, quantity) pairs already exist in the table
    idx = self.matchrows('Measurements',
                         df_measurements[cols_match].values,
                         cols_match)
    if np.sum(idx) == 0:
        print('No existing measurements found.')
        return
    # restrict to the rows that can actually be updated
    df_matched = df_measurements.loc[idx]
    self.update_rows('Measurements',
                     cols_match,
                     df_matched[cols_match].values.tolist(),
                     cols_update,
                     df_matched[cols_update].values.tolist())
    print('Updated:\n' +
          f'{len(df_matched)} measurements')
def measurements_add(self, df_measurements, df_analyses, df_aliquots,
score_threshold=98):
"""
Add measurements for new analyses, creating any missing aliquot and
analysis rows, but don't add samples: sample names in df_aliquots are
fuzzy-matched against the existing Samples table and unmatched rows
are dropped.

Parameters
----------
df_measurements : pandas.DataFrame
    DataFrame suitable for reference against the Measurements table.
    Must have the following columns:
    analysis, quantity, mean, measurement_unit, uncertainty,
    uncertainty_unit, reference_material
df_analyses : pandas.DataFrame
    DataFrame suitable for reference against the Analyses table.
    Must have the following columns:
    analysis, aliquot, date, instrument, technique
df_aliquots : pandas.DataFrame
    DataFrame suitable for reference against the Aliquots table.
    Must have the following columns:
    aliquot, sample, material
score_threshold : int
    0-100, scoring threshold for fuzzy-matching sample names.
    Defaults to 98.

Returns
-------
None.
"""
# check for basic column structure
cols_meas = ['analysis', 'quantity', 'mean', 'measurement_unit',
'uncertainty', 'uncertainty_unit', 'reference_material']
assert set(cols_meas) <= set(list(df_measurements)), \
'Missing columns in df_measurements.'
cols_analyses = ['analysis', 'aliquot',
'date', 'instrument', 'technique']
assert set(cols_analyses) <= set(list(df_analyses)), \
'Missing columns in df_analyses.'
cols_aliquots = ['aliquot', 'sample', 'material']
assert set(cols_aliquots) <= set(list(df_aliquots)), \
'Missing columns in df_aliquots.'
# make sure that all analyses in df_measurements are also in
# df_analyses
assert set(df_analyses['analysis'].unique().tolist()) == \
set(df_measurements['analysis'].unique().tolist()), \
"""All analyses in df_analyses must be present in
df_measurements, and vice versa."""
# make sure that all aliquots in df_analyses are also in df_aliquots
assert set(df_analyses['aliquot'].unique().tolist()) == \
set(df_aliquots['aliquot'].unique().tolist()), \
"""All aliquots in df_analyses must be present in
df_aliquots, and vice versa."""
# match samples (rows with unmatched sample names are dropped and the
# remaining 'sample' values are rewritten to database spelling)
df_aliquots = self.matchsamples_df(df_aliquots, score_threshold=score_threshold)
# if no matching samples, stop
if len(df_aliquots) == 0:
print('No samples matched.')
return
# remove analyses for aliquots with missing samples
idx = df_analyses['aliquot'].isin(df_aliquots['aliquot'])
df_analyses = df_analyses.loc[idx]
# remove measurements with missing samples
idx = df_measurements['analysis'].isin(df_analyses['analysis'])
df_measurements = df_measurements.loc[idx]
# distinguish between existing and new measurements
idx = self.matchrows('Measurements',
df_measurements[['analysis', 'quantity']].values,
['analysis', 'quantity'])
# if all measurements are already in the database, stop
if np.sum(idx) == len(df_measurements):
print('All measurements already in database, use ' +
'measurements_update() instead.')
return
# ignore existing measurements
if np.sum(idx) > 0:
print('Existing measurements found, ignoring.')
# remove from df_measurements
df_measurements = df_measurements.iloc[~idx]
# keep only corresponding analyses
# NOTE(review): df_aliquots is deliberately NOT re-filtered here, so
# aliquots whose measurements all already exist may still be inserted
# below — confirm this is the intended behavior
analyses_unique = df_measurements['analysis'].unique()
idx = df_analyses['analysis'].isin(analyses_unique).values
df_analyses = df_analyses.iloc[idx]
# create necessary aliquots (only those not already in Aliquots)
idx_aliquots = ~self.matchrows('Aliquots',
df_aliquots['aliquot'].values,
'aliquot')
if np.any(idx_aliquots):
cur_values = df_aliquots.loc[idx_aliquots][cols_aliquots].values
self.insert_rows('Aliquots',
cols_aliquots,
cur_values.tolist())
# create necessary analyses (only those not already in Analyses)
idx_analyses = ~self.matchrows('Analyses',
df_analyses['analysis'].values,
'analysis')
if np.any(idx_analyses):
cur_values = df_analyses.loc[idx_analyses][cols_analyses].values
self.insert_rows('Analyses',
cols_analyses,
cur_values.tolist())
# then add measurements
self.insert_rows('Measurements',
cols_meas,
df_measurements[cols_meas].values.tolist())
print('Added:\n' +
f'{np.sum(idx_aliquots)} aliquots,\n' +
f'{np.sum(idx_analyses)} analyses,\n' +
f'{len(df_measurements)} measurements')
def measurements_by_sample(self, samples):
    """
    Return a DataFrame with all measurements corresponding to the
    requested samples.

    Parameters
    ----------
    samples : str or arraylike
        sample or samples for which to retrieve measurements

    Returns
    -------
    df_measurements : pandas.DataFrame
        all measurements associated with the samples, with 'aliquot'
        and 'sample' columns merged in for context.
    """
    samples = np.atleast_1d(samples)

    def _select_in(select, column, values):
        # Parameterized IN (...) query. Placeholders avoid SQL
        # injection and the invalid SQL that literal tuple
        # interpolation produced for 0- and 1-element value sets
        # ("IN ()" and "IN ('x',)" are both syntax errors in SQLite).
        if len(values) == 0:
            # keep the selected columns but return no rows
            return pd.read_sql_query(f'{select} WHERE 0', self.con)
        placeholders = ', '.join(['?'] * len(values))
        sql = f'{select} WHERE {column} IN ({placeholders})'
        # tolist() converts numpy scalars to Python types for binding
        return pd.read_sql_query(sql, self.con,
                                 params=np.asarray(values).tolist())

    # aliquots belonging to the requested samples
    df_aliquots = _select_in('SELECT aliquot, sample FROM Aliquots',
                             'sample', samples)
    # analyses performed on those aliquots
    df_analyses = _select_in('SELECT analysis, aliquot FROM Analyses',
                             'aliquot', df_aliquots['aliquot'].values)
    # all measurements from those analyses
    df_measurements = _select_in('SELECT * FROM Measurements',
                                 'analysis',
                                 df_analyses['analysis'].values)
    # attach aliquot and sample information to each measurement
    df_analyses = df_analyses.merge(df_aliquots, how='left',
                                    on='aliquot')
    df_measurements = df_measurements.merge(df_analyses, how='left',
                                            on='analysis')
    return df_measurements
def measurements_by_aliquot(self, aliquots):
    """
    Return a DataFrame with all measurements corresponding to the
    requested aliquot(s).

    Parameters
    ----------
    aliquots : str or arraylike
        aliquot(s) for which to retrieve measurements

    Returns
    -------
    df_measurements : pandas.DataFrame
        all measurements associated with the aliquot(s), with 'aliquot'
        and 'sample' columns merged in for context.
    """
    aliquots = np.atleast_1d(aliquots)

    def _select_in(select, column, values):
        # Parameterized IN (...) query. Placeholders avoid SQL
        # injection and the invalid SQL that literal tuple
        # interpolation produced for 0- and 1-element value sets
        # ("IN ()" and "IN ('x',)" are both syntax errors in SQLite).
        if len(values) == 0:
            # keep the selected columns but return no rows
            return pd.read_sql_query(f'{select} WHERE 0', self.con)
        placeholders = ', '.join(['?'] * len(values))
        sql = f'{select} WHERE {column} IN ({placeholders})'
        # tolist() converts numpy scalars to Python types for binding
        return pd.read_sql_query(sql, self.con,
                                 params=np.asarray(values).tolist())

    # the requested aliquots with their parent samples
    df_aliquots = _select_in('SELECT aliquot, sample FROM Aliquots',
                             'aliquot', aliquots)
    # analyses performed on those aliquots
    df_analyses = _select_in('SELECT analysis, aliquot FROM Analyses',
                             'aliquot', df_aliquots['aliquot'].values)
    # all measurements from those analyses
    df_measurements = _select_in('SELECT * FROM Measurements',
                                 'analysis',
                                 df_analyses['analysis'].values)
    # attach aliquot and sample information to each measurement
    df_analyses = df_analyses.merge(df_aliquots, how='left',
                                    on='aliquot')
    df_measurements = df_measurements.merge(df_analyses, how='left',
                                            on='analysis')
    return df_measurements
def get_samples(self):
    """
    List all samples stored in the database.

    Returns
    -------
    samples : numpy.ndarray
        Array of sample names from the Samples table.
    """
    query = 'SELECT name FROM Samples'
    return pd.read_sql_query(query, self.con)['name'].values
def get_aliquots(self):
    """
    List all aliquots stored in the database.

    Returns
    -------
    aliquots : numpy.ndarray
        Array of aliquot names from the Aliquots table.
    """
    query = 'SELECT aliquot FROM Aliquots'
    return pd.read_sql_query(query, self.con)['aliquot'].values
def get_aliquots_samples(self):
    """
    List sample/aliquot pairs stored in the database.

    Returns
    -------
    df : pandas.DataFrame
        One row per aliquot, with columns 'sample' and 'aliquot'.
    """
    return pd.read_sql_query('SELECT sample, aliquot FROM Aliquots',
                             self.con)
[docs]
def aliquot_average(df_measurements):
"""
given a dataframe of measurements as generated by :py:meth:`GeochemDB.measurements_by_sample()` or :py:meth:`GeochemDB.measurements_by_aliquots()`, gather measurements by aliquot, averaging duplicate measurements. Assumes that duplicates have the same units.
to do:
implement more robust duplicate checking
responsible uncertainty propagation
Args:
df_measurements (pd.DataFrame): Dataframe of measurements output by :py:meth:`GeochemDB.measurements_by_sample()`.
Returns:
pd.DataFrame: DataFrame with geochemical measurements averaged by aliquot.
"""
df_aliquots = \
df_measurements.pivot_table(columns=['quantity'],
index=['aliquot', 'sample'],
values=['mean', 'uncertainty'],
aggfunc={'mean': 'mean',
'uncertainty': 'max'})
df_aliquots = df_aliquots.reorder_levels([1, 0], axis=1)
return df_aliquots