""" This script creates a sqlserver database version of the hurdat2 files that are relational and utilize geometry
objects for fast spatial querying.
"""
#%% third party imports
import pandas as pd
from dateutil.parser import isoparse
import sqlalchemy as sal
from sqlalchemy import create_engine
#%% input parameters: change these paths if you downloaded the files to a different location,
# and set the server and database names to match your SQL Server instance
atlantic_path = "resources/hurdat2-1851-2019-052520.txt"
pacific_path = "resources/hurdat2-nepac-1949-2019-042320.txt"
sqlserver = "sqlserver"
database = "databasename"
#%% dictionaries to code the identifier and status columns
record_identifier = {
"C": 0, # closest approach to a coast, not followed by a landfall
"G": 1, # genesis
"I": 2, # an intensity peak in terms of both pressure and wind
"L": 3, # landfall
"P": 4, # minimum central pressure
"R": 5, # additional detail on intensity of cyclone when rapid changes are underway
"S": 6, # change in status of the system
"T": 7, # provides additional detail on the track (position) of the cyclone
"W": 8, # maximum sustained wind speed
" ": pd.NA # missing value
}
status = {
"TD": 0, # tropical cyclone of tropical depression intensity (<34 knots)
"TS": 1, # tropical cyclone of tropical storm intensity (34-63 knots)
"HU": 2, # tropical cyclone of hurricane intensity (>= 64 knots)
"EX": 3, # extratropical cyclone of any intensity
"SD": 4, # subtropical cyclone of subtropical depression intensity (<34 knots)
"SS": 5, # subtropical cyclone of subtropical storm intensity (>= 34 knots)
"LO": 6, # a low that is neither a tropical cyclone, a subtropical cyclone, nor an extrantropical cyclone
"WV": 7, # a tropical wave
"DB": 8, # disturbance of any intensity
" ": pd.NA, # missing value
"ET": pd.NA, # invalid value found in pacific dataset, maybe this means EX?
"TY": pd.NA, # invalid value found in pacific dataset
"ST": pd.NA, # invalid value found in pacific dataset. maybe this means SS?
"PT": pd.NA # invalid value found in pacific dataset
}
def process_file(file_path):
    """Parse a HURDAT2 text file into header rows and data rows."""
    file_headers = []
    file_data = []
    with open(file_path, "r") as file:
        for row in file:
            row_split = row.split(",")
            if len(row_split) == 4:
                # header row
                file_headers.append([
                    row_split[0],  # event id
                    row_split[0][0:2],  # basin
                    row_split[0][2:4],  # storm number
                    row_split[0][4:8],  # year
                    row_split[1].lstrip(" "),  # name
                    int(row_split[2])  # number of data points
                ])
            else:
                # data row
                file_data.append([
                    file_headers[-1][0],  # event id, used to link back to the header table
                    row_split[0][0:4],  # year
                    row_split[0][4:6],  # month
                    row_split[0][6:8],  # day
                    row_split[1].lstrip(" ")[0:2],  # hours in UTC
                    row_split[1][-2:],  # minutes in UTC
                    row_split[2][-1],  # record identifier
                    row_split[3][-2:],  # storm status code
                    float(row_split[4][:-1]) * (-1 if row_split[4][-1] == "S" else 1),  # latitude
                    float(row_split[5][:-1]) * (-1 if row_split[5][-1] == "W" else 1),  # longitude
                    int(row_split[6].lstrip(" ")),  # maximum sustained wind (knots)
                    int(row_split[7].lstrip(" ")),  # minimum central pressure (mb)
                    int(row_split[8].lstrip(" ")),  # 34 kt wind radii max extent in NE quadrant (nautical miles)
                    int(row_split[9].lstrip(" ")),  # 34 kt wind radii max extent in SE quadrant (nautical miles)
                    int(row_split[10].lstrip(" ")),  # 34 kt wind radii max extent in SW quadrant (nautical miles)
                    int(row_split[11].lstrip(" ")),  # 34 kt wind radii max extent in NW quadrant (nautical miles)
                    int(row_split[12].lstrip(" ")),  # 50 kt wind radii max extent in NE quadrant (nautical miles)
                    int(row_split[13].lstrip(" ")),  # 50 kt wind radii max extent in SE quadrant (nautical miles)
                    int(row_split[14].lstrip(" ")),  # 50 kt wind radii max extent in SW quadrant (nautical miles)
                    int(row_split[15].lstrip(" ")),  # 50 kt wind radii max extent in NW quadrant (nautical miles)
                    int(row_split[16].lstrip(" ")),  # 64 kt wind radii max extent in NE quadrant (nautical miles)
                    int(row_split[17].lstrip(" ")),  # 64 kt wind radii max extent in SE quadrant (nautical miles)
                    int(row_split[18].lstrip(" ")),  # 64 kt wind radii max extent in SW quadrant (nautical miles)
                    int(row_split[19].lstrip(" ")),  # 64 kt wind radii max extent in NW quadrant (nautical miles)
                ])
    return file_headers, file_data
def create_path(df):
    """Return WKT text for one event's rows: a POINT for single-point events,
    otherwise one "(start, end)" segment per row for later MULTILINESTRING assembly."""
    if df.shape[0] == 1:
        return "POINT(" + df["point"] + ")"
    else:
        return "(" + df["point"] + ", " + df["next_point"] + ")"
def clean_data(events, points):
    """Convert the parsed rows into cleaned DataFrames with WKT location and path text."""
events = pd.DataFrame(
events,
columns=[
"event_id",
"basin",
"storm_num",
"year",
"name",
"num_points"
]
)
points = pd.DataFrame(
points,
columns=[
"event_id",
"year",
"month",
"day",
"hours_UTC",
"minutes_UTC",
"identifier",
"status",
"latitude",
"longitude",
"max_wind_knots",
"min_pressure_mb",
"ne_34kt_radii_max_nm",
"se_34kt_radii_max_nm",
"sw_34kt_radii_max_nm",
"nw_34kt_radii_max_nm",
"ne_50kt_radii_max_nm",
"se_50kt_radii_max_nm",
"sw_50kt_radii_max_nm",
"nw_50kt_radii_max_nm",
"ne_64kt_radii_max_nm",
"se_64kt_radii_max_nm",
"sw_64kt_radii_max_nm",
"nw_64kt_radii_max_nm"
]
)
    # create time and WKT location text; SQL Server converts the WKT into geography objects later on.
points["location"] = "POINT(" + points["longitude"].astype(str) + " " + points["latitude"].astype(str) + " " \
+ points["max_wind_knots"].astype(str).replace("-99", "NULL") + " " \
+ points["min_pressure_mb"].astype(str).replace("-999", "NULL") + ")"
points["point_time"] = points["year"] + "-" + points["month"] + "-" + points["day"] + "T" \
+ points["hours_UTC"] + ":" + points["minutes_UTC"] + ":00.000Z"
points["point_time"] = points["point_time"].apply(isoparse)
points["point"] = points["longitude"].astype(str) + " " + points["latitude"].astype(str)
points.drop(
["year", "month", "day", "hours_UTC", "minutes_UTC", "latitude", "longitude"],
axis=1,
inplace=True
)
# clean up values
points.replace(
{
"identifier": record_identifier,
"status": status,
"max_wind_knots": {-99: pd.NA},
"min_pressure_mb": {-999: pd.NA},
"ne_34kt_radii_max_nm": {-999: pd.NA},
"se_34kt_radii_max_nm": {-999: pd.NA},
"sw_34kt_radii_max_nm": {-999: pd.NA},
"nw_34kt_radii_max_nm": {-999: pd.NA},
"ne_50kt_radii_max_nm": {-999: pd.NA},
"se_50kt_radii_max_nm": {-999: pd.NA},
"sw_50kt_radii_max_nm": {-999: pd.NA},
"nw_50kt_radii_max_nm": {-999: pd.NA},
"ne_64kt_radii_max_nm": {-999: pd.NA},
"se_64kt_radii_max_nm": {-999: pd.NA},
"sw_64kt_radii_max_nm": {-999: pd.NA},
"nw_64kt_radii_max_nm": {-999: pd.NA}
},
inplace=True
)
points["next_point"] = points.groupby("event_id", sort=False)["point"].shift(-1)
# construct path segments
# drop all segments that have the same start and end point
path = points[[
"event_id",
"point",
"next_point",
"point_time"
]].drop(points[points["point"] == points["next_point"]].index)
# remove any duplicate segments that have the start/end points reversed
path["point_set"] = path[["event_id", "point", "next_point"]].apply(frozenset, axis=1)
path = path.drop_duplicates(subset="point_set", keep="first")
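    # For illustration (hypothetical values): "(-94.8 28.0, -95.4 28.0)" and "(-95.4 28.0, -94.8 28.0)"
    # for the same event produce identical frozensets, so the reversed duplicate is dropped.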
# group the segments by event for processing
grouped = path.groupby("event_id", sort=False)
# remove any segments that don't have an end point, like the last point in each event,
# but keep events with only one point
path_text = grouped.apply(create_path).dropna()
events["start_time"] = grouped.first()["point_time"].values
events["path"] = path_text.groupby("event_id", sort=False).apply(",".join).values
events.loc[events["path"].str[0] == "(", "path"] = "MULTILINESTRING(" \
+ events.loc[events["path"].str[0] == "(", "path"] + ")"
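    # For illustration (hypothetical values), a multi-point event is assembled into
    # "MULTILINESTRING((-94.8 28.0, -95.4 28.0), (-95.4 28.0, -96.0 28.1))", while a
    # single-point event keeps its plain "POINT(...)" text.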
events.drop(["storm_num", "num_points", "year"], axis=1, inplace=True)
# remove intermediary calculation columns
points.drop(["point", "next_point"], axis=1, inplace=True)
return events, points
if __name__ == "__main__":
#%% process each basin's HURDAT2 file
atlantic_headers, atlantic_data = process_file(atlantic_path)
pacific_headers, pacific_data = process_file(pacific_path)
#%% clean data
atlantic_headers, atlantic_data = clean_data(atlantic_headers, atlantic_data)
pacific_headers, pacific_data = clean_data(pacific_headers, pacific_data)
    #%% combine the dataframes
headers = pd.concat([atlantic_headers, pacific_headers])
data = pd.concat([atlantic_data, pacific_data])
#%% load data into sql server
    conn_string = (
        "mssql+pyodbc://" + sqlserver + "/" + database
        + "?driver=ODBC+Driver+17+for+SQL+Server&trusted_connection=yes"
    )
engine = create_engine(conn_string, fast_executemany=True)
conn = engine.connect()
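    # NOTE: this assumes SQLAlchemy 1.x, where conn.execute() accepts a raw SQL string.
    # Under SQLAlchemy 2.0 the raw statements below would need wrapping, e.g.:
    #   from sqlalchemy import text
    #   conn.execute(text(sql))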
table_types = {
"event_id": sal.types.NCHAR(length=8),
"identifier": sal.types.INTEGER(),
"status": sal.types.INTEGER(),
"max_wind_knots": sal.types.INTEGER(),
"min_pressure_mb": sal.types.INTEGER(),
"ne_34kt_radii_max_nm": sal.types.INTEGER(),
"se_34kt_radii_max_nm": sal.types.INTEGER(),
"sw_34kt_radii_max_nm": sal.types.INTEGER(),
"nw_34kt_radii_max_nm": sal.types.INTEGER(),
"ne_50kt_radii_max_nm": sal.types.INTEGER(),
"se_50kt_radii_max_nm": sal.types.INTEGER(),
"sw_50kt_radii_max_nm": sal.types.INTEGER(),
"nw_50kt_radii_max_nm": sal.types.INTEGER(),
"ne_64kt_radii_max_nm": sal.types.INTEGER(),
"se_64kt_radii_max_nm": sal.types.INTEGER(),
"sw_64kt_radii_max_nm": sal.types.INTEGER(),
"nw_64kt_radii_max_nm": sal.types.INTEGER(),
"location": sal.types.NVARCHAR(length=100),
"point_time": sal.DateTime()
}
data.to_sql("Historical_HU_points", con=engine, if_exists="replace", dtype=table_types, index=False)
table_types = {
"event_id": sal.types.NCHAR(length=8),
"basin": sal.types.NVARCHAR(length=2),
"name": sal.types.NVARCHAR(length=40),
"start_time": sal.DateTime(),
"path": sal.types.NVARCHAR()
}
headers.to_sql("Historical_HU", con=engine, if_exists="replace", dtype=table_types, index=False)
    # build the record-identifier lookup table; drop the trailing missing-value row,
    # then overwrite the letter codes with the full descriptions
    record = pd.DataFrame.from_dict(record_identifier, orient="index").reset_index()[:-1]
    record.columns = ["description", "record_id"]
record["description"] = [
"closest approach to a coast, not followed by a landfall",
"genesis",
"an intensity peak in terms of both pressure and wind",
"landfall",
"minimum central pressure",
"additional detail on intensity of cyclone when rapid changes are underway",
"change in status of the system",
"provides additional detail on the track (position) of the cyclone",
"maximum sustained wind speed"
]
record.to_sql("HU_points_identifier", con=engine, if_exists="replace",
dtype={"record_id": sal.types.INTEGER(), "description": sal.types.NVARCHAR(length=100)}, index=False)
    # build the status lookup table; drop the five trailing missing/invalid rows,
    # then overwrite the status codes with the full descriptions
    df_status = pd.DataFrame.from_dict(status, orient="index").reset_index()[:-5]
    df_status.columns = ["description", "status_id"]
df_status["description"] = [
"tropical cyclone of tropical depression intensity (<34 knots)",
"tropical cyclone of tropical storm intensity (34-63 knots)",
"tropical cyclone of hurricane intensity (>= 64 knots)",
"extratropical cyclone of any intensity",
"subtropical cyclone of subtropical depression intensity (<34 knots)",
"subtropical cyclone of subtropical storm intensity (>= 34 knots)",
"low that is neither a tropical cyclone, a subtropical cyclone, nor an extratropical cyclone",
"a tropical wave",
"disturbance of any intensity",
]
df_status.to_sql("HU_points_status", con=engine, if_exists="replace",
dtype={"status_id": sal.types.INTEGER(), "description": sal.types.NVARCHAR(length=100)}, index=False)
#%% create keys, indexes, and geo datatypes
sql = (
"ALTER TABLE Historical_HU ALTER COLUMN event_id nchar(8) NOT NULL; "
"ALTER TABLE Historical_HU_points ADD point_id int NOT NULL IDENTITY;"
"ALTER TABLE HU_points_identifier ALTER COLUMN record_id int NOT NULL;"
"ALTER TABLE HU_points_status ALTER COLUMN status_id int NOT NULL;"
)
conn.execute(sql)
sql = (
"ALTER TABLE Historical_HU "
"ADD CONSTRAINT PK_Historical_HU_event_id PRIMARY KEY CLUSTERED (event_id), path_geo geography;"
"ALTER TABLE HU_points_identifier "
"ADD CONSTRAINT PK_points_identifier PRIMARY KEY (record_id);"
"ALTER TABLE HU_points_status "
"ADD CONSTRAINT PK_points_status PRIMARY KEY (status_id);"
"ALTER TABLE Historical_HU_points "
"ADD CONSTRAINT PK_Historical_HU_point_id PRIMARY KEY CLUSTERED (point_id), "
"FOREIGN KEY (event_id) REFERENCES Historical_HU(event_id), "
"FOREIGN KEY (identifier) REFERENCES HU_points_identifier(record_id),"
"FOREIGN KEY (status) REFERENCES HU_points_status(status_id), location_geo geography;"
)
conn.execute(sql)
sql = (
"UPDATE Historical_HU SET path_geo = geography::STGeomFromText(path, 4326);"
"CREATE SPATIAL INDEX Historical_HU_path ON Historical_HU (path_geo) USING GEOGRAPHY_AUTO_GRID;"
"UPDATE Historical_HU_points SET location_geo = geography::STGeomFromText(location, 4326);"
"CREATE SPATIAL INDEX Historical_HU_point on Historical_HU_points (location_geo) USING GEOGRAPHY_AUTO_GRID;"
)
conn.execute(sql)
conn.close()
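#%% example query (hypothetical, for illustration): with the spatial indexes in place, the tables
# support fast proximity searches, e.g. in T-SQL, storms whose tracks passed within 100 km of a point:
#   DECLARE @site geography = geography::Point(29.76, -95.37, 4326);
#   SELECT event_id, name
#   FROM Historical_HU
#   WHERE path_geo.STDistance(@site) <= 100000;  -- STDistance returns meters for geography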