Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""Initialize Flask application. """ logging.debug("Application is frozen.") root = Path(sys._MEIPASS) else: template_folder=str(Path(root, "templates")), static_folder=str(Path(root, "static")))
"""Initialize logging. """ logging.basicConfig(level=level, format="%(message)s", filename=str(LOGFILE), filemode="w") # Disable logging for Flask and Werkzeug # (this would be a lot of spam, even level INFO): if level > logging.DEBUG: logging.getLogger("flask").setLevel(logging.ERROR) logging.getLogger("werkzeug").setLevel(logging.ERROR)
"""Initialize SQLite database. """ logging.debug("Initializing database...") db = database.get_db() if getattr(sys, "frozen", False): logging.debug("Application is frozen.") root = Path(sys._MEIPASS) else: root = "." with Path(root, "schema.sql").open("r", encoding="utf-8") as schemafile: schema = schemafile.read() db.executescript(schema) db.commit() database.close_db()
"""Format log messages. """ else:
"""Load text file, return title and content. """ filename = Path(secure_filename(textfile.filename)) title = filename.stem suffix = filename.suffix if suffix in {".txt", ".xml", ".html"}: content = textfile.read().decode("utf-8") if suffix in {".xml", ".html"}: content = remove_markup(content) return title, content else: return None, None
"""Parse XML and drop tags. """ encoding="utf8", method="text")
"""Get Document objects. """
"""Get stopwords from file or corpus. """ logging.info("Fetching stopwords...") if "stopwords" in data: _, stopwords = load_textfile(data["stopwords"]) stopwords = cophi.model.Document(stopwords).tokens else: stopwords = corpus.mfw(data["mfw"]) return stopwords
"""Get data from HTML forms. """ logging.info("Processing user data...") data = {"corpus": flask.request.files.getlist("corpus"), "topics": int(flask.request.form["topics"]), "iterations": int(flask.request.form["iterations"])} if flask.request.files.get("stopwords", None): data["stopwords"] = flask.request.files["stopwords"] else: data["mfw"] = int(flask.request.form["mfw"]) return data
"""Get topics from topic model. """ logging.info("Fetching topics from topic model...") for distribution in model.topic_word_: words = list(np.array(vocabulary)[np.argsort(distribution)][:-maximum-1:-1]) yield "{}, ...".format(", ".join(words[:3])), words
"""Get document-topic distribution from topic model. """ logging.info("Fetching document-topic distributions from topic model...") document_topic = pd.DataFrame(model.doc_topic_) document_topic.index = titles document_topic.columns = descriptors return document_topic
"""Calculate cosine similarity between columns. """
"""Min-max scaler for a vector. """ logging.debug("Scaling data from {} to {}...".format(minimum, maximum)) return np.interp(vector, (vector.min(), vector.max()), (minimum, maximum))
"""Export model output to ZIP archive. """ logging.info("Creating data archive...") if DATA_EXPORT.exists(): unlink_content(DATA_EXPORT) else: DATA_EXPORT.mkdir() model, stopwords = database.select("data_export") document_topic, topics, document_similarities, topic_similarities = model
logging.info("Preparing document-topic distributions...") document_topic = pd.read_json(document_topic, orient="index") document_topic.columns = [col.replace(",", "").replace(" ...", "") for col in document_topic.columns]
logging.info("Preparing topics...") topics = pd.read_json(topics, orient="index") topics.index = ["Topic {}".format(n) for n in range(topics.shape[0])] topics.columns = ["Word {}".format(n) for n in range(topics.shape[1])]
logging.info("Preparing topic similarity matrix...") topic_similarities = pd.read_json(topic_similarities) topic_similarities.columns = [col.replace(",", "").replace(" ...", "") for col in topic_similarities.columns] topic_similarities.index = [ix.replace(",", "").replace(" ...", "") for ix in topic_similarities.index]
logging.info("Preparing document similarity matrix...") document_similarities = pd.read_json(document_similarities) data_export = {"document-topic-distribution": document_topic, "topics": topics, "topic-similarities": topic_similarities, "document-similarities": document_similarities, "stopwords": json.loads(stopwords)}
for name, data in data_export.items(): if name in {"stopwords"}: with Path(DATA_EXPORT, "{}.txt".format(name)).open("w", encoding="utf-8") as file: for word in data: file.write("{}\n".format(word)) else: path = Path(DATA_EXPORT, "{}.csv".format(name)) data.to_csv(path, sep=";", encoding="utf-8") shutil.make_archive(DATA_EXPORT, "zip", DATA_EXPORT)
"""Deletes the content of a directory. """ logging.info("Cleaning up in data directory...") for p in directory.rglob(pattern): if p.is_file(): p.unlink()
"""Convert pandas Series to a 2-D array. """ for i, v in zip(s.index, s): yield [i, v]
"""Threads a process. """ q = queue.Queue() def wrapper(): q.put(target(*args)) t = threading.Thread(target=wrapper) t.start() return q |