Source code for pyiomica.categorizationFunctions

'''Categorization functions'''


from .globalVariables import *

from . import (utilityFunctions,
               visualizationFunctions,
               extendedDataFrame,
               clusteringFunctions,
               coreFunctions,
               dataStorage)

from .extendedDataFrame import DataFrame
    
    
def calculateTimeSeriesCategorization(df_data, dataName, saveDir, hdf5fileName=None, p_cutoff=0.05, fraction=0.75, constantSignalsCutoff=0., lowValuesToTag=1., lowValuesToTagWith=1., NumberOfRandomSamples=10**5, NumberOfCPUs=4, referencePoint=0, autocorrelationBased=True, calculateAutocorrelations=False, calculatePeriodograms=False, preProcessData=True):

    """Time series categorization.

    Parameters:
        df_data: pandas.DataFrame
            Data to process

        dataName: str
            Data name, e.g. "myData_1"

        saveDir: str
            Path of directories pointing to data storage

        hdf5fileName: str, Default None
            Preferred hdf5 file name and location

        p_cutoff: float, Default 0.05
            Significance cutoff for signal selection

        fraction: float, Default 0.75
            Fraction of non-zero points in a signal

        constantSignalsCutoff: float, Default 0.
            Parameter to consider a signal constant

        lowValuesToTag: float, Default 1.
            Values below this are considered low

        lowValuesToTagWith: float, Default 1.
            Value to tag low values with

        NumberOfRandomSamples: int, Default 10**5
            Size of the bootstrap distribution to generate

        NumberOfCPUs: int, Default 4
            Number of processes to use in calculations

        referencePoint: int, Default 0
            Reference point

        autocorrelationBased: boolean, Default True
            Whether autocorrelation- or frequency (periodogram)-based

        calculateAutocorrelations: boolean, Default False
            Whether to recalculate autocorrelations

        calculatePeriodograms: boolean, Default False
            Whether to recalculate periodograms

        preProcessData: boolean, Default True
            Whether to preprocess data, i.e. filter, normalize etc.

    Returns:
        None

    Usage:
        calculateTimeSeriesCategorization(df_data, dataName, saveDir)
    """

    print('\n', '-'*70, '\n\tProcessing %s (%s)'%(dataName, 'Periodograms' if not autocorrelationBased else 'Autocorrelations'), '\n', '-'*70)

    if not os.path.exists(saveDir):
        os.makedirs(saveDir)

    if hdf5fileName is None:
        hdf5fileName = saveDir + dataName + '.h5'

    df_data = extendedDataFrame.DataFrame(df_data)

    df_data.columns = df_data.columns.astype(float)

    # Preprocessing: filter out uninformative signals and tag missing/low values
    if preProcessData:
        df_data.filterOutAllZeroSignals(inplace=True)
        df_data.filterOutReferencePointZeroSignals(referencePoint=referencePoint, inplace=True)
        df_data.filterOutFractionZeroSignals(fraction, inplace=True)
        df_data.tagValueAsMissing(inplace=True)
        df_data.tagLowValues(lowValuesToTag, lowValuesToTagWith, inplace=True)
        df_data.removeConstantSignals(constantSignalsCutoff, inplace=True)

    dataStorage.write(df_data, saveDir + dataName + '_df_data_transformed', hdf5fileName=hdf5fileName)

    # Decide whether periodograms or autocorrelations (and their null distributions) need to be (re)calculated
    if not autocorrelationBased:
        calculateAutocorrelations = False

        if not calculatePeriodograms:
            df_dataPeriodograms = dataStorage.read(saveDir + dataName + '_dataPeriodograms', hdf5fileName=hdf5fileName)
            df_randomPeriodograms = dataStorage.read(saveDir + dataName + '_randomPeriodograms', hdf5fileName=hdf5fileName)

            if (df_dataPeriodograms is None) or (df_randomPeriodograms is None):
                print('Periodograms of data and the corresponding null distribution not found. Calculating...')
                calculatePeriodograms = True
    else:
        calculatePeriodograms = False

        if not calculateAutocorrelations:
            df_dataAutocorrelations = dataStorage.read(saveDir + dataName + '_dataAutocorrelations', hdf5fileName=hdf5fileName)
            df_randomAutocorrelations = dataStorage.read(saveDir + dataName + '_randomAutocorrelations', hdf5fileName=hdf5fileName)

            if (df_dataAutocorrelations is None) or (df_randomAutocorrelations is None):
                print('Autocorrelation of data and the corresponding null distribution not found. Calculating...')
                calculateAutocorrelations = True

    if calculatePeriodograms:
        df_data = dataStorage.read(saveDir + dataName + '_df_data_transformed', hdf5fileName=hdf5fileName)

        print('Calculating null distribution (periodogram) of %s samples...' %(NumberOfRandomSamples))
        df_randomPeriodograms = extendedDataFrame.getRandomPeriodograms(df_data, NumberOfRandomSamples=NumberOfRandomSamples, NumberOfCPUs=NumberOfCPUs, fraction=fraction, referencePoint=referencePoint)

        dataStorage.write(df_randomPeriodograms, saveDir + dataName + '_randomPeriodograms', hdf5fileName=hdf5fileName)

        df_data = dataStorage.read(saveDir + dataName + '_df_data_transformed', hdf5fileName=hdf5fileName)
        df_data = df_data.normalizeSignalsToUnity(referencePoint=referencePoint)

        print('Calculating each Time Series Periodogram...')
        df_dataPeriodograms = extendedDataFrame.getLombScarglePeriodogramOfDataframe(df_data)

        dataStorage.write(df_dataPeriodograms, saveDir + dataName + '_dataPeriodograms', hdf5fileName=hdf5fileName)

    if calculateAutocorrelations:
        df_data = dataStorage.read(saveDir + dataName + '_df_data_transformed', hdf5fileName=hdf5fileName)

        print('Calculating null distribution (autocorrelation) of %s samples...' %(NumberOfRandomSamples))
        df_randomAutocorrelations = extendedDataFrame.getRandomAutocorrelations(df_data, NumberOfRandomSamples=NumberOfRandomSamples, NumberOfCPUs=NumberOfCPUs, fraction=fraction, referencePoint=referencePoint)

        dataStorage.write(df_randomAutocorrelations, saveDir + dataName + '_randomAutocorrelations', hdf5fileName=hdf5fileName)

        df_data = dataStorage.read(saveDir + dataName + '_df_data_transformed', hdf5fileName=hdf5fileName)
        df_data = df_data.normalizeSignalsToUnity(referencePoint=referencePoint)

        print('Calculating each Time Series Autocorrelations...')
        df_dataAutocorrelations = utilityFunctions.runCPUs(NumberOfCPUs, coreFunctions.getAutocorrelationsOfData, [(df_data.iloc[i], df_data.columns.values) for i in range(len(df_data.index))])

        df_dataAutocorrelations = pd.DataFrame(data=df_dataAutocorrelations[1::2], index=df_data.index, columns=df_dataAutocorrelations[0])
        df_dataAutocorrelations.columns = ['Lag ' + str(columnID) for columnID in range(len(df_dataAutocorrelations.columns))]
        dataStorage.write(df_dataAutocorrelations, saveDir + dataName + '_dataAutocorrelations', hdf5fileName=hdf5fileName)

    df_data = dataStorage.read(saveDir + dataName + '_df_data_transformed', hdf5fileName=hdf5fileName)

    # Select the classifier (periodograms or autocorrelations) and its null distribution
    if not autocorrelationBased:
        df_classifier = df_dataPeriodograms
        df_randomClassifier = df_randomPeriodograms
        info = 'Periodograms'
    else:
        df_classifier = df_dataAutocorrelations
        df_randomClassifier = df_randomAutocorrelations
        info = 'Autocorrelations'

    df_classifier.sort_index(inplace=True)
    df_data.sort_index(inplace=True)

    if not (df_data.index.values == df_classifier.index.values).all():
        raise ValueError('Index mismatch')

    # Significance cutoffs per lag (or frequency) from the null distribution
    QP = [1.0]
    QP.extend([np.quantile(df_randomClassifier.values.T[i], 1. - p_cutoff, interpolation='lower') for i in range(1, df_classifier.shape[1])])
    print('Quantiles:', list(np.round(QP, 16)), '\n')

    significant_index = np.vstack([df_classifier.values.T[lag] > QP[lag] for lag in range(df_classifier.shape[1])]).T

    print('Calculating spikes cutoffs...')
    spike_cutoffs = extendedDataFrame.getRandomSpikesCutoffs(df_data, p_cutoff, NumberOfRandomSamples=NumberOfRandomSamples)
    print(spike_cutoffs)

    df_data = df_data.normalizeSignalsToUnity(referencePoint=referencePoint)

    if not (df_data.index.values == df_classifier.index.values).all():
        raise ValueError('Index mismatch')

    print('Recording SpikeMax data...')
    max_spikes = df_data.index.values[coreFunctions.getSpikes(df_data.values, np.max, spike_cutoffs)]
    print(len(max_spikes))

    significant_index_spike_max = [(gene in list(max_spikes)) for gene in df_data.index.values]
    lagSignificantIndexSpikeMax = (np.sum(significant_index.T[1:], axis=0) == 0) * significant_index_spike_max

    dataStorage.write(df_classifier[lagSignificantIndexSpikeMax], saveDir + dataName + '_selected%s_SpikeMax'%(info), hdf5fileName=hdf5fileName)
    dataStorage.write(df_data[lagSignificantIndexSpikeMax], saveDir + dataName + '_selectedTimeSeries%s_SpikeMax'%(info), hdf5fileName=hdf5fileName)

    print('Recording SpikeMin data...')
    min_spikes = df_data.index.values[coreFunctions.getSpikes(df_data.values, np.min, spike_cutoffs)]
    print(len(min_spikes))

    significant_index_spike_min = [(gene in list(min_spikes)) for gene in df_data.index.values]
    lagSignificantIndexSpikeMin = (np.sum(significant_index.T[1:], axis=0) == 0) * (np.array(significant_index_spike_max) == 0) * significant_index_spike_min

    dataStorage.write(df_classifier[lagSignificantIndexSpikeMin], saveDir + dataName + '_selected%s_SpikeMin'%(info), hdf5fileName=hdf5fileName)
    dataStorage.write(df_data[lagSignificantIndexSpikeMin], saveDir + dataName + '_selectedTimeSeries%s_SpikeMin'%(info), hdf5fileName=hdf5fileName)

    print('Recording Lag%s-Lag%s data...'%(1, df_classifier.shape[1]))
    for lag in range(1, df_classifier.shape[1]):
        lagSignificantIndex = (np.sum(significant_index.T[1:lag], axis=0) == 0) * (significant_index.T[lag])

        dataStorage.write(df_classifier[lagSignificantIndex], saveDir + dataName + '_selected%s_LAG%s'%(info, lag), hdf5fileName=hdf5fileName)
        dataStorage.write(df_data[lagSignificantIndex], saveDir + dataName + '_selectedTimeSeries%s_LAG%s'%(info, lag), hdf5fileName=hdf5fileName)

    return None
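
# Usage sketch (illustrative only, not part of the library): a minimal call to
# calculateTimeSeriesCategorization on a small synthetic dataset. The DataFrame
# layout (signals in rows, numeric time points as columns), the data name
# 'exampleData', the output directory 'results/', and the reduced bootstrap size
# are assumptions made for this example.
def _exampleCalculateTimeSeriesCategorization():

    import numpy as np
    import pandas as pd

    # 20 hypothetical signals measured at 10 time points; values are kept well
    # above the default lowValuesToTag=1. so the example data survive preprocessing
    np.random.seed(0)
    df = pd.DataFrame(np.random.rand(20, 10) * 100.,
                      index=['signal_%s' % i for i in range(20)],
                      columns=np.arange(10).astype(float))

    # Autocorrelation-based categorization; a small bootstrap keeps the example fast
    calculateTimeSeriesCategorization(df, 'exampleData', 'results/',
                                      NumberOfRandomSamples=10**3,
                                      autocorrelationBased=True)
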
def clusterTimeSeriesCategorization(dataName, saveDir, numberOfLagsToDraw=3, hdf5fileName=None, exportClusteringObjects=False, writeClusteringObjectToBinaries=True, autocorrelationBased=True, method='weighted', metric='correlation', significance='Elbow'):

    """Cluster time series categorization results into groups and subgroups.

    Parameters:
        dataName: str
            Data name, e.g. "myData_1"

        saveDir: str
            Path of directories pointing to data storage

        numberOfLagsToDraw: int, Default 3
            First top-N lags (or frequencies) to draw

        hdf5fileName: str, Default None
            HDF5 storage path and name

        exportClusteringObjects: boolean, Default False
            Whether to export clustering objects to xlsx files

        writeClusteringObjectToBinaries: boolean, Default True
            Whether to export clustering objects to binary (pickle) files

        autocorrelationBased: boolean, Default True
            Whether the categorization was autocorrelation- or frequency-based (selects which saved results to cluster)

        method: str, Default 'weighted'
            Linkage calculation method

        metric: str, Default 'correlation'
            Distance measure

        significance: str, Default 'Elbow'
            Method for determining optimal number of groups and subgroups

    Returns:
        None

    Usage:
        clusterTimeSeriesCategorization('myData_1', '/dir1/dir2/')
    """

    info = 'Autocorrelations' if autocorrelationBased else 'Periodograms'

    if hdf5fileName is None:
        hdf5fileName = saveDir + dataName + '.h5'

    def internal(className):

        print('\n\n%s of Time Series:'%(className))

        df_data_selected = dataStorage.read(saveDir + dataName + '_selectedTimeSeries%s_%s'%(info, className), hdf5fileName=hdf5fileName)
        df_classifier_selected = dataStorage.read(saveDir + dataName + '_selected%s_%s'%(info, className), hdf5fileName=hdf5fileName)

        if (df_data_selected is None) or (df_classifier_selected is None):
            print('Selected %s time series not found in %s.'%(className, saveDir + dataName + '.h5'))
            print('Do time series categorization first.')

            return

        print('Creating clustering object.')
        clusteringObject = clusteringFunctions.makeClusteringObject(df_data_selected, df_classifier_selected, method=method, metric=metric, significance=significance)

        if clusteringObject is None:
            print('Error creating clustering object')

            return

        print('Exporting clustering object.')
        if writeClusteringObjectToBinaries:
            dataStorage.write(clusteringObject, saveDir + 'consolidatedGroupsSubgroups/' + dataName + '_%s_%s'%(className, info) + '_GroupsSubgroups')

        if exportClusteringObjects:
            clusteringFunctions.exportClusteringObject(clusteringObject, saveDir + 'consolidatedGroupsSubgroups/', dataName + '_%s_%s'%(className, info))

        return

    for lag in range(1, numberOfLagsToDraw + 1):
        internal('LAG%s'%(lag))

    internal('SpikeMax')
    internal('SpikeMin')

    return None
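
# Usage sketch (illustrative only): clustering the categorization results that
# calculateTimeSeriesCategorization saved under the hypothetical 'results/'
# directory and data name 'exampleData' used in the example above.
def _exampleClusterTimeSeriesCategorization():

    clusterTimeSeriesCategorization('exampleData', 'results/',
                                    numberOfLagsToDraw=3,
                                    autocorrelationBased=True,
                                    exportClusteringObjects=True)
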
def visualizeTimeSeriesCategorization(dataName, saveDir, numberOfLagsToDraw=3, autocorrelationBased=True, xLabel='Time', plotLabel='Transformed Expression', horizontal=False, minNumberOfCommunities=2, communitiesMethod='WDPVG', direction='left', weight='distance'):

    """Visualize time series categorization results.

    Parameters:
        dataName: str
            Data name, e.g. "myData_1"

        saveDir: str
            Path of directories pointing to data storage

        numberOfLagsToDraw: int, Default 3
            First top-N lags (or frequencies) to draw

        autocorrelationBased: boolean, Default True
            Whether autocorrelation- or frequency-based

        xLabel: str, Default 'Time'
            X-axis label

        plotLabel: str, Default 'Transformed Expression'
            Label for the heatmap plot

        horizontal: boolean, Default False
            Whether to use horizontal or natural visibility graph

        minNumberOfCommunities: int, Default 2
            Number of communities to find; depends on the number of splits.
            This parameter is ignored in methods that automatically estimate the optimal number of communities.

        communitiesMethod: str, Default 'WDPVG'
            String defining the method to use for community detection:
                'Girvan_Newman': edge betweenness centrality based approach

                'betweenness_centrality': reflected graph node betweenness centrality based approach

                'WDPVG': weighted dual perspective visibility graph method (note to also set the weight variable)

        direction: str, Default 'left'
            The direction that nodes aggregate to communities:
                None: no specific direction, e.g. both sides

                'left': nodes can only aggregate to the left side hubs, e.g. early hubs

                'right': nodes can only aggregate to the right side hubs, e.g. later hubs

        weight: str, Default 'distance'
            Type of weight for communitiesMethod='WDPVG':
                None: not weighted

                'time': weight = abs(times[i] - times[j])

                'tan': weight = abs((data[i] - data[j])/(times[i] - times[j])) + 10**(-8)

                'distance': weight = A[i, j] = A[j, i] = ((data[i] - data[j])**2 + (times[i] - times[j])**2)**0.5

    Returns:
        None

    Usage:
        visualizeTimeSeriesCategorization('myData_1', '/dir1/dir2/')
    """

    info = 'Autocorrelations' if autocorrelationBased else 'Periodograms'

    def internal(className):

        print('\n\n%s of Time Series:'%(className))

        clusteringObject = dataStorage.read(saveDir + 'consolidatedGroupsSubgroups/' + dataName + '_%s_%s'%(className, info) + '_GroupsSubgroups')

        if clusteringObject is None:
            print('Clustering object not found')

            return

        if len(clusteringObject['linkage']) < 2:
            print('Clustering linkage array has only 1 row')

            return

        print('Plotting Dendrogram with Heatmaps.')
        visualizationFunctions.makeDendrogramHeatmapOfClusteringObject(clusteringObject, saveDir, dataName + '_%s_%sBased'%(className, info), AutocorrNotPeriodogr=autocorrelationBased, xLabel=xLabel, plotLabel=plotLabel, horizontal=horizontal, minNumberOfCommunities=minNumberOfCommunities, communitiesMethod=communitiesMethod, direction=direction, weight=weight)

        return

    for lag in range(1, numberOfLagsToDraw + 1):
        internal('LAG%s'%(lag))

    internal('SpikeMax')
    internal('SpikeMin')

    return None
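
# Usage sketch (illustrative only): the three functions are typically chained,
# categorize -> cluster -> visualize. The data name 'exampleData' and the
# directory 'results/' are the hypothetical values used in the examples above;
# df_data is a pandas DataFrame with signals in rows and time points as columns.
def _exampleCategorizationPipeline(df_data):

    calculateTimeSeriesCategorization(df_data, 'exampleData', 'results/')
    clusterTimeSeriesCategorization('exampleData', 'results/')
    visualizeTimeSeriesCategorization('exampleData', 'results/',
                                      xLabel='Time',
                                      plotLabel='Transformed Expression')
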