Functions

Config dataclass

Configuration for NLP input paths and processing flags.

Source code in src/pydistintox/common/config.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@dataclass
class Config:
    """Configuration for NLP input paths and processing flags.

    Requested measure names are validated in ``__post_init__`` against the
    measures provided by the tf-idf and non-tf-idf sub-packages.
    """
    skip_nlp: bool                  # if True, load pre-parsed lemmas instead of running spaCy
    target: Path                    # target corpus (texts, or parsed json when skip_nlp)
    reference: Path                 # reference corpus
    spacy_model_name: str           # e.g. 'en_core_web_sm'
    source: str = 'Not specified'   # only for logging / documentation
    save_nlp: bool | Path = False   # Path: directory for saving parsed output; False: do not save
    verbose: bool = False
    measures_to_calculate: list[str] | None = None  # None/empty: no restriction is enforced here
    scaling: bool = True            # scale scores to -1,1
    segmentlength: int = 5000
    # The following parameters are relevant to non-tf-idf measures.
    # logaddition is a normalization factor:
    # we want log(x + logaddition) > 0,
    # hence logaddition > 1 for x near 0.
    logaddition: np.float64 = np.float64(1 + 1e-11)
    # divaddition avoids division by zero.
    divaddition: np.float64 = np.float64(1e-11)

    def __post_init__(self) -> None:
        """Validate that every requested measure is known.

        Raises:
            ValueError: if ``measures_to_calculate`` contains a name not
                provided by either sub-package. (Replaces a former bare
                ``assert``, which would be stripped under ``python -O``.)
        """
        # Local imports avoid a circular dependency between config modules.
        from pydistintox.td_matrices.config import measures_available as tf_idf_measures
        from pydistintox.distinct_measures.config import measures_available as non_tf_idf_measures
        measures_available = set(tf_idf_measures + non_tf_idf_measures)
        if self.measures_to_calculate:
            unknown = set(self.measures_to_calculate) - measures_available
            if unknown:
                raise ValueError(
                    f'The requested measures are not available. Please check for typos.\n'
                    f'Your input:\n\n'
                    f'{self.measures_to_calculate}\n\n'
                    f'Possible measures:\n\n'
                    f'{measures_available}'
                )

calculate_rank_correlation(results)

Calculates the Kendall rank correlation coefficients between all pairs of measures in the input dictionary. It returns them as a 2-d matrix (np.ndarray)

As the input arrays are uniformly scaled, only rank correlation is meaningful. The resulting matrix is symmetric, with each entry representing the correlation between two measures.

Source code in src/pydistintox/distinct_measures/core.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def calculate_rank_correlation(
        results: dict[str, np.ndarray],
    ) -> tuple[np.ndarray, list[str]]:
    """
    Calculates the Kendall rank correlation coefficients
    between all pairs of measures in the input dictionary.
    It returns them as a 2-d matrix (np.ndarray), together with
    the list of measure names (row/column labels, in dict order).

    As the input arrays are uniformly scaled, only rank correlation is meaningful.
    The resulting matrix is symmetric, with each entry representing the
    correlation between two measures.
    """
    index = list(results.keys())
    number_of_measures = len(results)
    matrix = np.empty((number_of_measures, number_of_measures))

    # kendalltau(a, b) == kendalltau(b, a), so computing the upper
    # triangle and mirroring halves the number of (expensive) calls.
    for i in range(number_of_measures):
        # Diagonal still goes through kendalltau so that degenerate
        # (e.g. constant) arrays keep their original value (nan).
        matrix[i, i] = kendalltau(results[index[i]], results[index[i]])[0]
        for j in range(i + 1, number_of_measures):
            tau, _p_value = kendalltau(results[index[i]], results[index[j]])  # p_value not used
            matrix[i, j] = tau
            matrix[j, i] = tau
    return matrix, index

calculate_scores(matrices, config)

This is the main function, doing all the calculation. It takes and returns a dictionary containing these keys: ['zeta_sd0', 'zeta_sd2', 'rrf_dr0', 'eta_sg0', 'welch_t_value', 'ranksumtest_value', 'chi_square_value', 'LLR_value', 'tf_idf']. This function implements several distinctive measures.

