def scrape_property(
    location: str,
    listing_type: str = "for_sale",
    radius: float | None = None,
    mls_only: bool = False,
    past_days: int | None = None,
    proxy: str | None = None,
    date_from: str | None = None,
    date_to: str | None = None,
    foreclosure: bool | None = None,
) -> pd.DataFrame:
    """
    Scrape properties from Realtor.com based on a given location and listing type.

    :param location: Location to search (e.g. "Dallas, TX", "85281", "2530 Al Lipscomb Way")
    :param listing_type: Listing Type (for_sale, for_rent, sold, pending); matched case-insensitively
        against the ListingType enum.
    :param radius: Get properties within _ (e.g. 1.0) miles. Only applicable for individual addresses.
    :param mls_only: If set, fetches only listings with MLS IDs.
    :param past_days: Get properties sold or listed (dependent on your listing_type) in the last _ days.
    :param proxy: Proxy to use for scraping
    :param date_from, date_to: Get properties sold or listed (dependent on your listing_type) between
        these dates. format: 2021-01-28. Both or neither must be given.
    :param foreclosure: Forwarded unchanged to the scraper as its foreclosure filter; None leaves it unset.
    :return: One row per property with the columns of ``ordered_properties`` (in that order), or an
        empty DataFrame when the search produced no results.
    :raises InvalidListingType: if ``listing_type`` is not a ListingType member.
    :raises InvalidDate: if only one date is given, a date is malformed, or date_to precedes date_from.
    """
    # Validate before building the scraper so bad input never triggers network traffic.
    validate_input(listing_type)
    validate_dates(date_from, date_to)

    scraper_input = ScraperInput(
        location=location,
        listing_type=ListingType[listing_type.upper()],
        proxy=proxy,
        radius=radius,
        mls_only=mls_only,
        last_x_days=past_days,
        date_from=date_from,
        date_to=date_to,
        foreclosure=foreclosure,
    )

    site = RealtorScraper(scraper_input)
    results = site.search()

    properties_dfs = [process_result(result) for result in results]
    if not properties_dfs:
        return pd.DataFrame()

    # pandas emits a FutureWarning when concatenating frames with all-NA columns;
    # every per-property frame has many of those, so silence it locally.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=FutureWarning)
        return pd.concat(properties_dfs, ignore_index=True, axis=0)[ordered_properties]
