Commit 1a26d5d7 authored by Martino Bertoni 🌋
Browse files

initial fixes on cc_web pipeline tasks

parent 8f9980e5
......@@ -9,7 +9,6 @@ import uuid
import collections
from chemicalchecker.database import Dataset, Molecule
from chemicalchecker.util import Config
from chemicalchecker.core import ChemicalChecker
from chemicalchecker.core import DataSignature
......@@ -100,7 +99,6 @@ min_popu = np.sum(weights(["A%d" % i for i in [1, 2, 3, 4, 5]], coord_idxs))
# Read generic data
all_datasets = Dataset.get()
config_cc = Config()
cc = ChemicalChecker(CC_ROOT)
......
......@@ -46,14 +46,10 @@ COUNT = "SELECT COUNT(DISTINCT coord) FROM coordinates"
class Coordinates(BaseTask):
def __init__(self, name=None, **params):
args = []
task_id = params.get('task_id', None)
if task_id is None:
params['task_id'] = name
BaseTask.__init__(self, name, **params)
self.DB = params.get('DB', None)
......@@ -65,11 +61,8 @@ class Coordinates(BaseTask):
def run(self):
"""Run the coordinates step."""
all_datasets = Dataset.get()
cc = ChemicalChecker(self.CC_ROOT)
try:
self.__log.info("Creating table")
psql.query(DROP_TABLE, self.DB)
......@@ -77,9 +70,7 @@ class Coordinates(BaseTask):
psql.query(DROP_TABLE_STATS, self.DB)
psql.query(CREATE_TABLE_STATS, self.DB)
# psql.query(CREATE_INDEX, self.config.DB)
except Exception as e:
self.__log.error("Error while creating coordinates tables")
if not self.custom_ready():
raise Exception(e)
......@@ -99,9 +90,7 @@ class Coordinates(BaseTask):
desc = str(ds.description)
psql.query(INSERT % (str(ds.coordinate), name.replace(
"'", "''"), desc.replace("'", "''")), self.DB)
except Exception as e:
self.__log.error("Error while filling coordinates table")
if not self.custom_ready():
raise Exception(e)
......@@ -123,9 +112,7 @@ class Coordinates(BaseTask):
open(os.path.join(proj2.stats_path, "proj_stats.json")).read())
psql.query(INSERT_STATS % (coord, size, d["xlim"][1], d[
"xlim"][0], d["ylim"][1], d["ylim"][0]), self.DB)
except Exception as e:
self.__log.error("Error while filling coordinate_stats table")
if not self.custom_ready():
raise Exception(e)
......
......@@ -4,7 +4,7 @@ import tempfile
from chemicalchecker.util import psql
from chemicalchecker.util.pipeline import BaseTask
from chemicalchecker.util import logged, Config, HPC
from chemicalchecker.util import logged, HPC
# We got these strings by doing: pg_dump -t 'scores' --schema-only mosaic
# -h aloy-dbsrv
......@@ -52,12 +52,9 @@ CHECK = "select distinct(lib) from libraries"
class Libraries(BaseTask):
def __init__(self, name=None, **params):
task_id = params.get('task_id', None)
if task_id is None:
params['task_id'] = name
BaseTask.__init__(self, name, **params)
self.DB = params.get('DB', None)
......@@ -72,25 +69,17 @@ class Libraries(BaseTask):
def run(self):
"""Run the molecular info step."""
config_cc = Config()
cc_config_path = os.environ['CC_CONFIG']
cc_package = os.path.join(config_cc.PATH.CC_REPO, 'package')
script_path = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "scripts/libraries.py")
universe_file = os.path.join(self.tmpdir, "universe.h5")
universe_file = os.path.join(self.cachedir, "universe.h5")
try:
self.__log.info("Creating table")
psql.query(DROP_TABLE, self.DB)
psql.query(CREATE_TABLE, self.DB)
psql.query(DROP_TABLE_DESC, self.DB)
psql.query(CREATE_TABLE_DESC, self.DB)
except Exception as e:
self.__log.error("Error while creating libraries table")
if not self.custom_ready():
raise Exception(e)
......@@ -106,12 +95,9 @@ class Libraries(BaseTask):
self.__log.info("Genretaing libraries for " +
str(len(self.libraries.keys())) + " libraries")
job_path = tempfile.mkdtemp(
prefix='jobs_libraries_', dir=self.tmpdir)
libraries_path = os.path.join(self.tmpdir, "libraries_files")
if not os.path.exists(libraries_path):
original_umask = os.umask(0)
os.makedirs(libraries_path, 0o775)
......@@ -126,12 +112,14 @@ class Libraries(BaseTask):
params["memory"] = 20
params["cpu"] = 10
# job command
singularity_image = config_cc.PATH.SINGULARITY_IMAGE
cc_config_path = self.config.config_path
cc_package = os.path.join(self.config.PATH.CC_REPO, 'package')
singularity_image = self.config.PATH.SINGULARITY_IMAGE
command = "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={} singularity exec {} python {} <TASK_ID> <FILE> {} {} {} {}"
command = command.format(
cc_package, cc_config_path, singularity_image, script_path, universe_file, libraries_path, self.DB, self.CC_ROOT)
# submit jobs
cluster = HPC.from_config(config_cc)
cluster = HPC.from_config(self.config)
jobs = cluster.submitMultiJob(command, **params)
try:
......@@ -151,7 +139,6 @@ class Libraries(BaseTask):
shutil.rmtree(job_path)
self.mark_ready()
except Exception as e:
self.__log.error("Error while checking libraries table")
if not self.custom_ready():
raise Exception(e)
......@@ -161,5 +148,4 @@ class Libraries(BaseTask):
def execute(self, context):
"""Run the molprops step."""
self.tmpdir = context['params']['tmpdir']
self.run()
......@@ -8,7 +8,7 @@ from scipy.stats import rankdata
from chemicalchecker.util import psql
from chemicalchecker.util.pipeline import BaseTask
from chemicalchecker.util import logged, Config, HPC
from chemicalchecker.util import logged, HPC
# We got these strings by doing: pg_dump -t 'scores' --schema-only mosaic
......@@ -40,12 +40,9 @@ COUNT = "SELECT COUNT(*) FROM molecular_info"
class MolecularInfo(BaseTask):
def __init__(self, name=None, **params):
task_id = params.get('task_id', None)
if task_id is None:
params['task_id'] = name
BaseTask.__init__(self, name, **params)
self.DB = params.get('DB', None)
......@@ -57,11 +54,6 @@ class MolecularInfo(BaseTask):
def run(self):
"""Run the molecular info step."""
config_cc = Config()
cc_config_path = os.environ['CC_CONFIG']
cc_package = os.path.join(config_cc.PATH.CC_REPO, 'package')
script_path = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "scripts/scores.py")
......@@ -69,32 +61,25 @@ class MolecularInfo(BaseTask):
self.__log.info("Creating table")
psql.query(DROP_TABLE, self.DB)
psql.query(CREATE_TABLE, self.DB)
except Exception as e:
if not self.custom_ready():
raise Exception(e)
else:
self.__log.error(e)
return
universe_file = os.path.join(self.tmpdir, "universe.h5")
universe_file = os.path.join(self.cachedir, "universe.h5")
consensus_file = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "data/consensus.h5")
with h5py.File(universe_file) as h5:
keys = h5["keys"][:]
datasize = keys.shape[0]
self.__log.info("Genretaing molecular info for " +
str(keys.shape[0]) + " molecules")
keys.sort()
job_path = tempfile.mkdtemp(
prefix='jobs_molinfo_', dir=self.tmpdir)
data_files_path = tempfile.mkdtemp(
prefix='molinfo_data_', dir=self.tmpdir)
......@@ -107,12 +92,14 @@ class MolecularInfo(BaseTask):
params["memory"] = 4
params["cpu"] = 1
# job command
singularity_image = config_cc.PATH.SINGULARITY_IMAGE
cc_config_path = self.config.config_path
cc_package = os.path.join(self.config.PATH.CC_REPO, 'package')
singularity_image = self.config.PATH.SINGULARITY_IMAGE
command = "OMP_NUM_THREADS=1 SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={} singularity exec {} python {} <TASK_ID> <FILE> {} {} {}"
command = command.format(
cc_package, cc_config_path, singularity_image, script_path, consensus_file, data_files_path, self.CC_ROOT)
# submit jobs
cluster = HPC.from_config(config_cc)
cluster = HPC.from_config(self.config)
jobs = cluster.submitMultiJob(command, **params)
del keys
......@@ -134,23 +121,16 @@ class MolecularInfo(BaseTask):
"Generated molecular info does not include all universe molecules (%d/%d)" % (V.shape[0], datasize))
# Singularity
V[:, 1] = rankdata(-V[:, 1]) / V.shape[0]
# Mappability
V[:, 2] = rankdata(V[:, 2]) / V.shape[0]
index = range(0, datasize)
for i in range(0, datasize, 1000):
sl = slice(i, i + 1000)
S = ["('%s', '%s', %.3f, %.3f, %.3f, %.3f, %.3f, %.3f)" %
(iks[i], formula[i], V[i, 0], V[i, 1], V[i, 2], V[i, 3], V[i, 4], V[i, 5]) for i in index[sl]]
try:
psql.query(INSERT % ",".join(S), self.DB)
except Exception as e:
......@@ -173,7 +153,6 @@ class MolecularInfo(BaseTask):
shutil.rmtree(data_files_path)
self.mark_ready()
except Exception as e:
if not self.custom_ready():
raise Exception(e)
else:
......@@ -182,5 +161,4 @@ class MolecularInfo(BaseTask):
def execute(self, context):
"""Run the molprops step."""
self.tmpdir = context['params']['tmpdir']
self.run()
......@@ -7,19 +7,16 @@ from shutil import copyfile
from chemicalchecker.database import Dataset
from chemicalchecker.core import ChemicalChecker
from chemicalchecker.util.pipeline import BaseTask
from chemicalchecker.util import logged, Config, HPC
from chemicalchecker.util import logged, HPC
@logged
class Plots(BaseTask):
def __init__(self, name=None, **params):
task_id = params.get('task_id', None)
if task_id is None:
params['task_id'] = name
BaseTask.__init__(self, name, **params)
self.DB = params.get('DB', None)
......@@ -34,48 +31,32 @@ class Plots(BaseTask):
def run(self):
"""Run the coordinates step."""
all_datasets = Dataset.get()
config_cc = Config()
cc_config_path = os.environ['CC_CONFIG']
cc_package = os.path.join(config_cc.PATH.CC_REPO, 'package')
script_path = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "scripts/make_plots.py")
cc = ChemicalChecker(self.CC_ROOT)
self.__log.info("Copying projections plots")
plots_dir = os.path.join(self.CC_ROOT, "plots_web")
if not os.path.exists(plots_dir):
os.mkdir(plots_dir)
for ds in all_datasets:
if not ds.exemplary:
continue
proj2 = cc.get_signature('proj2', 'reference', ds.dataset_code)
src_plot_file = os.path.join(proj2.stats_path, "largevis.png")
dest_plot_file = os.path.join(
plots_dir, ds.coordinate + "_largevis.png")
if not os.path.exists(src_plot_file):
raise Exception("Projection plot for dataset " +
ds.dataset_code + " is not available.")
copyfile(src_plot_file, dest_plot_file)
self.__log.info("Finding missing molecule plots")
universe_file = os.path.join(self.tmpdir, "universe.h5")
universe_file = os.path.join(self.cachedir, "universe.h5")
with h5py.File(universe_file, 'r') as h5:
keys = h5["keys"][:]
datasize = keys.shape[0]
keys.sort()
job_path = tempfile.mkdtemp(
......@@ -88,12 +69,14 @@ class Plots(BaseTask):
params["elements"] = keys
params["wait"] = True
# job command
singularity_image = config_cc.PATH.SINGULARITY_IMAGE
cc_config_path = self.config.config_path
cc_package = os.path.join(self.config.PATH.CC_REPO, 'package')
singularity_image = self.config.PATH.SINGULARITY_IMAGE
command = "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={} singularity exec {} python {} <TASK_ID> <FILE> {}"
command = command.format(
cc_package, cc_config_path, singularity_image, script_path, self.MOLECULES_PATH)
# submit jobs
cluster = HPC.from_config(config_cc)
cluster = HPC.from_config(self.config)
cluster.submitMultiJob(command, **params)
if cluster.status() == HPC.READY:
......@@ -109,5 +92,4 @@ class Plots(BaseTask):
def execute(self, context):
"""Run the molprops step."""
self.tmpdir = context['params']['tmpdir']
self.run()
......@@ -21,12 +21,9 @@ COUNT = "SELECT COUNT(DISTINCT inchikey) FROM projections"
class Projections(BaseTask):
def __init__(self, name=None, **params):
task_id = params.get('task_id', None)
if task_id is None:
params['task_id'] = name
BaseTask.__init__(self, name, **params)
self.DB = params.get('DB', None)
......@@ -38,38 +35,27 @@ class Projections(BaseTask):
def run(self):
"""Run the projections step."""
all_datasets = Dataset.get()
cc = ChemicalChecker(self.CC_ROOT)
map_coord_code = {}
db_name = self.DB
for ds in all_datasets:
if not ds.exemplary:
continue
map_coord_code[ds.coordinate] = ds.dataset_code
spaces = sorted(map_coord_code.keys())
try:
self.__log.info("Creating table")
psql.query(DROP_TABLE, self.DB)
S = "CREATE TABLE projections (inchikey TEXT, "
for coord in spaces:
S += "%s_idx INTEGER, " % coord
S += "%s_x FLOAT, " % coord
S += "%s_y FLOAT, " % coord
S += "PRIMARY KEY (inchikey) );\n"
psql.query(S, db_name)
except Exception as e:
self.__log.error("Error while creating ptojections table")
if not self.custom_ready():
raise Exception(e)
......@@ -115,9 +101,7 @@ class Projections(BaseTask):
try:
self.__log.info("Creating indexes for table")
S = ''
for coord in spaces:
coord = coord.lower()
S += "CREATE UNIQUE INDEX %s_idx_projections_idx ON projections (%s_idx);\n" % (
......@@ -134,7 +118,6 @@ class Projections(BaseTask):
else:
self.mark_ready()
except Exception as e:
self.__log.error(
"Error while checking & creating indexes in projections table")
if not self.custom_ready():
......@@ -149,5 +132,4 @@ class Projections(BaseTask):
def execute(self, context):
"""Run the molprops step."""
self.tmpdir = context['params']['tmpdir']
self.run()
......@@ -4,8 +4,8 @@ import shutil
import tempfile
from chemicalchecker.util import psql
from chemicalchecker.util import logged, HPC
from chemicalchecker.util.pipeline import BaseTask
from chemicalchecker.util import logged, Config, HPC
# We got these strings by doing: pg_dump -t 'pubchem' --schema-only mosaic
# -h aloy-dbsrv
......@@ -46,12 +46,9 @@ COUNT = "SELECT COUNT(DISTINCT inchikey) FROM pubchem"
class Pubchem(BaseTask):
def __init__(self, name=None, **params):
task_id = params.get('task_id', None)
if task_id is None:
params['task_id'] = name
BaseTask.__init__(self, name, **params)
self.DB = params.get('DB', None)
......@@ -63,11 +60,6 @@ class Pubchem(BaseTask):
def run(self):
"""Run the pubchem step."""
config_cc = Config()
cc_config_path = os.environ['CC_CONFIG']
cc_package = os.path.join(config_cc.PATH.CC_REPO, 'package')
script_path = os.path.join(os.path.dirname(
os.path.realpath(__file__)), "scripts/load_pubchem.py")
......@@ -76,18 +68,15 @@ class Pubchem(BaseTask):
psql.query(DROP_TABLE, self.DB)
psql.query(CREATE_TABLE, self.DB)
# psql.query(CREATE_INDEX, self.DB)
except Exception as e:
self.__log.error(e)
universe_file = os.path.join(self.tmpdir, "universe.h5")
universe_file = os.path.join(self.cachedir, "universe.h5")
with h5py.File(universe_file, 'r') as h5:
data_size = h5["keys"].shape[0]
self.__log.info("Genretaing pubchem data for " +
str(data_size) + " molecules")
# data_size is already an integer (h5["keys"].shape[0]); calling len() on
# it would raise TypeError, so pass the count itself as the lazy log arg.
self.__log.info("Genretaing pubchem data for %s molecules",
                data_size)
chunk_size = 1000
chunks = list()
for i in range(0, data_size, chunk_size):
......@@ -103,12 +92,16 @@ class Pubchem(BaseTask):
params["elements"] = chunks
params["wait"] = True
# job command
singularity_image = Config().PATH.SINGULARITY_IMAGE
command = "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={} singularity exec {} python {} <TASK_ID> <FILE> {} {} {}"
command = command.format(cc_package, cc_config_path, singularity_image,
script_path, universe_file, self.OLD_DB, self.DB)
cc_config_path = self.config.config_path
cc_package = os.path.join(self.config.PATH.CC_REPO, 'package')
singularity_image = self.config.PATH.SINGULARITY_IMAGE
command = "SINGULARITYENV_PYTHONPATH={} SINGULARITYENV_CC_CONFIG={}" \
" singularity exec {} python {} <TASK_ID> <FILE> {} {} {}"
command = command.format(
cc_package, cc_config_path, singularity_image,
script_path, universe_file, self.OLD_DB, self.DB)
# submit jobs
cluster = HPC.from_config(config_cc)
cluster = HPC.from_config(self.config)
jobs = cluster.submitMultiJob(command, **params)
try:
......@@ -117,21 +110,21 @@ class Pubchem(BaseTask):
if int(count[0][0]) != data_size:
if not self.custom_ready():
raise Exception(
"Not all universe keys were added to Pubchem (%d/%d)" % (int(count[0][0]), data_size))
"Not all universe keys were added to Pubchem (%d/%d)" %
(int(count[0][0]), data_size))
else:
self.__log.error(
"Not all universe keys were added to Pubchem (%d/%d)" % (int(count[0][0]), data_size))
"Not all universe keys were added to Pubchem (%d/%d)" %
(int(count[0][0]), data_size))
else:
self.__log.info("Indexing table")
psql.query(CREATE_INDEX, self.DB)
shutil.rmtree(job_path)
self.mark_ready()
except Exception as e:
self.__log.error(e)
def execute(self, context):
"""Run the molprops step."""
self.tmpdir = context['params']['tmpdir']
self.run()
......@@ -49,12 +49,9 @@ ref_spaces = ['B1.001', 'B2.001', 'B4.001', 'B5.001']
class ShowTargets(BaseTask):
def __init__(self, name=None, **params):
task_id = params.get('task_id', None)
if task_id is None:
params['task_id'] = name
BaseTask.__init__(self, name, **params)
self.DB = params.get('DB', None)
......@@ -69,9 +66,7 @@ class ShowTargets(BaseTask):
def run(self):
"""Run the show targets step."""
database_name = self.DB
try:
self.__log.info("Creating table")
psql.query(DROP_TABLE, database_name)
......@@ -79,9 +74,7 @@ class ShowTargets(BaseTask):
psql.query(DROP_TABLE_DESC, database_name)
psql.query(CREATE_TABLE_DESC, database_name)
# psql.query(CREATE_INDEX, database_name)
except Exception as e:
self.__log.error("Error while creating tables")
if not self.custom_ready():
raise Exception(e)
......@@ -90,49 +83,44 @@ class ShowTargets(BaseTask):
return