Source code in src/pydistintox/distinct_measures/core.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def calculate_scores(
        matrices: common_config.Matrices,  # containing also docprops and relative
        config: Config,
    )-> dict[str, np.ndarray]:
    """
    This is the main function, doing all the calculation.
    It takes and gives out a dictionary containing these keys:
    ['zeta_sd0', 'zeta_sd2', 'rrf_dr0',
    'eta_sg0', 'welch_t_value', 'ranksumtest_value',
    'chi_square_value', 'LLR_value', 'tf_idf']
    This function implements several distinctive measures.

    Measures whose computation fails are omitted from the returned
    dictionary (see the `not_computed` logging below).
    """

    logging.info('Starting to calculate scores...')
    logging.debug(f'Measures requested: {config.measures_to_calculate}')

    """ Preprocessing """

    # Restrict the requested measures to those this package provides;
    # tf-idf measures are computed by the td_matrices package instead.
    non_tf_idf_measures_to_calculate = common_utils.intersect_lists(
        config.measures_to_calculate,
        measures_available
    )
    measures_in_sequence = common_utils.get_measure_names(
        measures_input=non_tf_idf_measures_to_calculate,
        measures_available=measures_available,
    )
    logging.debug(
        f'measures in sequence: \n'
        f'{measures_in_sequence}'
    )

    log_shapes(matrices)

    """ Gather data for analysis"""
    # data is stored in a dictionary
    # which will be given to the calculation

    # calculate indicators (docprops / relative frequencies)
    data = common_config.CalculationData(
        matrices,
        get_indicators(matrices)
    )

    # save parameters
    data.set_params(
        segmentlength=config.segmentlength,
        logaddition=config.logaddition,
        divaddition=config.divaddition,
    )

    """ Start calculation"""
    result = {}

    for measure_name in measures_in_sequence:
        # load function for measure
        fct = mapping_of_names_and_measure_functions[measure_name]
        # execute function; presumably returns None on failure — the
        # skip logic below and `not_computed` depend on that
        result_measure = handle_errors(
            fct,
            data,
        )

        # save result only if the computation succeeded
        # (BUGFIX: the 'is None' debug message was previously logged
        # in the not-None branch)
        if result_measure is None:
            logging.debug(f'{measure_name} is None')
        else:
            result[measure_name] = result_measure

    logging.debug(f'Scores calculated: \n {result.keys()}')
    # For the time being, dense matrices are necessary for welch's t-Test only
    # However, if they are generated, Chi-squared test and Wilcoxon rank-sum test
    # use these dense matrices, since they make the computation faster.

    dense_absolute_matrices_loaded = (
        data.matrices.abs_dense_tar is not None and data.matrices.abs_dense_ref is not None
    )
    logging.debug(
        f'dense_absolute_matrices_loaded = {dense_absolute_matrices_loaded}'
    )

    # results of tfidf measures are given by the other package td_matrices
    not_computed = set(measures_in_sequence).difference(set(result.keys()))
    logging.debug(f"Results calculated for: {', '.join(result.keys())}")
    if not_computed:
        logging.info(f"Could not compute {', '.join(not_computed)}")

    # scaling
    if config.scaling:
        scale_results(
            result,
        )

    logging.debug(
        f'statistical measures calculated \n'
        f' {calculate_scores.__name__} finished')

    return result

compute_td_matrices_and_measures(config)

Parses texts, creates a whole corpus of lemmata, and calculates six TF-IDF measures for the given texts.

