# Source code for dance.datasets.singlemodality

import collections
import os
import os.path as osp
import pprint
import shutil
import subprocess
import sys
import tempfile
import zipfile
from glob import glob

import anndata as ad
import h5py
import numpy as np
import pandas as pd
import scanpy as sc
from scipy.sparse import csr_matrix
from sklearn.model_selection import train_test_split

from dance import logger
from dance.data import Data
from dance.datasets.base import BaseDataset
from dance.metadata.imputation import IMPUTATION_DATASET_TO_FILE
from dance.registry import register_dataset
from dance.settings import METADIR
from dance.typing import Dict, List, Optional, Set, Tuple
from dance.utils.download import download_file, download_unzip
from dance.utils.io import load_data_url_dict_from_csv
from dance.utils.preprocess import cell_label_to_df


def _load_scdeepsort_metadata():
    """Read the scDeepSort metadata table shipped under ``METADIR``.

    Returns
    -------
    tuple
        ``(bench_url_dict, available_data)``. The first item maps every
        cell-type and data file name listed in ``scdeepsort.csv`` to its URL.
        The second is a list with one dict per row holding the ``split``,
        ``species``, ``tissue`` and ``dataset`` columns (all as strings).

    """
    meta_path = METADIR / "scdeepsort.csv"
    logger.debug(f"Loading scdeepsort metadata from {meta_path}")
    # Everything is cast to str so that dataset ids compare as text
    meta_df = pd.read_csv(meta_path).astype(str)

    info_keys = ("split", "species", "tissue", "dataset")
    url_by_fname = {}
    dataset_infos = []
    for _, row in meta_df.iterrows():
        url_by_fname[row["celltype_fname"]] = row["celltype_url"]
        url_by_fname[row["data_fname"]] = row["data_url"]
        dataset_infos.append({key: row[key] for key in info_keys})

    return url_by_fname, dataset_infos