format", + ) + + parser.add_argument( + "-f", + "--filename", + type=str, + default=None, + help="Name of the output file (without extension)", + ) + + parser.add_argument("-p", "--proxy", type=str, default=None, help="Proxy to use for scraping") + parser.add_argument( + "-d", + "--days", + type=int, + default=None, + help="Sold/listed in last _ days filter.", + ) + + parser.add_argument( + "-r", + "--radius", + type=float, + default=None, + help="Get comparable properties within _ (eg. 0.0) miles. Only applicable for individual addresses.", + ) + parser.add_argument( + "-m", + "--mls_only", + action="store_true", + help="If set, fetches only MLS listings.", + ) + + args = parser.parse_args() + + result = scrape_property( + args.location, + args.listing_type, + radius=args.radius, + proxy=args.proxy, + mls_only=args.mls_only, + past_days=args.days, + ) + + if not args.filename: + timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + args.filename = f"HomeHarvest_{timestamp}" + + if args.output == "excel": + output_filename = f"{args.filename}.xlsx" + result.to_excel(output_filename, index=False) + print(f"Excel file saved as {output_filename}") + elif args.output == "csv": + output_filename = f"{args.filename}.csv" + result.to_csv(output_filename, index=False) + print(f"CSV file saved as {output_filename}") + + +if __name__ == "__main__": + main() diff --git a/homeharvest/homeharvest/core/__init__.py b/homeharvest/homeharvest/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/homeharvest/homeharvest/core/scrapers/__init__.py b/homeharvest/homeharvest/core/scrapers/__init__.py new file mode 100644 index 0000000..180a0b6 --- /dev/null +++ b/homeharvest/homeharvest/core/scrapers/__init__.py @@ -0,0 +1,88 @@ +from dataclasses import dataclass +import requests +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +import uuid +from .models import Property, ListingType, SiteName + + +@dataclass +class 
class Scraper:
    """
    Base scraper: owns the shared HTTP session and copies the search options
    from a ScraperInput onto the instance.

    Subclasses implement ``search``, ``_parse_home`` and ``handle_location``.
    """

    # One requests.Session shared by every instance; built by the first instantiation.
    session = None
    # Seconds to wait for the token endpoint; requests has no default timeout.
    TOKEN_REQUEST_TIMEOUT = 30

    def __init__(
        self,
        scraper_input: ScraperInput,
    ):
        """
        :param scraper_input: search options; ``proxy`` (if any) is applied to the shared session.
        :raises Exception: if no access token could be obtained for a new session.
        """
        self.location = scraper_input.location
        self.listing_type = scraper_input.listing_type
        # Stored before the session is built so the token request can honor it.
        self.proxy = scraper_input.proxy

        if not self.session:
            # Configure the session fully before publishing it on the class: if the
            # token request fails, later instances must not reuse a half-built
            # session that lacks the auth header.
            session = requests.Session()
            retries = Retry(
                total=3, backoff_factor=3, status_forcelist=[429, 403], allowed_methods=frozenset(["GET", "POST"])
            )

            adapter = HTTPAdapter(max_retries=retries)
            session.mount("http://", adapter)
            session.mount("https://", adapter)
            session.headers.update(
                {
                    "auth": f"Bearer {self.get_access_token()}",
                    "apollographql-client-name": "com.move.Realtor-apollo-ios",
                }
            )
            Scraper.session = session

        if scraper_input.proxy:
            proxy_url = scraper_input.proxy
            proxies = {"http": proxy_url, "https": proxy_url}
            self.session.proxies.update(proxies)

        self.radius = scraper_input.radius
        self.last_x_days = scraper_input.last_x_days
        self.mls_only = scraper_input.mls_only
        self.date_from = scraper_input.date_from
        self.date_to = scraper_input.date_to
        self.foreclosure = scraper_input.foreclosure

    def search(self) -> list[Property]: ...

    @staticmethod
    def _parse_home(home) -> Property: ...

    def handle_location(self): ...

    def get_access_token(self):
        """
        Request a device access token from graph.realtor.com.

        The request goes through the configured proxy (if any) and is bounded by
        TOKEN_REQUEST_TIMEOUT.

        :return: the ``access_token`` string from the response.
        :raises Exception: if the response is not JSON or carries no ``access_token``.
        """
        url = "https://graph.realtor.com/auth/token"

        payload = f'{{"client_app_id":"rdc_mobile_native,24.20.4.149916,iphone","device_id":"{str(uuid.uuid4()).upper()}","grant_type":"device_mobile"}}'
        headers = {
            "Host": "graph.realtor.com",
            "x-client-version": "24.20.4.149916",
            "accept": "*/*",
            "content-type": "Application/json",
            "user-agent": "Realtor.com/24.20.4.149916 CFNetwork/1410.0.3 Darwin/22.6.0",
            "accept-language": "en-US,en;q=0.9",
        }
        # getattr: tolerate subclasses/callers that invoke this before __init__ set self.proxy.
        proxy = getattr(self, "proxy", None)
        response = requests.post(
            url,
            headers=headers,
            data=payload,
            proxies={"http": proxy, "https": proxy} if proxy else None,
            timeout=self.TOKEN_REQUEST_TIMEOUT,
        )

        try:
            # A blocked request typically yields a non-JSON body (ValueError) or a
            # JSON body without the token (KeyError/TypeError).
            return response.json()["access_token"]
        except (ValueError, KeyError, TypeError) as err:
            raise Exception("Could not get access token, use a proxy/vpn or wait") from err