Source code in src/pydistintox/td_matrices/core.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def compute_td_matrices_and_measures(
        config: common_config.Config
    )->tuple[
        common_config.Matrices,
        dict[str,np.ndarray],
        list[str]
    ]:
    """
    Parses texts, then creates a whole corpus of lemmata and
    calculates the requested TF-IDF measures for the given texts.

    Returns:
        A tuple of (sparse matrices, tf-idf scores per measure name,
        list of terms from the gensim dictionary).

    Raises:
        TypeError: if ``config.save_nlp`` is truthy but not a Path
            (e.g. the bool ``True``), since a directory is required
            to save the parsed output.
    """

    logging.debug(f'Started {compute_td_matrices_and_measures.__name__}')

    """Preprocessing"""
    # Spacy model used for nlp
    spacy_model = load_spacy_model(config.spacy_model_name)


    if not(config.skip_nlp):
        # gather texts
        corp_tar = create_corp(config.target)
        corp_ref = create_corp(config.reference)
        if config.save_nlp:
            # BUGFIX: this was a bare `assert isinstance(...)`, which
            # crashed without explanation when save_nlp was `True`
            # (a legal value of its declared type `bool | Path`) and
            # was stripped entirely under `python -O`.
            if not isinstance(config.save_nlp, Path):
                raise TypeError(
                    f'config.save_nlp must be a directory Path (or False), '
                    f'got {config.save_nlp!r}'
                )
            output_tar = config.save_nlp / 'tar'
            output_ref = config.save_nlp / 'ref'
            do_save_nlp = True
        else:
            do_save_nlp = False
            output_tar = None
            output_ref = None

        # parse corps
        parsed_corp_tar = parse_corp(
            corp_tar,
            spacy_model,
            chunk_length=DEFAULT_CHUNK_LENGTH,
            do_save_nlp=do_save_nlp,
            save_path_dir=output_tar,
        )
        parsed_corp_ref = parse_corp(
            corp_ref,
            spacy_model,
            chunk_length=DEFAULT_CHUNK_LENGTH,
            do_save_nlp=do_save_nlp,
            save_path_dir=output_ref,
        )
    else:  # skip nlp, so load corp from json
        # load lemmas from json
        logging.info('Loading lemmas from json of target corpus')
        parsed_corp_tar = load_corp_from_dir(
            path_dir=config.target,
            spacy_model=spacy_model,
        )
        logging.info('Loading lemmas from json of reference corpus')
        parsed_corp_ref = load_corp_from_dir(
            path_dir=config.reference,
            spacy_model=spacy_model,
        )

    # logging
    logging.info('\n\n\n Target corpus\n')
    log_corp(parsed_corp_tar)
    logging.info('\n\n\n Reference corpus')
    log_corp(parsed_corp_ref)

    # Build one shared vocabulary over both corpora so term indices align.
    all_lemmas = parsed_corp_tar + parsed_corp_ref
    gensim_dictionary = build_gensim_dictionary_from_lemmas(all_lemmas)
    bow = docs_to_bow(all_lemmas,gensim_dictionary)
    terms = get_values(gensim_dictionary)

    """create absolute, binary and relative matrices"""
    matrices_sp = create_matrices(
        corp_lemma_tar=parsed_corp_tar,
        corp_lemma_ref=parsed_corp_ref,
        bow_corpus=bow
    )

    """calculate tf-idf measures """
    # Scaler is fitted per-measure, so *absolute* scaled values are not comparable across measures.
    # However, *relative rankings within a measure* (e.g., term A > term B in 'nfn') remain valid.
    # We accept this limitation because only intra-measure ranks are used for further analysis.

    scores_tf_idf = {}
    scaler = MinMaxScaler(feature_range=(-1,1)) if config.scaling else DummyScaler()

    tf_idf_measures_to_calculate = common_utils.intersect_lists(
        config.measures_to_calculate,
        measures_available
    )
    measures_in_sequence = common_utils.get_measure_names(
        measures_available=measures_available,
        measures_input=tf_idf_measures_to_calculate,
    )

    for measure in measures_in_sequence:
        result = calculate_measure(
            parsed_corp_tar,
            parsed_corp_ref,
            gensim_dictionary,
            bow,
            measure,
            scaler,
        )
        scores_tf_idf[measure] = result

    return matrices_sp, scores_tf_idf, terms

save_as_sparse(dense_matrix, output_dir, file_prefix, format)

Saves a dense matrix as sparse in the given directory using scipy.sparse.save_npz with the suffix 'npz'.