@register_dataset("singlemodality")
class CellTypeAnnotationDataset(BaseDataset):
    """Data downloading and loading for cell-type annotation.

    Parameters
    ----------
    full_download
        Forwarded to ``BaseDataset``.
    train_dataset
        Dataset ids used for training; each id is appended to the tissue name
        to build the file names ``{species}_{tissue}{id}_{suffix}.{filetype}``.
    test_dataset
        Dataset ids used for testing. When ``None`` or empty, a single h5ad
        file named after ``train_dataset[0]`` is loaded and split instead.
    species
        Species name, used as a sub-directory and as a file name prefix.
    tissue
        Tissue name, used in the file names and to select cell-type mappings.
    valid_dataset
        Dataset ids used for validation. When ``None`` and ``train_as_valid``
        is set, the training ids are reused.
    train_dir, test_dir, valid_dir, map_path
        Sub-directories of ``data_dir`` read by the loaders.
    data_dir
        Root directory of the data.
    train_as_valid
        Register the training files a second time under the ``valid`` split.
    val_size
        Fraction of the training data held out for validation when no
        validation files are used (CSV loading path only).
    test_size
        Stored on the instance; not read by any method of this class.
    filetype
        File extension used by :meth:`get_all_filenames`.

    """

    _DISPLAY_ATTRS = ("species", "tissue", "train_dataset", "test_dataset")
    ALL_URL_DICT: Dict[str, str] = {
        "train_human_cell_atlas": "https://www.dropbox.com/s/1itq1pokplbqxhx?dl=1",
        "test_human_test_data": "https://www.dropbox.com/s/gpxjnnvwyblv3xb?dl=1",
        "train_mouse_cell_atlas": "https://www.dropbox.com/s/ng8d3eujfah9ppl?dl=1",
        "test_mouse_test_data": "https://www.dropbox.com/s/pkr28czk5g3al2p?dl=1",
    }  # yapf: disable
    BENCH_URL_DICT, AVAILABLE_DATA = _load_scdeepsort_metadata()

    def __init__(self, full_download=False, train_dataset=None, test_dataset=None, species=None, tissue=None,
                 valid_dataset=None, train_dir="train", test_dir="test", valid_dir="valid", map_path="map",
                 data_dir="./", train_as_valid=False, val_size=0.2, test_size=0.2, filetype: str = "csv"):
        super().__init__(data_dir, full_download)

        self.data_dir = data_dir
        self.train_dataset = train_dataset
        self.test_dataset = test_dataset
        self.species = species
        self.tissue = tissue
        self.train_dir = train_dir
        self.test_dir = test_dir
        self.valid_dir = valid_dir
        self.map_path = map_path
        self.train_as_valid = train_as_valid
        # Per-instance copies so that train2valid() never touches the class-level tables
        self.bench_url_dict = self.BENCH_URL_DICT.copy()
        self.available_data = self.AVAILABLE_DATA.copy()
        self.valid_dataset = valid_dataset
        if valid_dataset is None and self.train_as_valid:
            self.valid_dataset = train_dataset
            self.train2valid()
        self.val_size = val_size
        self.test_size = test_size
        self.filetype = filetype

    def train2valid(self):
        """Register every ``train`` entry a second time under the ``valid`` split."""
        logger.info("Copy train_dataset and use it as valid_dataset")

        valid_entries = [{**entry, "split": "valid"} for entry in self.available_data if entry["split"] == "train"]
        # Only the leading "train" of the file name is replaced
        valid_urls = {
            name.replace("train", "valid", 1): url
            for name, url in self.bench_url_dict.items() if name.startswith("train")
        }

        self.available_data = self.available_data + valid_entries
        self.bench_url_dict = {**self.bench_url_dict, **valid_urls}

    def download_all(self):
        """Download and unpack every archive listed in ``ALL_URL_DICT``."""
        # NOTE(review): this guards on is_complete() rather than is_complete_all() -- confirm intended
        if self.is_complete():
            return

        # Download and overwrite
        for name, url in self.ALL_URL_DICT.items():
            download_unzip(url, self.data_dir)

            parts = name.split("_")  # [train|test]_{species}_[cell|test]_atlas
            download_path = osp.join(self.data_dir, "_".join(parts[1:]))
            move_path = osp.join(self.data_dir, *parts[:2])

            os.makedirs(osp.dirname(move_path), exist_ok=True)
            try:
                shutil.rmtree(move_path)
            except FileNotFoundError:
                pass
            os.rename(download_path, move_path)

    def get_all_filenames(self, feat_suffix: str = "data", label_suffix: str = "celltype"):
        """List the feature and label file names of all selected datasets.

        Parameters
        ----------
        feat_suffix
            Suffix of the feature files.
        label_suffix
            Suffix of the label files.

        Returns
        -------
        list of str
            For each train, test and valid dataset id (in that order), the
            feature file name followed by the label file name. Splits that are
            ``None`` contribute nothing.

        """
        dataset_ids = []
        for ids in (self.train_dataset, self.test_dataset, self.valid_dataset):
            if ids is not None:
                dataset_ids.extend(ids)

        filenames = []
        for dataset_id in dataset_ids:
            prefix = f"{self.species}_{self.tissue}{dataset_id}"
            filenames.append(f"{prefix}_{feat_suffix}.{self.filetype}")
            filenames.append(f"{prefix}_{label_suffix}.{self.filetype}")
        return filenames

    def download(self, download_map=True):
        """Download the selected benchmark files and, optionally, the mapping files.

        Parameters
        ----------
        download_map
            Whether to also download the cell-type mapping archive.

        """
        if self.is_complete():
            return

        wanted = set(self.get_all_filenames())
        # Download training and testing data
        for name, url in self.bench_url_dict.items():
            parts = name.split("_")  # [train|test]_{species}_{tissue}{id}_[celltype|data].csv
            filename = "_".join(parts[1:])
            if filename in wanted:
                filepath = osp.join(self.data_dir, *parts[:2], filename)
                download_file(url, filepath)

        if download_map:
            # Download mapping data
            download_unzip("https://www.dropbox.com/sh/hw1189sgm0kfrts/AAAapYOblLApqygZ-lGo_70-a?dl=1",
                           osp.join(self.data_dir, "map"))

    def is_complete_all(self):
        """Check if data is complete."""
        required_dirs = [osp.join(self.data_dir, sub) for sub in ("train", "test", "pretrained")]
        for path in required_dirs:
            if not osp.exists(path):
                logger.info(f"file {path} doesn't exist")
                return False
        return True

    def is_complete(self):
        """Check if benchmarking data is complete."""
        for name in self.bench_url_dict:
            # Only files of the requested species and tissue are checked
            if any(i not in name for i in (self.species, self.tissue)):
                continue
            filename = name[name.find(self.species):]
            file_i = osp.join(self.data_dir, *(name.split("_"))[:2], filename)
            if not osp.exists(file_i):
                logger.info(file_i)
                logger.info(f"file {filename} doesn't exist")
                return False

        # check maps
        map_check = [
            osp.join(self.data_dir, "map", "mouse", "map.xlsx"),
            osp.join(self.data_dir, "map", "human", "map.xlsx"),
            osp.join(self.data_dir, "map", "celltype2subtype.xlsx"),
        ]
        for file in map_check:
            if not osp.exists(file):
                # Report the missing map file itself, not the last benchmark file name
                logger.info(f"file {file} doesn't exist")
                return False
        return True

    def _convert_labels(self, ct_col: str, map_path: str, tissue: str, train_label, valid_label, test_label):
        """Build the combined label list, mapping non-training cell types to training ones.

        Labels are ordered train, valid (if any), test. A valid/test label that
        is not a training cell type is replaced by its entry in the mapping
        dictionary (a set of training cell types), or ``None`` when unmapped.

        Returns
        -------
        tuple
            ``(labels, idx_to_label)`` where ``idx_to_label`` is the sorted
            list of training cell types.

        """
        cell_types = set(train_label[ct_col].unique())
        idx_to_label = sorted(cell_types)
        cell_type_mappings: Dict[str, Set[str]] = self.get_map_dict(map_path, tissue)

        labels: List[Set[str]] = train_label[ct_col].tolist()
        for split_name, label_df in (("valid", valid_label), ("test", test_label)):
            if label_df is None:
                continue
            mapped = [i if i in cell_types else cell_type_mappings.get(i) for i in label_df[ct_col]]
            labels.extend(mapped)

            logger.debug(f"Mapped {split_name} cell-types:")
            for i, j, k in zip(label_df.index, label_df[ct_col], mapped):
                logger.debug(f"{i}:{j}\t-> {k}")

        return labels, idx_to_label

    def _load_raw_data(self, ct_col: str = "Cell_type") -> Tuple[ad.AnnData, List[Set[str]], List[str], int, int]:
        """Load the CSV files of all splits into one AnnData object.

        Parameters
        ----------
        ct_col
            Column of the label files holding the cell type.

        Returns
        -------
        tuple
            ``(adata, labels, idx_to_label, train_size, valid_size)``. Rows of
            ``adata`` and entries of ``labels`` are ordered train, valid, test;
            ``valid_size`` is 0 when no validation split is produced.

        """
        if self.test_dataset is None or self.test_dataset == []:
            return self._load_raw_data_single_h5ad()

        species = self.species
        tissue = self.tissue
        data_dir = self.data_dir
        train_dir = osp.join(data_dir, self.train_dir)
        test_dir = osp.join(data_dir, self.test_dir)
        map_path = osp.join(data_dir, self.map_path, self.species)
        use_valid_files = self.valid_dataset is not None

        # Load raw data (features first, then labels; each in train/valid/test order)
        train_feat_paths, train_label_paths = self._get_data_paths(train_dir, species, tissue, self.train_dataset)
        test_feat_paths, test_label_paths = self._get_data_paths(test_dir, species, tissue, self.test_dataset)
        valid_feat = valid_label = None
        if use_valid_files:
            valid_dir = osp.join(data_dir, self.valid_dir)
            valid_feat_paths, valid_label_paths = self._get_data_paths(valid_dir, species, tissue,
                                                                       self.valid_dataset)

        train_feat = self._load_dfs(train_feat_paths, transpose=True)
        if use_valid_files:
            valid_feat = self._load_dfs(valid_feat_paths, transpose=True)
        test_feat = self._load_dfs(test_feat_paths, transpose=True)

        train_label = self._load_dfs(train_label_paths)
        if use_valid_files:
            valid_label = self._load_dfs(valid_label_paths)
        test_label = self._load_dfs(test_label_paths)

        if not use_valid_files and self.val_size > 0:
            # No validation files: hold out part of the training data instead
            train_feat, valid_feat, train_label, valid_label = train_test_split(train_feat, train_label,
                                                                                test_size=self.val_size)

        # Combine features (only use features that are present in the training data)
        train_size = train_feat.shape[0]
        if valid_feat is not None:
            valid_size = valid_feat.shape[0]
            aligned = (train_feat.align(valid_feat, axis=1, join="left", fill_value=0) +
                       train_feat.align(test_feat, axis=1, join="left", fill_value=0)[1:])
        else:
            valid_size = 0
            aligned = train_feat.align(test_feat, axis=1, join="left", fill_value=0)
        feat_df = pd.concat(aligned).fillna(0)
        adata = ad.AnnData(feat_df, dtype=np.float32)

        # Convert cell type labels and map test cell type names to train
        labels, idx_to_label = self._convert_labels(ct_col, map_path, tissue, train_label, valid_label, test_label)

        logger.info(f"Loaded expression data: {adata}")
        logger.info(f"Number of training samples: {train_feat.shape[0]:,}")
        if valid_feat is not None:
            logger.info(f"Number of valid samples: {valid_feat.shape[0]:,}")
        logger.info(f"Number of testing samples: {test_feat.shape[0]:,}")
        logger.info(f"Cell-types (n={len(idx_to_label)}):\n{pprint.pformat(idx_to_label)}")

        return adata, labels, idx_to_label, train_size, valid_size

    def _load_raw_data_single_h5ad(self,
                                   ct_col: str = "cell_type") -> Tuple[ad.AnnData, List[Set[str]], List[str], int, int]:
        """Load one h5ad file and randomly split it into train/valid/test.

        The file of ``train_dataset[0]`` is read and split 80/20 into
        train+valid/test, then the first part 75/25 into train/valid. The
        returned AnnData is reordered to train, valid, test so that its rows
        line up with ``labels`` and with the returned split sizes.

        Parameters
        ----------
        ct_col
            Column of ``adata.obs`` holding the cell type.

        Returns
        -------
        tuple
            ``(adata, labels, idx_to_label, train_size, valid_size)``.

        """
        species = self.species
        tissue = self.tissue
        data_dir = self.data_dir
        data_path = osp.join(data_dir, self.train_dir, species, f"{species}_{tissue}{self.train_dataset[0]}_data.h5ad")
        adata = sc.read_h5ad(data_path)
        map_path = osp.join(data_dir, self.map_path, self.species)

        # Split row positions rather than the AnnData itself, so the object can be
        # reordered to match the order in which the labels are concatenated below.
        all_idx = np.arange(adata.n_obs)
        train_tmp_idx, test_idx = train_test_split(all_idx, test_size=0.2)
        train_idx, valid_idx = train_test_split(train_tmp_idx, test_size=0.25)
        train_size, valid_size, test_size = len(train_idx), len(valid_idx), len(test_idx)
        adata = adata[np.concatenate([train_idx, valid_idx, test_idx])].copy()

        train_label = adata.obs.iloc[:train_size]
        valid_label = adata.obs.iloc[train_size:train_size + valid_size]
        test_label = adata.obs.iloc[train_size + valid_size:]

        # Convert cell type labels and map test cell type names to train
        labels, idx_to_label = self._convert_labels(ct_col, map_path, tissue, train_label, valid_label, test_label)

        logger.info(f"Loaded expression data: {adata}")
        logger.info(f"Number of training samples: {train_size:,}")
        logger.info(f"Number of valid samples: {valid_size:,}")
        logger.info(f"Number of testing samples: {test_size:,}")
        logger.info(f"Cell-types (n={len(idx_to_label)}):\n{pprint.pformat(idx_to_label)}")

        return adata, labels, idx_to_label, train_size, valid_size

    def _raw_to_dance(self, raw_data):
        """Wrap the raw tuple from :meth:`_load_raw_data` into a ``Data`` object."""
        adata, cell_labels, idx_to_label, train_size, valid_size = raw_data
        adata.obsm["cell_type"] = cell_label_to_df(cell_labels, idx_to_label, index=adata.obs.index)
        data = Data(adata, train_size=train_size, val_size=valid_size)
        return data

    @staticmethod
    def _get_data_paths(data_dir: str, species: str, tissue: str, dataset_ids: List[str], *, filetype: str = "csv",
                        feat_suffix: str = "data", label_suffix: str = "celltype") -> Tuple[List[str], List[str]]:
        """Return the feature and label file paths of the given dataset ids."""

        def paths_for(suffix: str) -> List[str]:
            return [osp.join(data_dir, species, f"{species}_{tissue}{i}_{suffix}.{filetype}") for i in dataset_ids]

        return paths_for(feat_suffix), paths_for(label_suffix)

    @staticmethod
    def _load_dfs(paths: List[str], *, index_col: Optional[int] = 0, transpose: bool = False, **kwargs):
        """Read several CSV files and stack them row-wise.

        Parameters
        ----------
        paths
            CSV files to read.
        index_col
            Passed to ``pandas.read_csv``.
        transpose
            Transpose each table after reading (used for feature files).
        **kwargs
            Further keyword arguments for ``pandas.read_csv``.

        """
        dfs = []
        for path in paths:
            logger.info(f"Loading data from {path}")
            # TODO: load feat as csr
            df = pd.read_csv(path, index_col=index_col, **kwargs)
            # Labels: cell x cell-type; Data: feature x cell (need to transpose)
            df = df.T if transpose else df
            # Add dataset info to index
            dataset_name = "_".join(osp.basename(path).split("_")[:-1])
            df.index = dataset_name + "_" + df.index.astype(str)
            dfs.append(df)
        combined_df = pd.concat(dfs)
        return combined_df

    @staticmethod
    def get_map_dict(map_file_path: str, tissue: str) -> Dict[str, Set[str]]:
        """Load cell-type mappings.

        Parameters
        ----------
        map_file_path
            Path to the mapping file.
        tissue
            Tissue of interest.

        Notes
        -----
        Merge mapping across all test sets for the required tissue.

        """
        map_df = pd.read_excel(osp.join(map_file_path, "map.xlsx"))
        map_dict = collections.defaultdict(set)
        for _, row in map_df.iterrows():
            if row["Tissue"] == tissue:
                map_dict[row["Celltype"]].add(row["Training dataset cell type"])
        return dict(map_dict)
