# sqlquerygraph.py
import logging
import os
import argparse
from extractor import Extractor
import exporter
import writer
import numpy as np
import pandas as pd
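
# Builds a table-dependency graph from SQL scripts: extracts each table's
# upstream dependencies, exports node and relationship files, and writes
# Cypher queries for importing the graph into a neo4j database.
#
# Example invocation (directory names are illustrative; flags as defined below):
#   python sqlquerygraph.py -sd sql_scripts -rd dataset_a dataset_b -ed exports -v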
# create the log directory if it does not already exist, otherwise
# logging.basicConfig fails when opening the log file
os.makedirs("log", exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    filename="log/sqlquerygraph.log",
    filemode="w",
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

if __name__ == "__main__":
    argp = argparse.ArgumentParser()
    argp.add_argument(
        "-sd",
        "--script_dir",
        type=str,
        help="Directory where we store subdirectories of our SQL queries",
    )
    argp.add_argument(
        "-d",
        "--sub_dir",
        default=None,
        type=str,
        help="Subdirectory within script_dir that you want to read SQL queries from. "
        "If no value is given, all subdirectories in script_dir are used.",
    )
    argp.add_argument(
        "-rd",
        "--reference_datasets",
        nargs="*",
        type=str,
        default=[],
        help="Datasets that contain tables in the database to look up against. "
        "If no values are given, the datasets specified in constants.py are used.",
    )
    argp.add_argument("-ed", "--export_dir", type=str, help="Directory to store files.")
    argp.add_argument(
        "-v",
        "--verbose",
        # action="store_true" instead of type=bool: argparse parses any
        # non-empty string (including "False") as True with type=bool
        action="store_true",
        help="Output the steps taken and the query after cleaning. "
        "Useful for checking where a function is failing.",
    )
    args = argp.parse_args()

    # initialise an empty (0, 2) array for accumulating [table, dependency] rows
    arr = np.empty(shape=(0, 2))
    if args.sub_dir is None:
        subdirs = os.listdir(path=args.script_dir)
    else:
        # wrap the single subdirectory in a list so the loop below iterates
        # over directory names rather than the characters of a string
        subdirs = [args.sub_dir]
    logging.info(f"Reading SQL queries from subdirectories: {subdirs}")
    for dataset in subdirs:
        logging.info(
            f"Extracting {dataset} tables and their dependencies from scripts\n"
        )
        # build the MERGE boilerplate strings to strip from each query:
        # one per-table prefix, plus the shared suffix appended once
        dir_report = f"{args.script_dir}/{dataset}"
        remove_txt = []
        for table in os.listdir(dir_report):
            table_name, _ = os.path.splitext(table)
            remove_txt.append(f"MERGE {dataset}.{table_name} USING (")
        remove_txt.append(
            ") ON FALSE WHEN NOT MATCHED THEN "
            "INSERT ROW WHEN NOT MATCHED BY SOURCE THEN "
            "DELETE"
        )
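        # For illustration, the wrapper stripped above looks roughly like this
        # (the table name is hypothetical):
        #   MERGE my_dataset.my_table USING (
        #       <the actual SELECT query>
        #   ) ON FALSE WHEN NOT MATCHED THEN INSERT ROW
        #   WHEN NOT MATCHED BY SOURCE THEN DELETE
        # so only the inner SELECT remains for dependency extraction.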
        extractor = Extractor(script_dir=dir_report, schema=dataset)
        table_dependencies = extractor.extract_table_dependencies_from_queries(
            reference_datasets=args.reference_datasets,
            str_to_remove=remove_txt,
            verbose=args.verbose,
        )
        logging.info(f"Converting {dataset} dictionaries to dataframes\n")
        df_tables = exporter.convert_dict_to_df(data=table_dependencies)
        df_tables = df_tables.to_numpy()
        arr = np.concatenate((arr, df_tables), axis=0)

logging.info("Splitting tables from their dependencies\n")
df = pd.DataFrame(data=arr, columns=["table", "dependency"])
df = exporter.separate_dataset_table(data=df)
logging.info("Exporting unique table names for nodes\n")
exporter.export_unique_names(data=df, path_or_buf=args.export_dir)
logging.info("Exporting table dependencies for relationships\n")
exporter.export_table_dependency(data=df, path_or_buf=args.export_dir)
logging.info("Creating Cypher queries for neo4j database\n")
datasets = [txt.title() for txt in args.reference_datasets]
writer.create_query_constraint(datasets=datasets, dir_file=args.export_dir)
writer.create_query_node_import(datasets=datasets, dir_file=args.export_dir)
writer.create_query_relationship(datasets=datasets, dir_file=args.export_dir)
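
    # At this point export_dir should hold the node and relationship exports
    # written by exporter.export_unique_names / exporter.export_table_dependency,
    # plus the Cypher scripts produced by writer; the exact file names and
    # formats depend on those modules, which are not shown here.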