Source code for varspark.utils

import numpy as np
import pandas as pd
from pyspark.mllib.linalg.distributed import IndexedRowMatrix, RowMatrix


def dist_mat_to_array(dist_mat):
    """
    Converts a (small) distributed matrix to a dense numpy ndarray.

    Every row of the matrix is collected locally, so the matrix must be small
    enough to be held in memory as a single numpy array.

    :param dist_mat: a pyspark.mllib.linalg distributed matrix; ``RowMatrix`` and
        ``IndexedRowMatrix`` instances (including subclasses) are supported
    :return: a local numpy array with the matrix data, one array row per collected
        matrix row
    :raises TypeError: if ``dist_mat`` is not of a supported matrix type
    """
    if isinstance(dist_mat, RowMatrix):
        return np.array([v.toArray() for v in dist_mat.rows.collect()])
    if isinstance(dist_mat, IndexedRowMatrix):
        # NOTE(review): the row indices are not consulted after this conversion, so
        # the rows of the result follow the order in which they are collected -
        # confirm callers always pass matrices whose rows are already in index order.
        return dist_mat_to_array(dist_mat.toRowMatrix())
    # TypeError is still an Exception, so existing `except Exception` handlers keep working.
    raise TypeError("Cannot convert distributed matrix of type %s" % type(dist_mat))
def array_to_dataframe(ndarray, labels=None):
    """
    Wraps a square numpy array in a pandas dataframe, using the same labels
    (if provided) for both the index and the column names.

    :param ndarray: a square numpy array to convert
    :param labels: labels to use for the index and for the column names
    :return: a pandas dataframe
    """
    frame = pd.DataFrame(data=ndarray, index=labels, columns=labels)
    return frame
def array_to_dataframe_coord(ndarray, labels=None, triangular=True, include_diagonal=True,
                             row_name='row', col_name='col', value_name='value'):
    """
    Converts a square numpy array to a pandas dataframe in coordinate format
    that is `[row, column, value]`.

    Optionally only includes the lower triangular matrix with or without the
    diagonal (to get only unique coordinates). Excluded cells are marked with
    NaN and then dropped, so cells that already hold NaN in the input are
    dropped from the result as well.

    :param ndarray: a square numpy array (or array-like) to convert; it is copied,
        the caller's data is not modified
    :param labels: labels to use for row and columns coordinates
    :param triangular: only include the lower triangular matrix
    :param include_diagonal: if the main diagonal should be included; only has an
        effect when `triangular` is true
    :param row_name: the name to use for row column (first coordinate)
    :param col_name: the name to use for col column (second coordinate)
    :param value_name: the name to use for the value column
    :return: dataframe with the values from the matrix in the coordinate form
    """
    pdist_mat = np.array(ndarray)
    if triangular:
        # Integer arrays cannot store NaN (the assignment raises) and boolean
        # arrays would silently turn it into True, so promote them to float first.
        if pdist_mat.dtype.kind in 'biu':
            pdist_mat = pdist_mat.astype(float)
        # Offset 1 masks strictly above the diagonal (diagonal kept);
        # offset 0 masks the diagonal as well.
        first_masked_diagonal = 1 if include_diagonal else 0
        pdist_mat[np.triu_indices(pdist_mat.shape[0], first_masked_diagonal)] = np.nan
    pdist_df = pd.DataFrame(pdist_mat, columns=labels, index=labels)
    # Expose the row labels as a regular column so melt can keep them as the id.
    pdist_df[row_name] = pdist_df.index
    return pd.melt(pdist_df, id_vars=[row_name], var_name=col_name,
                   value_name=value_name).dropna()
def kinship_mat_to_dataframe(km):
    """
    Builds a pandas dataframe from a hail KinshipMatrix.

    The values come from the matrix returned by `km.matrix()`; the index and
    column names are taken from `km.sample_list()`.

    :param km: kinship matrix to convert
    :return: dataframe with the values from the kinship matrix
    """
    kinship_values = dist_mat_to_array(km.matrix())
    sample_labels = km.sample_list()
    return array_to_dataframe(kinship_values, labels=sample_labels)
def kinship_mat_to_dataframe_coord(km, **kwargs):
    """
    Builds a coordinate-form pandas dataframe from a hail KinshipMatrix.

    The values come from the matrix returned by `km.matrix()`; the coordinate
    values are taken from `km.sample_list()`.

    :param km: kinship matrix to convert
    :param kwargs: other conversion parameters, passed through unchanged to
        `array_to_dataframe_coord`
    :return: dataframe with the values from the kinship matrix in the coordinate form
    """
    kinship_values = dist_mat_to_array(km.matrix())
    sample_labels = km.sample_list()
    return array_to_dataframe_coord(kinship_values, labels=sample_labels, **kwargs)