"TOWNHOMES" + OTHER = "OTHER" + + +@dataclass +class Address: + street: str | None = None + unit: str | None = None + city: str | None = None + state: str | None = None + zip: str | None = None + + +@dataclass +class Description: + primary_photo: str | None = None + alt_photos: list[str] | None = None + style: PropertyType | None = None + beds: int | None = None + baths_full: int | None = None + baths_half: int | None = None + sqft: int | None = None + lot_sqft: int | None = None + sold_price: int | None = None + year_built: int | None = None + garage: float | None = None + stories: int | None = None + text: str | None = None + + +@dataclass +class Agent: + name: str | None = None + phone: str | None = None + + +@dataclass +class Property: + property_url: str + mls: str | None = None + mls_id: str | None = None + status: str | None = None + address: Address | None = None + + list_price: int | None = None + list_date: str | None = None + pending_date: str | None = None + last_sold_date: str | None = None + prc_sqft: int | None = None + hoa_fee: int | None = None + days_on_mls: int | None = None + description: Description | None = None + + latitude: float | None = None + longitude: float | None = None + neighborhoods: Optional[str] = None + county: Optional[str] = None + fips_code: Optional[str] = None + agents: list[Agent] = None + nearby_schools: list[str] = None + assessed_value: int | None = None + estimated_value: int | None = None diff --git a/homeharvest/homeharvest/core/scrapers/realtor/__init__.py b/homeharvest/homeharvest/core/scrapers/realtor/__init__.py new file mode 100644 index 0000000..9be8efd --- /dev/null +++ b/homeharvest/homeharvest/core/scrapers/realtor/__init__.py @@ -0,0 +1,791 @@ +""" +homeharvest.realtor.__init__ +~~~~~~~~~~~~ + +This module implements the scraper for realtor.com +""" + +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from requests.exceptions import HTTPError +from typing import 
def handle_location(self):
    """
    Resolve ``self.location`` through the address-autocomplete service.

    :return: the first suggestion dict, or None when the service returns no
        suggestions (including a payload without an "autocomplete" entry).
    """
    params = {
        "input": self.location,
        # e.g. ListingType value "FOR_SALE" -> "for-sale"
        "client_id": self.listing_type.value.lower().replace("_", "-"),
        "limit": "1",
        "area_types": "city,state,county,postal_code,address,street,neighborhood,school,school_district,university,park",
    }

    response = self.session.get(
        self.ADDRESS_AUTOCOMPLETE_URL,
        params=params,
    )
    response_json = response.json() or {}

    # .get(): a payload without "autocomplete" means "no match", not a crash.
    suggestions = response_json.get("autocomplete")
    if not suggestions:
        return None

    return suggestions[0]
{ + listing(id: $listing_id) { + source { + id + listing_id + } + address { + street_direction + street_number + street_name + street_suffix + unit + city + state_code + postal_code + location { + coordinate { + lat + lon + } + } + } + basic { + sqft + beds + baths_full + baths_half + lot_sqft + sold_price + sold_price + type + price + status + sold_date + list_date + } + details { + year_built + stories + garage + permalink + } + media { + photos { + href + } + } + } + }""" + + variables = {"listing_id": listing_id} + payload = { + "query": query, + "variables": variables, + } + + response = self.session.post(self.SEARCH_GQL_URL, json=payload) + response_json = response.json() + + property_info = response_json["data"]["listing"] + + mls = ( + property_info["source"].get("id") + if "source" in property_info and isinstance(property_info["source"], dict) + else None + ) + + able_to_get_lat_long = ( + property_info + and property_info.get("address") + and property_info["address"].get("location") + and property_info["address"]["location"].get("coordinate") + ) + list_date_str = ( + property_info["basic"]["list_date"].split("T")[0] if property_info["basic"].get("list_date") else None + ) + last_sold_date_str = ( + property_info["basic"]["sold_date"].split("T")[0] if property_info["basic"].get("sold_date") else None + ) + pending_date_str = property_info["pending_date"].split("T")[0] if property_info.get("pending_date") else None + + list_date = datetime.strptime(list_date_str, "%Y-%m-%d") if list_date_str else None + last_sold_date = datetime.strptime(last_sold_date_str, "%Y-%m-%d") if last_sold_date_str else None + pending_date = datetime.strptime(pending_date_str, "%Y-%m-%d") if pending_date_str else None + today = datetime.now() + + days_on_mls = None + status = property_info["basic"]["status"].lower() + if list_date: + if status == "sold" and last_sold_date: + days_on_mls = (last_sold_date - list_date).days + elif status in ("for_sale", "for_rent"): + days_on_mls = 
(today - list_date).days + if days_on_mls and days_on_mls < 0: + days_on_mls = None + + property_id = property_info["details"]["permalink"] + prop_details = self.get_prop_details(property_id) + listing = Property( + mls=mls, + mls_id=( + property_info["source"].get("listing_id") + if "source" in property_info and isinstance(property_info["source"], dict) + else None + ), + property_url=f"{self.PROPERTY_URL}{property_id}", + status=property_info["basic"]["status"].upper(), + list_price=property_info["basic"]["price"], + list_date=list_date, + prc_sqft=( + property_info["basic"].get("price") / property_info["basic"].get("sqft") + if property_info["basic"].get("price") and property_info["basic"].get("sqft") + else None + ), + last_sold_date=last_sold_date, + pending_date=pending_date, + latitude=property_info["address"]["location"]["coordinate"].get("lat") if able_to_get_lat_long else None, + longitude=property_info["address"]["location"]["coordinate"].get("lon") if able_to_get_lat_long else None, + address=self._parse_address(property_info, search_type="handle_listing"), + description=Description( + alt_photos=self.process_alt_photos(property_info.get("media", {}).get("photos", [])), + style=property_info["basic"].get("type", "").upper(), + beds=property_info["basic"].get("beds"), + baths_full=property_info["basic"].get("baths_full"), + baths_half=property_info["basic"].get("baths_half"), + sqft=property_info["basic"].get("sqft"), + lot_sqft=property_info["basic"].get("lot_sqft"), + sold_price=property_info["basic"].get("sold_price"), + year_built=property_info["details"].get("year_built"), + garage=property_info["details"].get("garage"), + stories=property_info["details"].get("stories"), + text=property_info.get("description", {}).get("text"), + ), + days_on_mls=days_on_mls, + agents=prop_details.get("agents"), + nearby_schools=prop_details.get("schools"), + assessed_value=prop_details.get("assessed_value"), + estimated_value=prop_details.get("estimated_value"), + ) 
+ + return [listing] + + def get_latest_listing_id(self, property_id: str) -> str | None: + query = """query Property($property_id: ID!) { + property(id: $property_id) { + listings { + listing_id + primary + } + } + } + """ + + variables = {"property_id": property_id} + payload = { + "query": query, + "variables": variables, + } + + response = self.session.post(self.SEARCH_GQL_URL, json=payload) + response_json = response.json() + + property_info = response_json["data"]["property"] + if property_info["listings"] is None: + return None + + primary_listing = next( + (listing for listing in property_info["listings"] if listing["primary"]), + None, + ) + if primary_listing: + return primary_listing["listing_id"] + else: + return property_info["listings"][0]["listing_id"] + + def handle_address(self, property_id: str) -> list[Property]: + """ + Handles a specific address & returns one property + """ + query = """query Property($property_id: ID!) { + property(id: $property_id) { + property_id + details { + date_updated + garage + permalink + year_built + stories + } + address { + street_direction + street_number + street_name + street_suffix + unit + city + state_code + postal_code + location { + coordinate { + lat + lon + } + } + } + basic { + baths + beds + price + sqft + lot_sqft + type + sold_price + } + public_record { + lot_size + sqft + stories + units + year_built + } + primary_photo { + href + } + photos { + href + } + } + }""" + + variables = {"property_id": property_id} + prop_details = self.get_prop_details(property_id) + + payload = { + "query": query, + "variables": variables, + } + + response = self.session.post(self.SEARCH_GQL_URL, json=payload) + response_json = response.json() + + property_info = response_json["data"]["property"] + + return [ + Property( + mls_id=property_id, + property_url=f"{self.PROPERTY_URL}{property_info['details']['permalink']}", + address=self._parse_address(property_info, search_type="handle_address"), + 
def general_search(self, variables: dict, search_type: str) -> Dict[str, Union[int, list[Property]]]:
    """
    Run one page of a GraphQL search and convert the results to Property objects.

    :param variables: GraphQL variables for the chosen query (always ``offset``; plus
        ``coordinates``/``radius`` for comps, ``city``/``county``/``state_code``/``postal_code``
        for area, ``property_id`` for a single address). An optional ``foreclosure`` key
        (True/False) adds a foreclosure filter to comps and area queries.
    :param search_type: "comps" (radius search around coordinates), "area" (general
        location search); any other value performs a single-property lookup.
    :return: ``{"total": <int>, "properties": <list[Property]>}``; total 0 and an empty
        list when the response carries no results.
    """
    # NOTE: an early `return` for every search_type other than "address" used to sit
    # here; it made the "comps" and "area" branches below unreachable, so those
    # searches always came back empty. It has been removed.
    results_query = """{
                        count
                        total
                        results {
                            pending_date
                            property_id
                            list_date
                            status
                            last_sold_price
                            last_sold_date
                            list_price
                            price_per_sqft
                            flags {
                                is_contingent
                                is_pending
                            }
                            description {
                                type
                                sqft
                                beds
                                baths_full
                                baths_half
                                lot_sqft
                                sold_price
                                year_built
                                garage
                                sold_price
                                type
                                name
                                stories
                                text
                            }
                            source {
                                id
                                listing_id
                            }
                            hoa {
                                fee
                            }
                            location {
                                address {
                                    street_direction
                                    street_number
                                    street_name
                                    street_suffix
                                    unit
                                    city
                                    state_code
                                    postal_code
                                    coordinate {
                                        lon
                                        lat
                                    }
                                }
                                county {
                                    name
                                    fips_code
                                }
                                neighborhoods {
                                    name
                                }
                            }
                            tax_record {
                                public_record_id
                            }
                            primary_photo {
                                href
                            }
                            photos {
                                href
                            }
                        }
                    }
                }"""

    # Sold searches filter and sort on sold_date; everything else on list_date.
    date_field = "sold_date" if self.listing_type == ListingType.SOLD else "list_date"
    date_param = ""
    if self.date_from and self.date_to:
        date_param = f'{date_field}: {{ min: "{self.date_from}", max: "{self.date_to}" }}'
    elif self.last_x_days:
        date_param = f'{date_field}: {{ min: "$today-{self.last_x_days}D" }}'

    sort_param = (
        "sort: [{ field: sold_date, direction: desc }]"
        if self.listing_type == ListingType.SOLD
        else "sort: [{ field: list_date, direction: desc }]"
    )

    pending_or_contingent_param = (
        "or_filters: { contingent: true, pending: true }" if self.listing_type == ListingType.PENDING else ""
    )

    # PENDING is queried as for_sale narrowed by the or_filters above.
    listing_type = ListingType.FOR_SALE if self.listing_type == ListingType.PENDING else self.listing_type
    is_foreclosure = ""

    if variables.get("foreclosure") is True:
        is_foreclosure = "foreclosure: true"
    elif variables.get("foreclosure") is False:
        is_foreclosure = "foreclosure: false"

    if search_type == "comps":  #: comps search, came from an address
        query = """query Property_search(
                $coordinates: [Float]!
                $radius: String!
                $offset: Int!,
                ) {
                    home_search(
                        query: {
                            %s
                            nearby: {
                                coordinates: $coordinates
                                radius: $radius
                            }
                            status: %s
                            %s
                            %s
                        }
                        %s
                        limit: 200
                        offset: $offset
                ) %s""" % (
            is_foreclosure,
            listing_type.value.lower(),
            date_param,
            pending_or_contingent_param,
            sort_param,
            results_query,
        )
    elif search_type == "area":  #: general search, came from a general location
        query = """query Home_search(
                            $city: String,
                            $county: [String],
                            $state_code: String,
                            $postal_code: String
                            $offset: Int,
                        ) {
                            home_search(
                                query: {
                                    %s
                                    city: $city
                                    county: $county
                                    postal_code: $postal_code
                                    state_code: $state_code
                                    status: %s
                                    %s
                                    %s
                                }
                                %s
                                limit: 200
                                offset: $offset
                            ) %s""" % (
            is_foreclosure,
            listing_type.value.lower(),
            date_param,
            pending_or_contingent_param,
            sort_param,
            results_query,
        )
    else:  #: general search, came from an address
        query = (
            """query Property_search(
                    $property_id: [ID]!
                    $offset: Int!,
                ) {
                    property_search(
                        query: {
                            property_id: $property_id
                        }
                        limit: 1
                        offset: $offset
                    ) %s"""
            % results_query
        )

    payload = {
        "query": query,
        "variables": variables,
    }

    response = self.session.post(self.SEARCH_GQL_URL, json=payload)
    response_json = response.json()
    search_key = "home_search" if "home_search" in query else "property_search"

    # Any of these levels can be missing or null when the search fails or matches nothing.
    search_data = ((response_json or {}).get("data") or {}).get(search_key) or {}
    raw_results = search_data.get("results")
    if raw_results is None:
        return {"total": 0, "properties": []}

    def process_property(result: dict) -> Property | None:
        """Build a Property from one raw result; None when it is filtered out."""
        source = result.get("source")
        source = source if isinstance(source, dict) else {}
        mls = source.get("id")

        if not mls and self.mls_only:
            return None

        able_to_get_lat_long = (
            result
            and result.get("location")
            and result["location"].get("address")
            and result["location"]["address"].get("coordinate")
        )

        flags = result.get("flags") or {}
        is_pending = flags.get("is_pending") or flags.get("is_contingent")

        # Pending/contingent homes also show up in for_sale results; keep them
        # only when the caller asked for pending listings.
        if is_pending and self.listing_type != ListingType.PENDING:
            return None

        property_id = result["property_id"]
        prop_details = self.get_prop_details(property_id)
        county = result["location"].get("county") or {}

        return Property(
            mls=mls,
            mls_id=source.get("listing_id"),
            property_url=(
                f"{self.PROPERTY_URL}{property_id}"
                if self.listing_type != ListingType.FOR_RENT
                else f"{self.PROPERTY_URL}M{property_id}?listing_status=rental"
            ),
            status="PENDING" if is_pending else result["status"].upper(),
            list_price=result["list_price"],
            list_date=result["list_date"].split("T")[0] if result.get("list_date") else None,
            prc_sqft=result.get("price_per_sqft"),
            last_sold_date=result.get("last_sold_date"),
            hoa_fee=result["hoa"]["fee"] if result.get("hoa") and isinstance(result["hoa"], dict) else None,
            latitude=result["location"]["address"]["coordinate"].get("lat") if able_to_get_lat_long else None,
            longitude=result["location"]["address"]["coordinate"].get("lon") if able_to_get_lat_long else None,
            address=self._parse_address(result, search_type="general_search"),
            description=self._parse_description(result),
            neighborhoods=self._parse_neighborhoods(result),
            county=county.get("name"),
            fips_code=county.get("fips_code"),
            days_on_mls=self.calculate_days_on_mls(result),
            agents=prop_details.get("agents"),
            nearby_schools=prop_details.get("schools"),
            assessed_value=prop_details.get("assessed_value"),
            estimated_value=prop_details.get("estimated_value"),
        )

    properties: list[Property] = []

    # Each result triggers an extra detail request (get_prop_details), so fan out.
    with ThreadPoolExecutor(max_workers=self.NUM_PROPERTY_WORKERS) as executor:
        futures = [executor.submit(process_property, result) for result in raw_results]

        # Collected in completion order, so the output order is not the response order.
        for future in as_completed(futures):
            realty_property = future.result()
            if realty_property:
                properties.append(realty_property)

    return {
        # `or 0`: callers compare total to 0 and feed it to range().
        "total": search_data.get("total") or 0,
        "properties": properties,
    }
def get_prop_details(self, property_id: str) -> dict:
    """
    Fetch agents, nearby school districts and valuation figures for one property.

    :param property_id: property id interpolated into the GetHome GraphQL request.
    :return: dict with keys ``agents`` (list[Agent]), ``schools`` (list of district
        names), ``assessed_value`` (first tax-history assessment total) and
        ``estimated_value`` (first current estimate); each is None when unavailable.
    """
    payload = f'{{"query":"query GetHome($property_id: ID!) {{\\n home(property_id: $property_id) {{\\n __typename\\n\\n consumerAdvertisers: consumer_advertisers {{\\n __typename\\n type\\n advertiserId: advertiser_id\\n name\\n phone\\n type\\n href\\n slogan\\n photo {{\\n __typename\\n href\\n }}\\n showRealtorLogo: show_realtor_logo\\n hours\\n }}\\n\\n\\n nearbySchools: nearby_schools(radius: 5.0, limit_per_level: 3) {{ __typename schools {{ district {{ __typename id name }} }} }} taxHistory: tax_history {{ __typename tax year assessment {{ __typename building land total }} }}estimates {{ __typename currentValues: current_values {{ __typename source {{ __typename type name }} estimate estimateHigh: estimate_high estimateLow: estimate_low date isBestHomeValue: isbest_homevalue }} }} }}\\n}}\\n","variables":{{"property_id":"{property_id}"}}}}'
    response = self.session.post(self.PROPERTY_GQL, data=payload)

    def dig(data, *path):
        """Follow *path* through nested dicts/lists; None as soon as a step is missing."""
        for key in path:
            try:
                data = data[key]
            except (KeyError, IndexError, TypeError):
                # IndexError: empty taxHistory/currentValues lists; TypeError: null level.
                return None
        return data

    # Parse the body once instead of once per lookup.
    home = dig(response.json(), "data", "home")

    # `or []`: these fields can be present but null.
    ads = dig(home, "consumerAdvertisers") or []
    schools = dig(home, "nearbySchools", "schools") or []
    assessed_value = dig(home, "taxHistory", 0, "assessment", "total")
    estimated_value = dig(home, "estimates", "currentValues", 0, "estimate")

    agents = [Agent(name=ad.get("name"), phone=ad.get("phone")) for ad in ads if ad]

    district_names = (dig(school, "district", "name") for school in schools)
    school_names = [name for name in district_names if name]

    return {
        "agents": agents if agents else None,
        "schools": school_names if school_names else None,
        "assessed_value": assessed_value if assessed_value else None,
        "estimated_value": estimated_value if estimated_value else None,
    }
return ", ".join(neighborhoods_list) if neighborhoods_list else None + + @staticmethod + def handle_none_safely(address_part): + if address_part is None: + return "" + + return address_part + + def _parse_address(self, result: dict, search_type): + if search_type == "general_search": + address = result["location"]["address"] + else: + address = result["address"] + + return Address( + street=" ".join( + [ + self.handle_none_safely(address.get("street_number")), + self.handle_none_safely(address.get("street_direction")), + self.handle_none_safely(address.get("street_name")), + self.handle_none_safely(address.get("street_suffix")), + ] + ).strip(), + unit=address["unit"], + city=address["city"], + state=address["state_code"], + zip=address["postal_code"], + ) + + @staticmethod + def _parse_description(result: dict) -> Description: + description_data = result.get("description", {}) + + if description_data is None or not isinstance(description_data, dict): + description_data = {} + + style = description_data.get("type", "") + if style is not None: + style = style.upper() + + primary_photo = "" + if result and "primary_photo" in result: + primary_photo_info = result["primary_photo"] + if primary_photo_info and "href" in primary_photo_info: + primary_photo_href = primary_photo_info["href"] + primary_photo = primary_photo_href.replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75") + + return Description( + primary_photo=primary_photo, + alt_photos=RealtorScraper.process_alt_photos(result.get("photos")), + style=PropertyType(style) if style else None, + beds=description_data.get("beds"), + baths_full=description_data.get("baths_full"), + baths_half=description_data.get("baths_half"), + sqft=description_data.get("sqft"), + lot_sqft=description_data.get("lot_sqft"), + sold_price=description_data.get("sold_price"), + year_built=description_data.get("year_built"), + garage=description_data.get("garage"), + stories=description_data.get("stories"), + 
text=description_data.get("text"), + ) + + @staticmethod + def calculate_days_on_mls(result: dict) -> Optional[int]: + list_date_str = result.get("list_date") + list_date = datetime.strptime(list_date_str.split("T")[0], "%Y-%m-%d") if list_date_str else None + last_sold_date_str = result.get("last_sold_date") + last_sold_date = datetime.strptime(last_sold_date_str, "%Y-%m-%d") if last_sold_date_str else None + today = datetime.now() + + if list_date: + if result["status"] == "sold": + if last_sold_date: + days = (last_sold_date - list_date).days + if days >= 0: + return days + elif result["status"] in ("for_sale", "for_rent"): + days = (today - list_date).days + if days >= 0: + return days + + @staticmethod + def process_alt_photos(photos_info): + try: + alt_photos = [] + if photos_info: + for photo_info in photos_info: + href = photo_info.get("href", "") + alt_photo_href = href.replace("s.jpg", "od-w480_h360_x2.webp?w=1080&q=75") + alt_photos.append(alt_photo_href) + return alt_photos + except Exception: + pass diff --git a/homeharvest/homeharvest/exceptions.py b/homeharvest/homeharvest/exceptions.py new file mode 100644 index 0000000..c3f5111 --- /dev/null +++ b/homeharvest/homeharvest/exceptions.py @@ -0,0 +1,6 @@ +class InvalidListingType(Exception): + """Raised when a provided listing type is does not exist.""" + + +class InvalidDate(Exception): + """Raised when only one of date_from or date_to is provided or not in the correct format. 
ex: 2023-10-23""" diff --git a/homeharvest/homeharvest/utils.py b/homeharvest/homeharvest/utils.py new file mode 100644 index 0000000..d164b9a --- /dev/null +++ b/homeharvest/homeharvest/utils.py @@ -0,0 +1,111 @@ +import pandas as pd +from datetime import datetime +from .core.scrapers.models import Property, ListingType +from .exceptions import InvalidListingType, InvalidDate + +ordered_properties = [ + "property_url", + "mls", + "mls_id", + "status", + "text", + "style", + "street", + "unit", + "city", + "state", + "zip_code", + "beds", + "full_baths", + "half_baths", + "sqft", + "year_built", + "days_on_mls", + "list_price", + "list_date", + "sold_price", + "last_sold_date", + "assessed_value", + "estimated_value", + "lot_sqft", + "price_per_sqft", + "latitude", + "longitude", + "neighborhoods", + "county", + "fips_code", + "stories", + "hoa_fee", + "parking_garage", + "agent", + "broker", + "broker_phone", + "nearby_schools", + "primary_photo", + "alt_photos", +] + + +def process_result(result: Property) -> pd.DataFrame: + prop_data = {prop: None for prop in ordered_properties} + prop_data.update(result.__dict__) + + if "address" in prop_data: + address_data = prop_data["address"] + prop_data["street"] = address_data.street + prop_data["unit"] = address_data.unit + prop_data["city"] = address_data.city + prop_data["state"] = address_data.state + prop_data["zip_code"] = address_data.zip + + if "agents" in prop_data: + agents = prop_data["agents"] + if agents: + prop_data["agent"] = agents[0].name + if len(agents) > 1: + prop_data["broker"] = agents[1].name + prop_data["broker_phone"] = agents[1].phone + + prop_data["price_per_sqft"] = prop_data["prc_sqft"] + prop_data["nearby_schools"] = filter(None, prop_data["nearby_schools"]) if prop_data["nearby_schools"] else None + prop_data["nearby_schools"] = ", ".join(set(prop_data["nearby_schools"])) if prop_data["nearby_schools"] else None + + description = result.description + prop_data["primary_photo"] = 
description.primary_photo + prop_data["alt_photos"] = ", ".join(description.alt_photos) + prop_data["style"] = description.style if type(description.style) == str else description.style.value + prop_data["beds"] = description.beds + prop_data["full_baths"] = description.baths_full + prop_data["half_baths"] = description.baths_half + prop_data["sqft"] = description.sqft + prop_data["lot_sqft"] = description.lot_sqft + prop_data["sold_price"] = description.sold_price + prop_data["year_built"] = description.year_built + prop_data["parking_garage"] = description.garage + prop_data["stories"] = description.stories + prop_data["text"] = description.text + + properties_df = pd.DataFrame([prop_data]) + properties_df = properties_df.reindex(columns=ordered_properties) + + return properties_df[ordered_properties] + + +def validate_input(listing_type: str) -> None: + if listing_type.upper() not in ListingType.__members__: + raise InvalidListingType(f"Provided listing type, '{listing_type}', does not exist.") + + +def validate_dates(date_from: str | None, date_to: str | None) -> None: + if (date_from is not None and date_to is None) or (date_from is None and date_to is not None): + raise InvalidDate("Both date_from and date_to must be provided.") + + if date_from and date_to: + try: + date_from_obj = datetime.strptime(date_from, "%Y-%m-%d") + date_to_obj = datetime.strptime(date_to, "%Y-%m-%d") + + if date_to_obj < date_from_obj: + raise InvalidDate("date_to must be after date_from.") + except ValueError as e: + raise InvalidDate(f"Invalid date format or range")