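"""Assign calibrated hit probabilities to protein similarity scores.

Probabilities are either looked up from a precomputed table of similarity bins
(--precomputed / --precomputed_path) or computed on the fly with a simplified
Venn-Abers predictor fit on calibration data (--cal_data / --n_calib).

Example invocation (a sketch; the script name and file paths below are
placeholders, not real files):

    python get_probabilities.py \
        --input results_no_probs.csv \
        --output results_with_probs.csv \
        --cal_data pfam_new_proteins.npy \
        --n_calib 100 \
        --partial
"""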
import numpy as np
import pandas as pd
import argparse

from protein_conformal.util import *


def main(args):
    df = pd.read_csv(args.input)

    if args.precomputed:
        # Load the precomputed probability table only when it is actually needed,
        # so a missing --precomputed_path does not break the Venn-Abers branch below.
        df_probs = pd.read_csv(args.precomputed_path)
        # NOTE: when probabilities are precomputed, we assign them via a table lookup:
        # 1. Load the precomputed probability table (done above)
        # 2. For each hit, find the similarity-score bins that bracket its score
        # 3. Assign the hit a probability based on the bracketing bins
        # (see the comment below for the assumed layout of the table)

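        # Assumed layout of the precomputed table (inferred from the column names used
        # below; not verified against the file itself): one row per similarity bin,
        # sorted by ascending "similarity", with Venn-Abers lower/upper bounds per bin:
        #
        #     similarity, prob_exact_p0, prob_exact_p1, prob_partial_p0, prob_partial_p1
        #     0.10,       0.01,          0.03,          0.05,            0.09
        #     0.20,       0.02,          0.05,          0.08,            0.14
        #     ...
        #
        # The numeric values above are purely illustrative. The bracketing logic below
        # relies on the rows being sorted by similarity.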
        prob_exact_lst, prob_partial_lst = [], []

        for d in df["D_score"]:
            below = df_probs[df_probs["similarity"] <= d]
            above = df_probs[df_probs["similarity"] >= d]

            # Lower bin: the largest-similarity row with similarity <= d;
            # if d is smaller than all similarities, fall back to the smallest bin.
            lower_bin = below.iloc[-1] if len(below) > 0 else df_probs.iloc[0]

            # Upper bin: the smallest-similarity row with similarity >= d;
            # if d is larger than all similarities, fall back to the largest bin.
            upper_bin = above.iloc[0] if len(above) > 0 else df_probs.iloc[-1]

            # Get probabilities for lower bin, upper bin (columns "prob_exact_p0", "prob_exact_p1")
            p_0_lower = lower_bin["prob_exact_p0"]
            p_1_lower = lower_bin["prob_exact_p1"]
            p_0_upper = upper_bin["prob_exact_p0"]
            p_1_upper = upper_bin["prob_exact_p1"]

            # Collapse the two bracketing bins into one number: the midpoint of the
            # range spanned by their four Venn-Abers bounds.
            prob_exact = np.mean([
                min(p_0_lower, p_1_lower, p_0_upper, p_1_upper),
                max(p_0_lower, p_1_lower, p_0_upper, p_1_upper)
            ])
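            # Hypothetical illustration: with bounds 0.62, 0.70, 0.64, and 0.75 from
            # the two bins, the assigned probability is (0.62 + 0.75) / 2 = 0.685.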
            prob_exact_lst.append(prob_exact)

            if args.partial:
                p_0_lower = lower_bin["prob_partial_p0"]
                p_1_lower = lower_bin["prob_partial_p1"]
                p_0_upper = upper_bin["prob_partial_p0"]
                p_1_upper = upper_bin["prob_partial_p1"]

                prob_partial = np.mean([
                    min(p_0_lower, p_1_lower, p_0_upper, p_1_upper),
                    max(p_0_lower, p_1_lower, p_0_upper, p_1_upper)
                ])
                prob_partial_lst.append(prob_partial)
        df["prob_exact"] = prob_exact_lst
        if args.partial:
            df["prob_partial"] = prob_partial_lst
    else:
        # Compute a probability for each hit from its similarity score using a
        # simplified Venn-Abers predictor (isotonic regression); see the
        # _venn_abers_sketch helper below for an illustrative sketch of the idea.

        # Load calibration data
        print("loading calibration data")
        data = np.load(
            args.cal_data,
            allow_pickle=True,
        )

        n_calib = args.n_calib
        np.random.shuffle(data)
        cal_data = data[:n_calib]
        X_cal, y_cal = get_sims_labels(cal_data, partial=False)
        X_cal = X_cal.flatten()
        y_cal = y_cal.flatten()
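        # Assumed shapes (not verified against protein_conformal.util): get_sims_labels
        # returns per-query similarity scores and matching binary labels
        # (1 = exact hit); both are flattened to 1-D arrays for calibration.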

        print("getting exact probabilities")
        p_s = []
        for d in df["D_score"]:
            p_0, p_1 = simplifed_venn_abers_prediction(X_cal, y_cal, d)
            p_s.append([p_0, p_1])

        p_s = np.array(p_s)
        # Width of each Venn-Abers interval |p_1 - p_0|; informational only, not saved
        abs_p = [np.abs(p[0] - p[1]) for p in p_s]
        df["prob_exact"] = np.mean(p_s, axis=1)

        if args.partial:
            # TODO: this reload may not be necessary, but we noticed that reusing the
            # already-shuffled array could disturb the original ordering of the
            # calibration data, so we load a fresh copy here.
            print("loading calibration data")
            data = np.load(
                args.cal_data,
                allow_pickle=True,
            )
            np.random.shuffle(data)
            cal_data = data[:n_calib]
            X_cal, y_cal = get_sims_labels(cal_data, partial=True)
            X_cal = X_cal.flatten()
            y_cal = y_cal.flatten()

            print("getting partial probabilities")
            p_s = []
            for d in df["D_score"]:
                p_0, p_1 = simplifed_venn_abers_prediction(X_cal, y_cal, d)
                p_s.append([p_0, p_1])

            p_s = np.array(p_s)
            # Width of each Venn-Abers interval |p_1 - p_0|; informational only, not saved
            abs_p = [np.abs(p[0] - p[1]) for p in p_s]
            df["prob_partial"] = np.mean(p_s, axis=1)

    print("saving df new probabilities")
    df.to_csv(
        args.output,
        index=False,
    )
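

# Illustrative sketch only: the script itself uses `simplifed_venn_abers_prediction`
# from protein_conformal.util, whose exact implementation is not shown here. This
# helper (built on scikit-learn, an assumed extra dependency) demonstrates the idea
# of a simplified Venn-Abers prediction: fit an isotonic calibrator twice, once with
# the query provisionally labeled 0 and once labeled 1, and return both probabilities.
def _venn_abers_sketch(X_cal, y_cal, d):
    from sklearn.isotonic import IsotonicRegression

    probs = []
    for hypothetical_label in (0, 1):
        # Augment the calibration set with the query score under each provisional label
        X_aug = np.append(X_cal, d)
        y_aug = np.append(y_cal, hypothetical_label)
        iso = IsotonicRegression(y_min=0.0, y_max=1.0, out_of_bounds="clip")
        iso.fit(X_aug, y_aug)
        probs.append(float(iso.predict([d])[0]))
    # probs[0] and probs[1] bracket the calibrated probability of a true hit at score d
    return probs[0], probs[1]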


def parse_args():
    parser = argparse.ArgumentParser("Get probabilities for similarity scores")
    parser.add_argument(
        "--precomputed",
        action='store_true', 
        default=False,
        help="Use precomputed probabilities on similarity scores",
    )
    parser.add_argument(
        "--precomputed_path",
        type=str,
        default="",
        help="Path to precomputed probabilities. This will have probabilities for both partial and exact hits.",
    )
    parser.add_argument(
        "--input",
        type=str,
        default="/groups/doudna/projects/ronb/conformal_backup/results_no_probs.csv",
        help="Input tabular data with similarity scores and metadata.",
    )
    parser.add_argument(
        "--output",
        type=str,
        default="/groups/doudna/projects/ronb/conformal_backup/results_with_probs.csv",
        help="Output file for the results",
    )
    parser.add_argument(
        "--partial",
        action='store_true', 
        default=False,
        help="Return probability of partial hits given similarity scores",
    )
    # parser.add_argument(
    #     "--alpha", type=float, default=0.1, help="Alpha value for the algorithm"
    # )
    # parser.add_argument(
    #     "--num_trials", type=int, default=100, help="Number of trials to run"
    # )
    parser.add_argument(
        "--n_calib", type=int, default=100, help="Number of calibration data points"
    )
    parser.add_argument(
        "--cal_data",
        type=str,
        default="/groups/doudna/projects/ronb/conformal_backup/protein-conformal/data/pfam_new_proteins.npy",
        help="Path to calibration data",
    )
    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()
    main(args)