Source code in src/pydistintox/common/utils.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def save_as_sparse(
        dense_matrix: np.ndarray,
        output_dir: Path,
        file_prefix: str,
        format: str  # 'csc' or 'csr'
        ) -> None:
    """
    Convert a dense matrix to sparse form and store it in `output_dir`
    via scipy.sparse.save_npz under the name '<file_prefix>.npz'.

    The directory is created if missing; an existing file of the same
    name is overwritten (with a warning). Save failures are logged as
    warnings instead of raising.
    """
    # Create the target directory on demand.
    output_dir.mkdir(parents=True, exist_ok=True)

    out_name = f'{file_prefix}.npz'
    target_file = output_dir / out_name
    if target_file.is_file():
        logging.warning(f'File {out_name} exists already in {output_dir} and is replaced.')

    sparse_data = dense_to_sparse(dense_matrix, format)
    try:
        sp.save_npz(target_file, sparse_data)
        logging.debug(f'Saved {str(target_file)} as {format}')
    except Exception as e:
        # Best-effort save: report the problem but keep going.
        logging.warning(f'Error saving {out_name}: {e}')

save_results(result, path, write_header=True)

Store results as a text file in CSV format in the given directory.

Source code in src/pydistintox/visualize/core.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def save_results(
        result: pd.DataFrame,
        path: Path,
        write_header: bool = True,
    ) -> None:
    """
    Write `result` to `path` as a CSV text file.

    Export options are taken from the module-level CSV_EXPORT_CONFIG;
    the index is never written, the header only when `write_header`.
    """
    export_options = dict(
        header=write_header,
        index=False,
        **CSV_EXPORT_CONFIG,
    )
    result.to_csv(path, **export_options)
    logging.debug(f'wrote result into path {path}.')

visualize_rank_correlations(matrix, index, dir_path)

visualize rank correlations between scores

Source code in src/pydistintox/visualize/core.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def visualize_rank_correlations(
        matrix: np.ndarray,
        index: list[str],
        dir_path: Path,
    ) -> None:
    """Visualize rank correlations between scores.

    Writes the correlation table to 'measure_correlations.csv' and an
    interactive heatmap to 'heatmap.html', both inside `dir_path`.
    """
    correlation_df = pd.DataFrame(matrix, index=index, columns=index)

    # Keep the raw numbers next to the visualization for later inspection.
    correlation_df.to_csv(dir_path / 'measure_correlations.csv')

    heatmap(
        df=correlation_df,
        save_path=dir_path / 'heatmap.html',
        title='Measures of Distinctiveness: Correlations',
        index_x_axis='Score 1',
        index_y_axis='Score 2',
    )

visualize_score(score_processed, title, path)

Visualizes the top and bottom terms from a score DataFrame as a horizontal bar chart.

Displays the top 25 terms (highest scores) in lightgreen and the bottom 25 terms (lowest scores) in lightblue. The chart is saved as an interactive HTML file in the specified directory.

Source code in src/pydistintox/visualize/core.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def visualize_score(
        score_processed: pd.DataFrame,
        title: str,
        path: Path,  # to dir
    ) -> None:
    """
    Visualizes the top and bottom terms
    from a score DataFrame as a horizontal bar chart.

    The 25 highest-scoring terms are drawn in lightgreen, the 25
    lowest-scoring terms in lightblue. The chart is saved as an
    interactive HTML file named '<title>.html' in the given directory.
    """

    # .copy() avoids SettingWithCopy warnings when the color column is added.
    top_terms = score_processed.head(25).copy()
    bottom_terms = score_processed.tail(25).copy()

    for frame, bar_color in ((top_terms, 'lightgreen'), (bottom_terms, 'lightblue')):
        frame.columns = ['Term', 'Score']
        frame['color'] = bar_color

    data_to_show = pd.concat([top_terms, bottom_terms])

    # Labels of positive bars shift left, labels of negative bars shift right.
    predicate = alt.datum.Score > 0

    # Bar layer; change column names above if the input df differs.
    bars = alt.Chart(
        data_to_show,
        width=500,
        height=700,
        title=title,
    ).mark_bar().encode(
        x='Score',
        y=alt.Y(
            'Term',
            sort={'field': '-x', 'order': 'descending'},
            axis=None,
        ),
        color=alt.Color('color', scale=None),
    )

    # Text layer: term labels placed beside the bars.
    text = bars.mark_text(
        baseline='middle',
        align='center',
        dx=alt.expr(alt.expr.if_(predicate, -50, 50)),
    ).encode(
        text='Term',
        color=alt.value('black'),
    )

    (bars + text).save(path / f'{title}.html')