@register_dataset("singlemodality")
class ClusteringDataset(BaseDataset):
    """Data downloading and loading for clustering.

    Parameters
    ----------
    data_dir
        Path to store datasets.
    dataset
        Choice of dataset. Available options are '10X_PBMC', 'mouse_bladder_cell', 'mouse_ES_cell',
        'worm_neuron_cell'.

    """

    URL_DICT = load_data_url_dict_from_csv(METADIR / "clustering.csv")
    AVAILABLE_DATA = sorted(URL_DICT)

    def __init__(self, data_dir: str = "./data", dataset: str = "mouse_bladder_cell"):
        super().__init__(data_dir, full_download=False)
        self.data_dir = data_dir
        self.dataset = dataset

    @property
    def data_path(self) -> str:
        """Location of the ``.h5`` file of the selected dataset inside ``data_dir``."""
        h5_name = f"{self.dataset}.h5"
        return osp.join(self.data_dir, h5_name)

    def download(self):
        """Fetch the selected dataset into :attr:`data_path`."""
        url = self.URL_DICT[self.dataset]
        download_file(url, self.data_path)

    def is_complete(self):
        """Return whether the dataset file already exists."""
        return osp.exists(self.data_path)

    def _load_raw_data(self) -> Tuple[ad.AnnData, np.ndarray]:
        """Read the ``X`` and ``Y`` arrays of the h5 file.

        Returns
        -------
        tuple
            An AnnData built from ``X`` (as float32) and the ``Y`` array.

        """
        with open(self.data_path, "rb") as raw_file, h5py.File(raw_file, "r") as h5_file:
            features = np.array(h5_file["X"])
            groups = np.array(h5_file["Y"])
        return ad.AnnData(features, dtype=np.float32), groups

    def _raw_to_dance(self, raw_data: Tuple[ad.AnnData, np.ndarray]):
        """Attach the group array as ``obsm["Group"]`` and wrap everything in ``Data``."""
        adata, groups = raw_data
        adata.obsm["Group"] = groups
        return Data(adata, train_size="all")
