#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Topic Model Evaluation.

This module contains functions provided by `DARIAH-DE`_ to calculate topic
coherence.

.. _DARIAH-DE:
    https://de.dariah.eu
    https://github.com/DARIAH-DE
"""

__author__ = "DARIAH-DE"
__authors__ = "Severin Simmler"
__email__ = "severin.simmler@stud-mail.uni-wuerzburg.de"

from itertools import permutations, combinations

import numpy as np
import pandas as pd


def read_bow(path):
    """Reads a sparse bag-of-words model from a CSV file."""
    sparse_bow = pd.read_csv(path, header=None)
    sparse_bow.columns = ['doc_id', 'token_id', 0]
    # set_index() returns a new DataFrame, so the result must be assigned
    sparse_bow = sparse_bow.set_index(['doc_id', 'token_id'])
    return sparse_bow


def read_dictionary(path):
    """Reads a type dictionary from a CSV file, mapping the first column to the second."""
    dictionary = pd.read_csv(path, header=None)
    dictionary.index = dictionary[0]
    dictionary = dictionary[1]
    return dictionary.to_dict()


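# A minimal sketch of the CSV layouts these readers assume (file names and
# values are hypothetical, not part of the original module): the bag-of-words
# file has one "doc_id,token_id,count" row per token occurrence, the dictionary
# file one "token,token_id" row per type.
#
#     corpus_bow.csv            corpus_dict.csv
#     --------------            ---------------
#     0,1,2                     house,1
#     0,2,1                     moon,2
#     1,1,3                     night,3
#
#     sparse_bow = read_bow('corpus_bow.csv')
#     type_dictionary = read_dictionary('corpus_dict.csv')

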
def token2bow(token, type_dictionary):
    """Returns the id of a token, adding the token to the dictionary if it is unknown."""
    try:
        return type_dictionary[token]
    except KeyError:
        # unseen tokens get the next free id and extend the dictionary in place
        type_dictionary[token] = len(type_dictionary) + 1
        return type_dictionary[token]


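# Illustration (values assumed, not taken from a real corpus): with
# type_dictionary = {'house': 1, 'moon': 2}, token2bow('moon', type_dictionary)
# returns 2, while token2bow('night', type_dictionary) inserts 'night' with the
# new id 3 and returns it.

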
class Preparation:
    def __init__(self, topics, sparse_bow, type_dictionary):
        self.topics = topics
        self.sparse_bow = sparse_bow
        self.type_dictionary = type_dictionary

    def segment_topics(self, permutation=False):
        bigrams = []
        for topic in self.topics.iterrows():
            # translate the topic's tokens into ids and pair them up
            topic = [token2bow(token, self.type_dictionary) for token in topic[1]]
            if permutation:
                bigrams.append(list(permutations(topic, 2)))
            else:
                bigrams.append(list(combinations(topic, 2)))
        return pd.Series(bigrams)

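    # Illustration (token ids assumed): a topic row ['house', 'moon', 'night']
    # mapped to the ids [1, 2, 3] is segmented into the combinations
    # [(1, 2), (1, 3), (2, 3)], or into all ordered permutations such as
    # (1, 2) and (2, 1) when permutation=True, as used by the UCI measure.
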
    def calculate_occurences(self, bigrams):
        bow = self.sparse_bow.reset_index(level=1)['token_id']
        occurences = pd.Series(dtype=object)
        if isinstance(bigrams, set):
            # a plain set of token ids can be passed and is used directly
            keys = bigrams
        else:
            keys = set()
            for topic in bigrams:
                for bigram in topic:
                    keys.add(bigram[0])
                    keys.add(bigram[1])
        for key in keys:
            # collect the ids of all documents in which the token occurs
            total = set()
            for doc in bow.groupby(level=0):
                if key in doc[1].values:
                    total.add(doc[0])
            occurences[str(key)] = total
        return occurences


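# Illustration (document ids assumed): calculate_occurences() returns a Series
# indexed by token id, e.g. occurences['2'] == {0, 3, 7}, meaning the token
# with id 2 occurs in documents 0, 3 and 7. These document sets provide the
# counts behind both coherence measures below.

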
class Measures(Preparation):
    def __init__(self, sparse_bow, type_dictionary):
        self.type_dictionary = type_dictionary
        self.sparse_bow = sparse_bow

    def pmi_uci(self, pair, occurences, e=0.1, normalize=False):
        # n is the total number of documents in the corpus
        n = len(self.sparse_bow.index.levels[0])
        k1 = occurences[str(pair[0])]
        k2 = occurences[str(pair[1])]
        k1k2 = k1.intersection(k2)
        # smoothed probabilities; the numerator needs the same (count + e) / n
        # grouping as the denominator
        numerator = (len(k1k2) + e) / n
        denominator = ((len(k1) + e) / n) * ((len(k2) + e) / n)
        if normalize:
            # normalized variant (NPMI)
            return np.log(numerator / denominator) / -np.log(numerator)
        else:
            return np.log(numerator / denominator)

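    # The pairwise score computed above is the (smoothed) pointwise mutual
    # information used by the UCI measure,
    #     PMI(w1, w2) = log( P(w1, w2) / (P(w1) * P(w2)) ),
    # with probabilities estimated from document co-occurrence. Worked example
    # with assumed counts: in a corpus of n = 100 documents where w1 occurs in
    # 10 documents, w2 in 20 and both together in 5, the score is roughly
    # log((5.1/100) / ((10.1/100) * (20.1/100))) = log(2.51), about 0.92.
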
    def pmi_umass(self, pair, occurences, e=0.1):
        # n is the total number of documents in the corpus
        n = len(self.sparse_bow.count(level=0))
        k1 = occurences[str(pair[0])]
        k2 = occurences[str(pair[1])]
        k1k2 = k1.intersection(k2)
        # smoothed co-document frequency over the document frequency of the
        # second token
        numerator = len(k1k2) + e / n
        denominator = len(k2) + e / n
        return np.log(numerator / denominator)


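# The UMass-style score above follows
#     score(w1, w2) = log( (D(w1, w2) + eps) / (D(w2) + eps) ),
# where D(.) counts the documents containing a word or word pair and the small
# constant eps (here e / n) avoids taking the logarithm of zero.

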
class Evaluation(Measures):
    def __init__(self, topics, sparse_bow, type_dictionary):
        self.topics = topics
        self.sparse_bow = sparse_bow
        self.type_dictionary = type_dictionary

    def calculate_umass(self, mean=True, e=0.1):
        scores = []
        # N is the number of top words per topic
        N = len(self.topics.T)
        segmented_topics = self.segment_topics()
        occurences = self.calculate_occurences(bigrams=segmented_topics)
        for topic in segmented_topics:
            pmi = []
            for pair in topic:
                pmi.append(self.pmi_umass(pair=pair, occurences=occurences, e=e))
            # aggregate the pairwise scores into one coherence value per topic
            if mean:
                scores.append((2 / (N * (N - 1))) * np.mean(pmi))
            else:
                scores.append((2 / (N * (N - 1))) * np.median(pmi))
        return pd.Series(scores)

    def calculate_uci(self, mean=True, normalize=False, e=0.1):
        scores = []
        # N is the number of top words per topic
        N = len(self.topics.T)
        # the UCI measure compares all ordered word pairs, hence permutations
        segmented_topics = self.segment_topics(permutation=True)
        occurences = self.calculate_occurences(bigrams=segmented_topics)
        for topic in segmented_topics:
            pmi = []
            for pair in topic:
                pmi.append(self.pmi_uci(pair=pair, occurences=occurences, normalize=normalize, e=e))
            # aggregate the pairwise scores into one coherence value per topic
            if mean:
                scores.append((2 / (N * (N - 1))) * np.mean(pmi))
            else:
                scores.append((2 / (N * (N - 1))) * np.median(pmi))
        return pd.Series(scores)
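

if __name__ == '__main__':
    # Minimal usage sketch, assuming the hypothetical input files from the
    # comments above; neither the file names nor the topic words come from a
    # real corpus.
    sparse_bow = read_bow('corpus_bow.csv')
    type_dictionary = read_dictionary('corpus_dict.csv')
    # one row per topic, one column per top word
    topics = pd.DataFrame([['house', 'moon', 'night'],
                           ['water', 'sea', 'ship']])
    evaluation = Evaluation(topics, sparse_bow, type_dictionary)
    print(evaluation.calculate_umass())
    print(evaluation.calculate_uci(normalize=True))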