-
Notifications
You must be signed in to change notification settings - Fork 0
/
load.py
213 lines (179 loc) · 7.2 KB
/
load.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import sqlite3
import os
import csv
def connect_to_database(db_path):
    """Open the SQLite database at ``db_path``.

    Args:
        db_path: Path handed straight to ``sqlite3.connect``.

    Returns:
        A ``(connection, cursor)`` pair on success. If ``sqlite3`` raises an
        error, the error is printed and ``(None, None)`` is returned instead.
    """
    try:
        connection = sqlite3.connect(db_path)
        return connection, connection.cursor()
    except sqlite3.Error as err:
        print(f"An error occurred: {err}")
    return None, None
def get_parsed_files(cursor):
    """Return the files already recorded in the ``parsed_files`` table.

    Args:
        cursor: Database cursor used to run the query.

    Returns:
        A set of ``(store, filename)`` tuples. If the query fails with a
        ``sqlite3.Error`` the error is printed and an empty set is returned.
    """
    try:
        cursor.execute("SELECT store, filename FROM parsed_files")
        recorded = cursor.fetchall()
    except sqlite3.Error as err:
        print(f"An error occurred: {err}")
        return set()
    return set(recorded)
def get_all_csv_files(data_folder):
    """Collect every ``.csv`` file found anywhere under ``data_folder``.

    Args:
        data_folder: Root directory that is walked recursively.

    Returns:
        A set of ``(store, filename)`` tuples, where ``store`` is the
        directory containing the file expressed relative to ``data_folder``
        (``'.'`` for files sitting directly in ``data_folder``).
    """
    return {
        (os.path.relpath(dirpath, data_folder), name)
        for dirpath, _subdirs, names in os.walk(data_folder)
        for name in names
        if name.endswith('.csv')
    }
def entry_exists(cursor, store_name, transaction_id):
    """Check whether ``raw_sales`` already holds rows for a transaction.

    Args:
        cursor: Database cursor used to run the lookup.
        store_name: Value matched against the ``store`` column.
        transaction_id: Value matched against the ``transaction_id`` column.

    Returns:
        ``True`` if at least one matching row exists, ``False`` if none does,
        and ``None`` if the query fails with a ``sqlite3.Error`` (the error
        is printed).
    """
    lookup = """
        SELECT EXISTS(
            SELECT 1
            FROM raw_sales
            WHERE store = ? AND transaction_id = ?
        )
    """
    try:
        cursor.execute(lookup, (store_name, transaction_id))
        # EXISTS always yields exactly one row holding 0 or 1.
        row = cursor.fetchone()
        return bool(row[0])
    except sqlite3.Error as err:
        print(f"An error occurred: {err}")
        return None
def parse_csv_file(file_path):
    """Read a sales CSV export and group its rows by transaction id.

    The first row is treated as a header and must have exactly 42 columns.
    From every following non-blank row a fixed set of columns is picked out
    by position (see ``indexes`` below); all values are kept as the raw
    strings read from the file.

    Args:
        file_path: Path of the CSV file to read (decoded as UTF-8).

    Returns:
        A dict mapping each transaction id to the list of entry dicts that
        share it, in file order. Returns ``None`` (after printing a message)
        when the file does not exist.

    Raises:
        Exception: If the file is empty or its header does not have 42
            columns. Any other error hit while reading/parsing is printed
            and re-raised unchanged.
    """
    expected_columns = 42
    # Column positions of the fields we keep from each data row.
    indexes = {
        'product_id': 30,
        'product_name': 36,
        'qty_product_sold': 28,
        'product_price': 38,
        'product_prices': 29,
        'transaction_id': 33,
        'transaction_datetime': 37,
        'discount_amount': 39,  # this may be a number or a percentage like 29%
        'transaction_total': 40
    }
    try:
        # newline='' is what the csv module asks for, so that line endings
        # embedded inside quoted fields are handled by the reader itself.
        with open(file_path, 'r', encoding='utf-8', newline='') as file:
            rows = list(csv.reader(file))

        # format check (an empty file has no header at all)
        if not rows or len(rows[0]) != expected_columns:
            raise Exception(f"Invalid file format: {file_path}")

        # dev: this statement helps decode the format of the csv file
        # print('\n'.join([f"{i}: {val}" for i, val in enumerate(rows[1])]))

        # Bucket the extracted entries by transaction_id.
        transactions = {}
        for row in rows[1:]:
            if not row:
                # csv.reader yields [] for blank lines (e.g. a trailing one)
                continue
            entry = {name: row[position] for name, position in indexes.items()}
            transactions.setdefault(entry['transaction_id'], []).append(entry)
        return transactions
    except FileNotFoundError:
        print(f"The file '{file_path}' does not exist.")
        return None
    except Exception as e:
        print(f"An error occurred: {e}")
        raise
