initial import and jupyter notebook to check for data issues
This commit is contained in:
12
.gitignore
vendored
12
.gitignore
vendored
@@ -1,4 +1,16 @@
|
||||
# ---> Python
|
||||
|
||||
# Data generated
|
||||
data/*.db
|
||||
*.xlsx
|
||||
|
||||
# Ignore local virtual environments
|
||||
venv/
|
||||
.venv/
|
||||
|
||||
# Ignore Jupyter Notebook checkpoints
|
||||
.ipynb_checkpoints/
|
||||
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
|
||||
45
notebooks/001_data_cleaning_checks.ipynb
Normal file
45
notebooks/001_data_cleaning_checks.ipynb
Normal file
@@ -0,0 +1,45 @@
|
||||
{
|
||||
"cells": [
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"id": "8d393e6d",
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import sys\n",
|
||||
"import os\n",
|
||||
"import sqlite3\n",
|
||||
"import pandas as pd\n",
|
||||
"\n",
|
||||
"# 1. Resolve repository pathing and connect to SQLite\n",
|
||||
"BASE_DIR = os.path.dirname(os.getcwd())\n",
|
||||
"DB_PATH = os.path.join(BASE_DIR, \"data\", \"met_office_weather.db\")\n",
|
||||
"conn = sqlite3.connect(DB_PATH)\n",
|
||||
"\n",
|
||||
"# 2. Extract dataset profile\n",
|
||||
"df = pd.read_sql_query(\"SELECT * FROM historic_weather\", conn)\n",
|
||||
"print(\"--- Dataset Shape ---\")\n",
|
||||
"print(f\"Total Rows: ${df.shape[0]}, Total column: ${df.shape[1]}\\n\")\n",
|
||||
"\n",
|
||||
"print (\"--- Column Data Types & Counts ---\")\n",
|
||||
"print(df.info())\n",
|
||||
"\n",
|
||||
"print(\"\\n--- Missing Values (NaN) Per Feature Column ---\")\n",
|
||||
"print(df.isnull().sum())\n",
|
||||
"\n",
|
||||
"print(\"\\n --- Total Row Logs Collected Per Unique Station ---\")\n",
|
||||
"print(df[\"station_name\"].value_counts())\n",
|
||||
"\n",
|
||||
"conn.close()"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
"language_info": {
|
||||
"name": "python"
|
||||
}
|
||||
},
|
||||
"nbformat": 4,
|
||||
"nbformat_minor": 5
|
||||
}
|
||||
9
requirements.txt
Normal file
9
requirements.txt
Normal file
@@ -0,0 +1,9 @@
|
||||
requests==2.31.0
|
||||
beautifulsoup4==4.12.3
|
||||
pandas===2.2.3
|
||||
openpyxl===3.1.5
|
||||
numpy===2.2.3
|
||||
scipy==1.17.1
|
||||
matplotlib==3.10.0
|
||||
seaborn==0.13.2
|
||||
statsmodels==0.14.4
|
||||
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
142
src/ingest_data.py
Normal file
142
src/ingest_data.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import pandas as pd
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# Define file paths relative to this script's position
|
||||
# This ensures it resolves correctly regardless of where it is executed from
|
||||
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
DB_PATH = os.path.join(BASE_DIR, "data", "met_office_weather.db")
|
||||
|
||||
def setup_database():
|
||||
"""Initialises the database connection and ensures directories exist"""
|
||||
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
|
||||
return sqlite3.connect(DB_PATH)
|
||||
|
||||
def fetch_station_urls():
|
||||
"""Scrapes the Met Office landing page to collect all valid historic station data text links."""
|
||||
main_url = "https://www.metoffice.gov.uk/research/climate/maps-and-data/historic-station-data"
|
||||
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
|
||||
|
||||
print("🔍 Fetching station links from the Met Office...")
|
||||
response = requests.get(main_url, headers=headers)
|
||||
response.raise_for_status() # should account for fail
|
||||
|
||||
soup = BeautifulSoup(response.text, "html.parser")
|
||||
links = set()
|
||||
|
||||
for link in soup.find_all("a", href=True):
|
||||
href = str(link["href"])
|
||||
if href.endswith("data.txt"):
|
||||
if not href.startswith("http"):
|
||||
href = "https://www.metoffice.gov.uk" + href
|
||||
links.add(href)
|
||||
|
||||
print(f"✅ Found {len(links)} weather stations.")
|
||||
return list(links)
|
||||
|
||||
def clean_and_parse_data(url, raw_text):
|
||||
"""Cleans up the raw Met Office text, drops header metadata and handles uneven spacing"""
|
||||
|
||||
lines = raw_text.split("\n")
|
||||
|
||||
station_name = lines[0].strip()
|
||||
|
||||
header_idx = None
|
||||
for idx, line in enumerate(lines):
|
||||
if "yyyy" in line and "mm" in line:
|
||||
header_idx = idx
|
||||
break
|
||||
|
||||
if header_idx is None:
|
||||
raise ValueError("Could not find a valid header row containing 'yyyy mm'")
|
||||
|
||||
# Extract the header line and use it to determine the structural column count
|
||||
header_line = lines[header_idx]
|
||||
headers_list = [re.sub(r'[^a-zA-Z0-9]', '', col) for col in header_line.split() if col.strip()]
|
||||
expected_col_count = len(headers_list)
|
||||
|
||||
# Process data lines safety row-by-row to eliminate trailing anomalies
|
||||
cleaned_rows = [header_line]
|
||||
|
||||
for line in lines[header_idx + 2:]:
|
||||
line_stripped = line.strip()
|
||||
if not line_stripped:
|
||||
continue
|
||||
|
||||
# split line by its arbritary whitespace tokens
|
||||
tokens = line_stripped.split()
|
||||
|
||||
# If the row has more columns than headers, slice off the stray text
|
||||
if len(tokens) > expected_col_count:
|
||||
tokens = tokens[:expected_col_count]
|
||||
|
||||
# Reconstruct the line with clean uniform spacing
|
||||
cleaned_rows.append(" ".join(tokens))
|
||||
|
||||
# Reassemble our safe dataset payload
|
||||
clean_payload = "\n".join(cleaned_rows)
|
||||
|
||||
# Read text using regex split due to arbritary white spacing
|
||||
# skiprows=[1] safety drops the units line (e.g. "degC") directly under the headers
|
||||
df = pd.read_csv(io.StringIO(clean_payload), sep=r"\s+")
|
||||
|
||||
# Standardise column headers (remove special chars)
|
||||
df.columns = headers_list
|
||||
|
||||
# Add identity tracking feature column using grabbed name from Line 1
|
||||
df["station_name"] = station_name
|
||||
|
||||
# Strict statistical data cleaning
|
||||
for col in df.columns:
|
||||
if col!= "station_name":
|
||||
# Chain the string replacement safety
|
||||
clean_series = (
|
||||
df[col].astype(str)
|
||||
.str.replace("*", "", regex=False)
|
||||
.str.replace("#", "", regex=False)
|
||||
.str.replace("---", "", regex=False)
|
||||
)
|
||||
|
||||
# Turn text to numerical floats. Empty blocks naturally flip to NaN
|
||||
df.loc[:, col] = pd.to_numeric(clean_series, errors="coerce")
|
||||
|
||||
df = df.dropna(subset=["yyyy", "mm"])
|
||||
df.loc[:, "yyyy"] = df["yyyy"].astype(int)
|
||||
df.loc[:, "mm"] = df["mm"].astype(int)
|
||||
|
||||
return df
|
||||
|
||||
def main():
|
||||
conn = setup_database()
|
||||
urls = fetch_station_urls()
|
||||
|
||||
# Clear out any historical records to prevent compounding data duplications
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DROP TABLE IF EXISTS historic_weather")
|
||||
conn.commit()
|
||||
|
||||
success_count = 0
|
||||
|
||||
for url in urls:
|
||||
try:
|
||||
res = requests.get(url, timeout=15)
|
||||
res.raise_for_status()
|
||||
|
||||
df = clean_and_parse_data(url, res.text)
|
||||
|
||||
df.to_sql("historic_weather", conn, if_exists="append", index=False)
|
||||
success_count += 1
|
||||
print(f"Successfully processed {df['station_name'].iloc[0]}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Failed to process link {url}. Error: {e}")
|
||||
|
||||
conn.close()
|
||||
print(f"\n Pipelines complete! Loaded {success_count}/{len(urls)} stations into '{DB_PATH}'.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user