Feat: Full pipeline now working
kcwongaz committed Aug 30, 2022
1 parent 8a952b5 commit 591f3ce
Showing 4 changed files with 87 additions and 98 deletions.
119 changes: 28 additions & 91 deletions pipeline/1_extract.py
@@ -1,47 +1,29 @@
 import os
 import json
 import pickle
-import shutil
-import zipfile
 
-from FR24Writer import FR24Writer
+from air_traffic.FR24Writer import FR24Writer
 
 
 # --------------------------------------------------------------------------- #
 # User inputs
 
 # Locations to read data from
-# All the .bz2 files and top-level .zip files are expected to be decompressed
-# (e.g. 01.tar.bz2, 02.zip, etc.),
-# so that the directories have all the yyyymmdd.zip files.
-# Make sure all yyyymmdd.zip are there, or you will miss some time points.
-dataroot = "/mnt/Passport/Lishuai_data/USA"
+# All files are expected to be fully decompressed,
+# so that in the directory you should see the bare .json / .txt files
+dataroot = "../raw"
 
 # Locations to store data to
-saveroot = "/home/kc/Research/air_traffic/data/fr24_usa"
+saveroot = "../data/extracted"
 
 # Location for a pickled FR24Writer object to be restored.
 # This is for continuing a partial flight extraction.
 # Set this to an empty string if there is nothing to be restored.
 # Each successful run will generate an FR24Writer.pickle for use.
 restore_loc = ""
 
 
-# (!!!) Some suggestions
-# Do the extraction for one geographic area per run,
-# i.e. set dataroot to ".../Lishuai_data/china/" instead of ".../Lishuai_data/".
-# When starting on a new area, always set restore_loc = "".
-# These should prevent some potential problems that I haven't fully tested.
-#
-# You can also go year-by-year or month-by-month, like
-# dataroot = "/mnt/Passport/Lishuai_data/china/2017/"
-# But when you continue to the next year, remember to set restore_loc
-# so that you pick up the flights that have not landed in the previous year.
 
 # --------------------------------------------------------------------------- #
 # *** Main program starts here
 
 # Initialize flight writer
 if restore_loc == "":
     flights = FR24Writer(saveroot)
 else:
@@ -54,10 +36,6 @@
 if not os.path.exists(saveroot):
     os.makedirs(saveroot, exist_ok=True)
 
-tempdir = f"{saveroot}/temp"
-if not os.path.exists(tempdir):
-    os.makedirs(tempdir, exist_ok=True)
-
 # Start the log
 log_writer = open(f"{saveroot}/fr24_extract.log", "a")
 log_writer.write("# --------------------------------------------------- #\r\n")
@@ -72,79 +50,38 @@
     dirs.sort()
     files.sort()
 
-    # 1. Copy the yyyymmdd.zip files to local working directory
     for file in files:
-        if file.endswith(".zip") and len(file) >= 12:
-            fname = os.path.join(root, file)
-            shutil.copy(fname, tempdir)
-
-            log_writer.write(f"Found: {fname} \r\n")
-            print(f"Found: {fname}")
-
-    # 2. Unzip all the copied .zip files
-    for file in os.listdir(tempdir):
-        if not file.endswith(".zip"):
+        # Skip files that are not json, e.g. any leftover zip files
+        if not file.endswith(".json") and not file.endswith(".txt"):
             continue
+        fname = os.path.join(root, file)
+        print(fname)
 
-        # Have to unzip files one by one, because some files may contain errors
-        zf = zipfile.ZipFile(os.path.join(tempdir, file), "r")
-        for zf_child in zf.namelist():
-            try:
-                zf.extract(zf_child, tempdir)
-            except zipfile.BadZipFile:
-                log_writer.write(f"Error unzipping {zf}/{zf_child} \r\n")
-        zf.close()
-
-    # 3. Now loop through the unzipped .json files to extract flights
-    for troot, tdirs, tfiles in os.walk(tempdir):
-        tdirs.sort()
-        tfiles.sort()
-
-        for tfile in tfiles:
-            if not tfile.endswith(".json") and not tfile.endswith(".txt"):
-                continue
-            fname = os.path.join(troot, tfile)
-
-            if os.stat(fname).st_size == 0:
-                log_writer.write(f"Empty JSON file: {fname} \r\n")
-                continue
-
-            with open(fname, "r") as f:
-                try:
-                    json_data = json.load(f)
-                    timestamp_fetched = False
-                    print(fname)
-                except json.JSONDecodeError:
-                    log_writer.write(f"Error encountered on: {fname} \r\n")
-                    continue
-
-            for key, item in json_data.items():
-                # Each json file contains some useless meta-data;
-                # actual flight data takes the shape of a list
-                if not isinstance(item, list):
-                    continue
-
-                # The data row also needs to have the correct length
-                # Expecting length 18, but some can have 19 for some reason
-                if len(item) < 18:
-                    log_writer.write(f"Error encountered on: {fname} \r\n")
-                    continue
-
-                # Pick only flights that are coming to HK
-                if item[12] != "HKG":
-                    continue
-
-                # Read the timestamp for the current file only once
-                # Do one push every 24 hours = 86400 seconds
-                if not timestamp_fetched and (item[10] - cutoff_time >= 86400):
-                    cutoff_time = item[10]
-                    flights.push(cutoff_time)
-
-                flights.write(key, item)
-
-    # Done with this round, empty the temp directory
-    shutil.rmtree(tempdir)
-    os.makedirs(tempdir, exist_ok=True)
+        with open(fname, "r") as f:
+            try:
+                json_data = json.load(f)
+                timestamp_fetched = False
+            except json.JSONDecodeError:
+                log_writer.write(f"Error encountered on: {fname} \r\n")
+                continue
+
+        for key, item in json_data.items():
+            # Each json file contains some useless meta-data;
+            # actual flight data takes the shape of a list
+            if not isinstance(item, list):
+                continue
+
+            # Pick only flights that are coming to HK
+            if item[12] != "HKG":
+                continue
+
+            # Read the timestamp for the current file, and read it only once
+            # Do one push every 24 hours = 86400 seconds
+            if not timestamp_fetched and (item[10] - cutoff_time >= 86400):
+                cutoff_time = item[10]
+                flights.push(cutoff_time)
+
+            flights.write(key, item)
 
 
 # One last push before finishing.
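
A note on restore_loc: the actual restore branch is elided above, but from the surrounding comments (and the pickle import) it would amount to unpickling a saved writer. A minimal sketch, assuming FR24Writer.pickle is a plain pickled FR24Writer object and that the path below is where a previous run left it — both are assumptions, not code from this commit:

import pickle

# Hypothetical restore step; the real else-branch is not shown in this diff,
# and the file name and location here are guesses based on the comments above.
restore_loc = "../data/extracted/FR24Writer.pickle"
with open(restore_loc, "rb") as f:
    flights = pickle.load(f)  # resume a partial extraction where it left off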
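For readers unfamiliar with the feed format, the extraction loop above encodes a few assumptions about each snapshot file: values that are not lists are metadata to be skipped, flight rows are flat lists of about 18 fields in which index 10 holds a Unix timestamp and index 12 the destination airport code, and one push happens per 86400-second window. A toy illustration of that shape — the key names and every field position other than 10 and 12 are made-up placeholders, not the real FR24 schema:

snapshot = {
    "full_count": 12345,          # metadata-style entry -> not a list, skipped
    "e1a2b3c4": [None] * 18,      # one flight row with 18 fields
}
snapshot["e1a2b3c4"][10] = 1483228800   # 2017-01-01 00:00 UTC, the data point's time
snapshot["e1a2b3c4"][12] = "HKG"        # destination airport code

for key, item in snapshot.items():
    if not isinstance(item, list):
        continue                  # drop the metadata entries
    if item[12] != "HKG":
        continue                  # keep only flights bound for Hong Kong
    print(key, item[10])          # -> e1a2b3c4 1483228800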
14 changes: 7 additions & 7 deletions pipeline/2_filter.py
@@ -2,22 +2,22 @@
 import pandas as pd
 import os
 
-from checkers import *
+from air_traffic.filters import *
 
 
 # --------------------------------------------------------------------------- #
 
 # Will apply filter to all trajectories within this date range
-start_date = "2016-12-31"
-end_date = "2021-04-28"
+start_date = "2017-01-01"
+end_date = "2017-01-31"
 
 
 # (!) Change the following to appropriate directory names
 # Location of raw data
-dataroot = "/home/kc/Research/air_traffic/data/fr24_china/"
+dataroot = "../data/extracted"
 
 # Location for output
-copyroot = "/home/kc/Research/air_traffic/data/fr24_clean/"
+copyroot = "../data/cleaned"
 
 # mode should be one of ["copy", "list", ""]
 # "copy" will create a filtered copy of the data in copyroot
@@ -62,8 +62,8 @@
 while date <= end:
     dstr = date.strftime(date_fmt)
     mstr = date.strftime("%Y-%m")
-    datadir = dataroot + mstr + "/" + dstr
-    copydir = copyroot + mstr + "/" + dstr
+    datadir = dataroot + "/" + mstr + "/" + dstr
+    copydir = copyroot + "/" + mstr + "/" + dstr
 
     # Create destination directory if not exist
     if not os.path.exists(copydir) and mode == "copy":
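
The two changed lines above fix the missing "/" separators by hand. An equivalent formulation that sidesteps separator bookkeeping altogether is os.path.join, sketched here with the same directory roots — an alternative, not what the commit does:

import os

dataroot = "../data/extracted"
copyroot = "../data/cleaned"
mstr, dstr = "2017-01", "2017-01-01"    # as produced by date.strftime(...)

# os.path.join inserts the separators itself, with or without trailing slashes
datadir = os.path.join(dataroot, mstr, dstr)
copydir = os.path.join(copyroot, mstr, dstr)
print(datadir)   # ../data/extracted/2017-01/2017-01-01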
30 changes: 30 additions & 0 deletions pipeline/4_distance_time_curves.py
@@ -0,0 +1,30 @@
+import pandas as pd
+
+from air_traffic.io import read_trajectories_range
+from air_traffic.trajectory import distance_hkia
+
+
+# --------------------------------------------------------------------------- #
+datadir = "../data/cleaned"
+
+start = "2017-01-01"
+end = "2017-01-31"
+
+# --------------------------------------------------------------------------- #
+dataset = read_trajectories_range(datadir, start, end, fname_only=True)
+
+for files in dataset:
+    for f in files:
+
+        print(f)
+        df = pd.read_csv(f, header=0)
+
+        lat = df["latitude"].to_numpy()
+        lon = df["longitude"].to_numpy()
+        time = df["time"].to_numpy()
+
+        d = df.apply(distance_hkia, axis=1)
+
+        # Save the result
+        df = df.assign(distance=d)
+        df.to_csv(f, index=False)
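
distance_hkia comes from air_traffic.trajectory and its body is not part of this commit. Given that it is applied row-wise to columns named latitude and longitude, a haversine-style stand-in might look like the sketch below; the HKIA coordinates, the kilometre unit, and the great-circle formula are assumptions about, not a copy of, the real implementation:

import numpy as np

# Hypothetical stand-in for air_traffic.trajectory.distance_hkia.
HKIA_LAT, HKIA_LON = 22.308, 113.918    # approximate HKIA location
EARTH_RADIUS_KM = 6371.0

def distance_hkia(row):
    """Great-circle (haversine) distance in km from one trajectory point to HKIA."""
    lat1, lon1 = np.radians(row["latitude"]), np.radians(row["longitude"])
    lat2, lon2 = np.radians(HKIA_LAT), np.radians(HKIA_LON)
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    return 2 * EARTH_RADIUS_KM * np.arcsin(np.sqrt(a))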
22 changes: 22 additions & 0 deletions pipeline/start.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+BASEDIR=$(dirname "${BASH_SOURCE[0]}")
+cd "$BASEDIR"
+echo "Start ..."
+
+# The raw data originally comes as .zip files; it is expected to be
+# fully decompressed before this pipeline is run (see 1_extract.py).
+echo "Step 1: Extract flight trajectories from raw data..."
+python 1_extract.py
+
+echo "Step 2: Clean up extracted trajectory data..."
+python 2_filter.py
+
+echo "Step 3: Compute statistics from fixed entry ring..."
+python 3_stat_fixed_distance.py
+
+echo "Step 4: Compute distances..."
+python 4_distance_time_curves.py
+
+
+echo "All done!!"