def add_transaction_to_db(conn, cursor, transaction_id, entries, store):
    """Insert all line items of one transaction into ``raw_sales``.

    Each entry is a dict of raw strings as produced by ``parse_csv_file``.
    The entries are converted into row tuples without being modified, so the
    caller's dicts can be reused afterwards.

    Conversions applied per entry:
      * ``transaction_datetime`` is split on a single space into a date and
        a time part.
      * ``discount_amount`` containing ``%`` is stored as ``0``; otherwise
        it is parsed as a number and truncated to ``int``.
      * ``qty_product_sold`` becomes a ``float``; ``product_price``,
        ``product_prices`` and ``transaction_total`` become ``int``.
        Thousands separators (``,``) are stripped first.

    Args:
        conn: Open database connection; committed on success.
        cursor: Cursor belonging to ``conn``.
        transaction_id: Identifier used only for the progress messages.
        entries: List of entry dicts for this transaction.
        store: Store name written into the ``store`` column of every row.

    Raises:
        ValueError: If a numeric field or the datetime cannot be parsed.
        sqlite3.Error: If the insert or commit fails; the pending changes
            are rolled back before the error propagates.
    """
    print(f"Adding {transaction_id} to database...")

    def to_int(text):
        return int(text.replace(',', ''))

    def to_row(entry):
        date_part, time_part = entry['transaction_datetime'].split(' ')

        discount = entry['discount_amount']
        if '%' in discount:
            # Percentage discounts are recorded as 0 here.
            total_discount = 0
        else:
            # discount is usually an int, but sometimes a float representing
            # a whole number; strip separators like the other numeric fields
            total_discount = int(float(discount.replace(',', '')))

        return (
            entry['product_id'],
            entry['product_name'],
            float(entry['qty_product_sold'].replace(',', '')),
            to_int(entry['product_price']),
            to_int(entry['product_prices']),
            entry['transaction_id'],
            date_part,
            time_part,
            total_discount,
            to_int(entry['transaction_total']),
            store,
        )

    # Convert everything up front so a bad value aborts before any insert.
    data = [to_row(entry) for entry in entries]

    query = """
        INSERT INTO raw_sales (
            product_id, product_name, qty_product_sold, product_price,
            product_prices, transaction_id, transaction_date,
            transaction_time, transaction_total_discount, transaction_total, store
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """
    try:
        cursor.executemany(query, data)
        conn.commit()
    except sqlite3.Error:
        # Do not leave a half-inserted transaction pending on the connection.
        conn.rollback()
        raise
    print(f"Added {transaction_id} with {len(entries)} entries to database.")
def insert_into_parsed_files(conn, cursor, store, filename):
    """Record a ``(store, filename)`` pair in the ``parsed_files`` table.

    Args:
        conn: Open database connection; committed after the insert.
        cursor: Cursor belonging to ``conn``.
        store: Store name to record.
        filename: File name to record.

    Raises:
        sqlite3.Error: Any database error other than an integrity violation
            is printed and re-raised. An ``IntegrityError`` is only
            reported with a message and then ignored.
    """
    statement = """
        INSERT INTO parsed_files (store, filename)
        VALUES (?, ?)
    """
    try:
        cursor.execute(statement, (store, filename))
        conn.commit()
    except sqlite3.IntegrityError:
        print(f"Entry with store {store} and filename {filename} already exists in the database.")
    except sqlite3.Error as err:
        print(f"An error occurred: {err}")
        raise
def main(db_path='database.db', data_folder='data'):
    """Load every not-yet-parsed CSV file under ``data_folder`` into the DB.

    Steps:
      1. Connect to the database; exit with status 1 if that fails.
      2. Compute the CSV files on disk that are not listed in
         ``parsed_files``.
      3. For each such file, insert every transaction that is not already
         present in ``raw_sales``, then record the file as parsed.

    Args:
        db_path: Path of the SQLite database file.
        data_folder: Root folder that is searched for CSV files.

    Raises:
        SystemExit: If the database connection cannot be opened.
        Exception: Any error raised while loading is reported and re-raised;
            the connection is closed in every case.
    """
    conn, cursor = connect_to_database(db_path)
    if conn is None or cursor is None:
        # SystemExit works without relying on the interactive-only exit().
        raise SystemExit(1)

    try:
        parsed_files = get_parsed_files(cursor)  # set of tuples (store, filename)
        all_csv_files = get_all_csv_files(data_folder)  # set of tuples (store, filename)
        # Sorted so files are reported and processed in a stable order.
        unread_files = sorted(all_csv_files - parsed_files)

        for store, filename in unread_files:
            print(f"Unread file: {store}/{filename}")

        for store, filename in unread_files:
            transactions = parse_csv_file(os.path.join(data_folder, store, filename))
            if transactions is None:
                # The parser returns None when the file is missing; skip it
                # without marking it as parsed so a later run can retry.
                continue

            for transaction_id, entries in transactions.items():
                if entry_exists(cursor, store, transaction_id):
                    print(f"Entry for transaction {transaction_id} already exists. Skipping...")
                    continue
                add_transaction_to_db(conn, cursor, transaction_id, entries, store)

            # insert the file into the parsed_files table
            insert_into_parsed_files(conn, cursor, store, filename)
    except Exception:
        # The finally clause below performs the actual close.
        print("Exception occurred. Connection to database closed.")
        raise
    finally:
        conn.close()


if __name__ == '__main__':
    main()