import numpy as np
from sklearn.cluster import DBSCAN
[docs]
def getCommunities(
    adata,
    markers_of_interest,
    eps,
    min_samples=20,
    newcolumn="COI_community",
    core_only=False,
):
    """
    Get the communities of interest (COI) using DBSCAN
    :param adata: the anndata object
    :param markers_of_interest: the list of marker names to subset the data
    :param eps: the eps parameter for DBSCAN
    :param min_samples: Minimum number of samples in each community
    :param newcolumn: the column name of the community
    :returns: the communities of interest (COI)
    """
    # get cells of interest (COI)
    assert len(markers_of_interest) > 0, "markers_of_interest is empty?"
    condition = False
    for target_marker in markers_of_interest:
        try:
            condition = condition | (adata.obs[target_marker])
        except:
            print(target_marker, "is not found.")
            return None
    adata_tmp = adata[condition]
    # call DBSCAN
    X = adata_tmp.obs[["X_centroid", "Y_centroid"]].to_numpy()
    db = DBSCAN(eps=eps, min_samples=min_samples, algorithm="kd_tree").fit(X)
    # assign labels
    labels = db.labels_
    adata.obs[newcolumn] = -2
    if core_only:
        core_samples_mask = np.zeros_like(labels, dtype=bool)
        core_samples_mask[db.core_sample_indices_] = True
        labels[~core_samples_mask] = -3
    adata.obs.loc[condition, newcolumn] = labels
    labels_sorted = []  # a list of (number of cells, cluster index)
    new_n_clusters_ = 0
    # only count clusters of interest, i.e.,
    # excluding noise (-1), not assigned (-2), and non-core (-3)
    n_clusters_ = len(set(labels) - {-1, -2, -3})
    for i in range(n_clusters_):
        idx = labels == i
        npoints = sum(idx)
        if npoints > 0:
            labels_sorted.append((npoints, i))
            new_n_clusters_ = new_n_clusters_ + 1
    labels_sorted = sorted(labels_sorted, reverse=True)
    return labels_sorted, db