@register_dataset("singlemodality")
class ImputationDataset(BaseDataset):
    """Data downloading and loading for imputation.

    Parameters
    ----------
    data_dir
        Path to store datasets.
    dataset
        Name of the dataset, with or without the ``_data`` suffix.
    train_size
        Fraction of the cells used for training.

    """

    URL = load_data_url_dict_from_csv(METADIR / "imputation.csv")
    DATASET_TO_FILE = IMPUTATION_DATASET_TO_FILE  # yapf: disable
    AVAILABLE_DATA = sorted(URL)

    # Local file name used for the downloaded archive of each dataset
    _ARCHIVE_NAMES = {
        "pbmc_data": "5k.zip",
        "mouse_embryo_data": "GSE65525.zip",
        "mouse_brain_data": "neuron_10k.zip",
        "human_stemcell_data": "GSE75748.zip",
        "human_breast_TGFb_data": "GSE114397.zip",
        "human_breast_Dox_data": "GSM3141014.zip",
        "human_melanoma_data": "human_melanoma_data.zip",
        "mouse_visual_data": "mouse_visual_data.zip",
    }
    # Glob pattern of the entries to keep after unpacking each archive
    _EXTRACTED_PATTERNS = {
        "pbmc_data": "5k_*",
        "mouse_embryo_data": "GSE65525",
        "mouse_brain_data": "neuron*",
        "human_stemcell_data": "GSE75748",
        "human_breast_TGFb_data": "GSE11*",
        "human_breast_Dox_data": "GSM31*",
        "human_melanoma_data": "human*",
        "mouse_visual_data": "GSM27*",
    }

    def __init__(self, data_dir="data", dataset="human_stemcell", train_size=0.1):
        super().__init__(data_dir, full_download=False)
        self.data_dir = data_dir
        self.dataset = dataset
        self.train_size = train_size

    @property
    def _dataset_key(self) -> str:
        """Dataset name with the ``_data`` suffix used by the URL, file and directory tables."""
        if self.dataset[-5:] != "_data":
            return self.dataset + "_data"
        return self.dataset

    def download(self):
        """Download and unpack the selected dataset, then mirror it under ``test``.

        The archive is fetched with ``wget`` when available and with ``curl``
        otherwise, unpacked in a temporary directory, and the entries matching
        the dataset's pattern are moved to ``{data_dir}/train/{dataset}``.

        Raises
        ------
        AssertionError
            If the dataset is not one of the known imputation datasets.
        subprocess.CalledProcessError
            If the download command fails.

        """
        class_name = self._dataset_key
        assert class_name in self._EXTRACTED_PATTERNS, f"Unknown imputation dataset {self.dataset!r}"
        pattern = self._EXTRACTED_PATTERNS[class_name]

        train_root = osp.join(self.data_dir, "train")
        class_dir = osp.join(train_root, class_name)
        os.makedirs(train_root, exist_ok=True)

        if not any(map(osp.exists, glob(osp.join(class_dir, pattern)))):
            os.makedirs(class_dir, exist_ok=True)
            url = self.URL[class_name]
            logger.info(url)

            # Work in a scratch directory so the current working directory is never
            # polluted and the broad patterns (e.g. "human*") only see archive content.
            with tempfile.TemporaryDirectory() as tmp_dir:
                archive = osp.join(tmp_dir, self._ARCHIVE_NAMES[class_name])
                if shutil.which("wget") is not None:
                    cmd = ["wget", "-O", archive, url]
                else:
                    # wget is not installed by default on macOS and Windows
                    cmd = ["curl", "-L", "-o", archive, url]
                subprocess.run(cmd, check=True)

                with zipfile.ZipFile(archive) as zip_file:
                    zip_file.extractall(tmp_dir)
                # Remove the archive first so that it cannot match the pattern below
                os.remove(archive)
                for extracted in glob(osp.join(tmp_dir, pattern)):
                    shutil.move(extracted, class_dir)

        # The test split reuses the training files
        test_class_dir = osp.join(self.data_dir, "test", class_name)
        if not osp.exists(test_class_dir):
            shutil.copytree(class_dir, test_class_dir)

    def is_complete(self):
        """Return whether the dataset directory exists under both ``train`` and ``test``."""
        for split in ("train", "test"):
            path = osp.join(self.data_dir, split, self._dataset_key)
            if not osp.exists(path):
                logger.info(f"file {path} doesn't exist")
                return False
        return True

    def _load_raw_data(self) -> ad.AnnData:
        """Read the count data of the selected dataset into a sparse AnnData.

        Raises
        ------
        FileNotFoundError
            If the data file of a single-file dataset does not exist.
        ValueError
            If the data file of a single-file dataset has an unsupported extension.

        """
        dataset = self._dataset_key
        files = self.DATASET_TO_FILE[dataset]

        if dataset in ("mouse_embryo_data", "mouse_visual_data"):
            # Several csv files (feature x cell); the file position becomes the 'time' annotation
            frames, time_chunks = [], []
            for i, fname in enumerate(files):
                data_path = osp.join(self.data_dir, "train", dataset, fname)
                frame = pd.read_csv(data_path, header=None, index_col=0)
                frames.append(frame)
                time_chunks.append(np.zeros(frame.shape[1]) + i)
            counts = pd.concat(frames, axis=1).T
            adata = ad.AnnData(csr_matrix(counts.values))
            adata.var_names = counts.columns.tolist()
            adata.obs['time'] = np.concatenate(time_chunks)
        else:
            data_path = osp.join(self.data_dir, "train", dataset, files)
            if not os.path.exists(data_path):
                raise FileNotFoundError(f"{data_path} does not exist")

            if files[-3:] == 'csv':
                counts = pd.read_csv(data_path, header=None, index_col=0).T
                adata = ad.AnnData(csr_matrix(counts.values))
                adata.var_names = counts.columns.tolist()
                adata.obs['nums'] = np.arange(counts.shape[0])
            elif files[-2:] == 'gz':
                counts = pd.read_csv(data_path, index_col=0, compression='gzip', header=0).T
                adata = ad.AnnData(csr_matrix(counts.values))
                adata.obs_names = counts.index.tolist()
                adata.var_names = counts.columns.tolist()
            elif files[-2:] == 'h5':
                adata = sc.read_10x_h5(data_path)
                adata.var_names_make_unique()
            else:
                raise ValueError(f"Unsupported file type for {data_path}")

        return adata

    def _raw_to_dance(self, raw_data: ad.AnnData):
        """Wrap the AnnData in ``Data``, using the first ``train_size`` fraction of cells for training."""
        adata = raw_data
        data = Data(adata, train_size=int(adata.n_obs * self.train_size))
        return data