Module DINO2.tools.export.csv

Functions for simple csv exports

Expand source code
# -*- coding: utf-8 -*-
"""Functions for simple csv exports"""

from __future__ import annotations

from collections import defaultdict
from csv import writer
from datetime import date, datetime, timedelta
import itertools
import os
from sqlalchemy import tuple_
from sqlalchemy.orm import joinedload, load_only, Query, Load
from sqlalchemy.orm.session import Session
from typing import Optional, Set, Dict, Tuple, List

from ...model import Base, Version, calendar, fares, location, operational, network, schedule


def stops(session: Session, fname: str, version_id: Optional[int] = None) -> None:
    """Export csv file of all stops, one row per stop position"""
    rows = [
        (
            "version", "stopid", "stopname", "placename", "stopshortname",
            "stopifopt", "areaid", "areaname", "areaifopt",
            "posid", "posname", "posifopt", "X", "Y", "farezones"
        )]
    q = session.query(location.Stop).options(joinedload('points').joinedload('area'))
    if version_id is not None:
        q = q.filter_by(version_id=version_id)
    for stop in q.all():
        for point in stop.points:
            area_id, area_name, area_ifopt = (point.area.id, point.area.name, point.area.ifopt) if point.area is not None else (None, None, None)
            rows.append(
                (
                    stop.version_id, stop.id, stop.name, stop.place, stop.abbr,
                    stop.ifopt, area_id, area_name, area_ifopt,
                    point.id, point.name, point.ifopt, point.pos_x, point.pos_y,
                    ",".join(str(fzid) for fzid in stop.fare_zone_ids)
                ))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)


def courses(session: Session, path: str, line_ids: Optional[Set[int]] = None, export_full_name: bool = False, version_id: Optional[int] = None) -> None:
    """Export csv files of all course stops per course"""
    q = session.query(network.Course).options(joinedload('stops').joinedload('stop'), joinedload('stops').joinedload('stop_point'))
    if version_id is not None:
        q = q.filter_by(version_id=version_id)
    if line_ids:
        q = q.filter(network.Course.line.in_(line_ids))
    for course in q.all():
        _from_stop, _to_stop = course.stops[0], course.stops[-1]

        def try_name(cs: network.CourseStop) -> str:
            name = cs.stop.name_noloc
            if name:
                return name.replace("/", "").replace(".", "")
            name = cs.stop.name
            try:
                return name.split(" ")[1].replace("/", "").replace(".", "")
            except IndexError:
                return name.replace("/", "").replace(".", "")
        _from, _to = try_name(_from_stop), try_name(_to_stop)
        fname = os.path.join(path, f"dino_{course.name}_{course.id}_{_from}_{_to}.csv")
        os.makedirs(os.path.dirname(fname), exist_ok=True)
        with open(fname, 'w', encoding='utf-8') as f:
            writer(f, delimiter=";", lineterminator='\n').writerows([
                (
                    cs.consec_stop_nr,
                    (cs.stop_point.ifopt if (cs.stop_point is not None and cs.stop_point.ifopt) else cs.stop.ifopt),
                    (cs.stop.name if (export_full_name or not cs.stop.name_noloc) else cs.stop.name_noloc)
                ) for cs in course.stops])


def trips(session: Session, fname: str, date_: date, line_ids: Optional[Set[int]] = None, version_id: Optional[int] = None) -> None:
    """Export csv file of trips for a given date"""
    rows = [("lineid", "linesymbol", "from", "to", "startt", "endt", "tripid", "wkt")]
    q = schedule.Trip.query_for_date(session, date_)
    if version_id is not None:
        q = q.filter_by(version_id=version_id)
    if line_ids:
        q = q.filter(schedule.Trip.line.in_(line_ids))
    for trip in q.all():
        rows.append(
            (
                trip.line, trip.course.name, trip.dep_stop.name, trip.arr_stop.name,
                int(trip.departure_time.total_seconds()),
                int(trip.departure_time.total_seconds()+trip.duration.total_seconds()),
                trip.id, trip.wkt()
            ))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)


def line_stats(session: Session, fname: str, version_id: Optional[int] = None) -> None:
    """Line statistics with total year-kilometers per version->line->course"""
    lq = session.query(schedule.Trip, network.Course.length) \
        .order_by(schedule.Trip.version_id.asc(), schedule.Trip.line.asc(), schedule.Trip.line_dir.asc(), schedule.Trip.course_id.asc()) \
        .options(
            joinedload('course', innerjoin=True),
            joinedload('day_attribute', innerjoin=True),
            joinedload('restriction'),
            *(load_only(_) for _ in ('line', 'course_id', 'line_dir', 'day_attribute_id', 'restriction_id'))
        ).join(network.Course)
    if version_id is not None:
        lq = lq.filter_by(version_id=version_id)
    total = 0
    rows = [("version", "line", "course", "km")]
    for versionid, version_grouper in itertools.groupby(lq.all(), lambda t_l: t_l[0].version_id):
        version_total = 0
        for lineid, line_grouper in itertools.groupby(version_grouper, lambda t_l: t_l[0].line):
            line_total = 0
            for courseid, course_grouper in itertools.groupby(line_grouper, lambda t_l: t_l[0].course_id):
                course_total = 0
                for trip, length in course_grouper:
                    days = len(trip.dates)
                    km = days*(length/1000)
                    course_total += km
                rows.append((versionid, lineid, courseid, f"{course_total:.2f}"))
                line_total += course_total
            rows.append((versionid, lineid, "total", f"{line_total:.2f}"))
            version_total += line_total
        rows.append((versionid, "total", "", f"{version_total:.2f}"))
        total += version_total
    rows.append(("total", "", "", f"{total:.2f}"))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)


def departure_stats(tripquery: Query, fname: str) -> None:
    """Departures from->to, between, and from stops."""
    tripquery = tripquery.options(
        load_only('version_id'), load_only('line'), load_only('id'), load_only('course_id'), load_only('line_dir'),
        joinedload('course', innerjoin=True).load_only('version_id').load_only('line').load_only('id').load_only('line_dir')
        .joinedload('stops', innerjoin=True).load_only('version_id').load_only('line').load_only('course_id').load_only('line_dir').load_only('consec_stop_nr')
        .joinedload('stop', innerjoin=True).load_only('version_id').load_only('ifopt').load_only('name'))

    pairs: Dict[Tuple[location.Stop, location.Stop], int] = defaultdict(int)
    for trip in tripquery.all():
        for _stopi, cstop in enumerate(trip.course.stops):
            if cstop.consec_stop_nr != len(trip.course.stops):
                pairs[(cstop.stop, trip.course.stops[_stopi+1].stop)] += 1

    # todo: ggf. version "ignorieren", stops version-uebergreifend zuordnen?
    rows = [("type", "version", "from/A", "l-ifopt", "to/B", "r-ifopt", "count")]
    for (sf, st), dc in sorted(pairs.items(), key=lambda i: -i[1]):
        rows.append(("from-to", sf.version_id, sf.name, sf.ifopt, st.name, st.ifopt, dc))

    pairsbidi = {}
    for (sf, st), dc in pairs.items():
        if (sf, st) not in pairsbidi and (st, sf) not in pairsbidi:
            pairsbidi[(sf, st)] = dc
            if (st, sf) in pairs and (sf, st) != (st, sf):
                pairsbidi[(sf, st)] += pairs[(st, sf)]

    for (sf, st), dc in sorted(pairsbidi.items(), key=lambda i: -i[1]):
        rows.append(("between", sf.version_id, sf.name, sf.ifopt, st.name, st.ifopt, dc))

    stopdeps = {stop: sum(dc for ((sf, st), dc) in pairs.items() if stop == sf) for (stop, _) in pairs}
    for stop, dc in sorted(stopdeps.items(), key=lambda i: -i[1]):
        rows.append(("from", sf.version_id, stop.name, stop.ifopt, "", "", dc))

    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)


def departures(stop: location.Stop, day: date, fname: str, days: int = 1) -> None:
    """Departures from a given stop for a given date (range)"""
    servinglines = set((c.line, c.id) for c in stop.courses)
    deps: List[Tuple[datetime, schedule.TripStop]] = []
    for currdate in (day + timedelta(days=n) for n in range(days)):
        tripquery = schedule.Trip.query_for_date(stop._session, currdate) \
            .filter(
                (schedule.Trip.version_id == stop.version_id)
                & tuple_(schedule.Trip.line, schedule.Trip.course_id).in_(servinglines)
            ).options(
                load_only('version_id'), load_only('line'), load_only('id'), load_only('course_id'), load_only('line_dir'), load_only('timing_group'), load_only('departure_time'), load_only('arr_stop_id'),
                joinedload('course', innerjoin=True).load_only('version_id').load_only('line').load_only('id').load_only('line_dir').load_only('name'),
                joinedload('arr_stop').load_only('name')
            )
        trips = [(t, t.trip_stops(simple=True)) for t in tripquery.all()]
        # todo: eventuell mal das trip_stops sich sparen koennen.
        for trip, tripstops in trips:
            deps.extend(
                (datetime.combine(currdate, datetime.min.time()) + ts.dep_time, ts)
                for n, ts
                in filter(
                    lambda n_ts: n_ts[1].stop == stop and n_ts[0] != len(tripstops),
                    enumerate(tripstops, start=1)
                ))
    rows = [("date", "time", "plat", "linenum", "direction")]
    rows.extend(
        (
            deptime.strftime("%Y-%m-%d"), deptime.strftime("%H:%M:%S"),
            ts.stop_point.name if ts.stop_point is not None else "",
            ts.trip.course.name, ts.trip.arr_stop.name
        ) for deptime, ts in sorted(deps, key=lambda d: d[0]))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)

Functions

def stops(session: Session, fname: str, version_id: Optional[int] = None) ‑> NoneType

Export csv file of all stops, one row per stop position

Expand source code
def stops(session: Session, fname: str, version_id: Optional[int] = None) -> None:
    """Export csv file of all stops, one row per stop position"""
    rows = [
        (
            "version", "stopid", "stopname", "placename", "stopshortname",
            "stopifopt", "areaid", "areaname", "areaifopt",
            "posid", "posname", "posifopt", "X", "Y", "farezones"
        )]
    q = session.query(location.Stop).options(joinedload('points').joinedload('area'))
    if version_id is not None:
        q = q.filter_by(version_id=version_id)
    for stop in q.all():
        for point in stop.points:
            area_id, area_name, area_ifopt = (point.area.id, point.area.name, point.area.ifopt) if point.area is not None else (None, None, None)
            rows.append(
                (
                    stop.version_id, stop.id, stop.name, stop.place, stop.abbr,
                    stop.ifopt, area_id, area_name, area_ifopt,
                    point.id, point.name, point.ifopt, point.pos_x, point.pos_y,
                    ",".join(str(fzid) for fzid in stop.fare_zone_ids)
                ))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)
def courses(session: Session, path: str, line_ids: Optional[Set[int]] = None, export_full_name: bool = False, version_id: Optional[int] = None) ‑> NoneType

Export csv files of all course stops per course

Expand source code
def courses(session: Session, path: str, line_ids: Optional[Set[int]] = None, export_full_name: bool = False, version_id: Optional[int] = None) -> None:
    """Export csv files of all course stops per course"""
    q = session.query(network.Course).options(joinedload('stops').joinedload('stop'), joinedload('stops').joinedload('stop_point'))
    if version_id is not None:
        q = q.filter_by(version_id=version_id)
    if line_ids:
        q = q.filter(network.Course.line.in_(line_ids))
    for course in q.all():
        _from_stop, _to_stop = course.stops[0], course.stops[-1]

        def try_name(cs: network.CourseStop) -> str:
            name = cs.stop.name_noloc
            if name:
                return name.replace("/", "").replace(".", "")
            name = cs.stop.name
            try:
                return name.split(" ")[1].replace("/", "").replace(".", "")
            except IndexError:
                return name.replace("/", "").replace(".", "")
        _from, _to = try_name(_from_stop), try_name(_to_stop)
        fname = os.path.join(path, f"dino_{course.name}_{course.id}_{_from}_{_to}.csv")
        os.makedirs(os.path.dirname(fname), exist_ok=True)
        with open(fname, 'w', encoding='utf-8') as f:
            writer(f, delimiter=";", lineterminator='\n').writerows([
                (
                    cs.consec_stop_nr,
                    (cs.stop_point.ifopt if (cs.stop_point is not None and cs.stop_point.ifopt) else cs.stop.ifopt),
                    (cs.stop.name if (export_full_name or not cs.stop.name_noloc) else cs.stop.name_noloc)
                ) for cs in course.stops])
def trips(session: Session, fname: str, date_: date, line_ids: Optional[Set[int]] = None, version_id: Optional[int] = None) ‑> NoneType

Export csv file of trips for a given date

Expand source code
def trips(session: Session, fname: str, date_: date, line_ids: Optional[Set[int]] = None, version_id: Optional[int] = None) -> None:
    """Export csv file of trips for a given date"""
    rows = [("lineid", "linesymbol", "from", "to", "startt", "endt", "tripid", "wkt")]
    q = schedule.Trip.query_for_date(session, date_)
    if version_id is not None:
        q = q.filter_by(version_id=version_id)
    if line_ids:
        q = q.filter(schedule.Trip.line.in_(line_ids))
    for trip in q.all():
        rows.append(
            (
                trip.line, trip.course.name, trip.dep_stop.name, trip.arr_stop.name,
                int(trip.departure_time.total_seconds()),
                int(trip.departure_time.total_seconds()+trip.duration.total_seconds()),
                trip.id, trip.wkt()
            ))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)
def line_stats(session: Session, fname: str, version_id: Optional[int] = None) ‑> NoneType

Line statistics with total year-kilometers per version->line->course

Expand source code
def line_stats(session: Session, fname: str, version_id: Optional[int] = None) -> None:
    """Line statistics with total year-kilometers per version->line->course"""
    lq = session.query(schedule.Trip, network.Course.length) \
        .order_by(schedule.Trip.version_id.asc(), schedule.Trip.line.asc(), schedule.Trip.line_dir.asc(), schedule.Trip.course_id.asc()) \
        .options(
            joinedload('course', innerjoin=True),
            joinedload('day_attribute', innerjoin=True),
            joinedload('restriction'),
            *(load_only(_) for _ in ('line', 'course_id', 'line_dir', 'day_attribute_id', 'restriction_id'))
        ).join(network.Course)
    if version_id is not None:
        lq = lq.filter_by(version_id=version_id)
    total = 0
    rows = [("version", "line", "course", "km")]
    for versionid, version_grouper in itertools.groupby(lq.all(), lambda t_l: t_l[0].version_id):
        version_total = 0
        for lineid, line_grouper in itertools.groupby(version_grouper, lambda t_l: t_l[0].line):
            line_total = 0
            for courseid, course_grouper in itertools.groupby(line_grouper, lambda t_l: t_l[0].course_id):
                course_total = 0
                for trip, length in course_grouper:
                    days = len(trip.dates)
                    km = days*(length/1000)
                    course_total += km
                rows.append((versionid, lineid, courseid, f"{course_total:.2f}"))
                line_total += course_total
            rows.append((versionid, lineid, "total", f"{line_total:.2f}"))
            version_total += line_total
        rows.append((versionid, "total", "", f"{version_total:.2f}"))
        total += version_total
    rows.append(("total", "", "", f"{total:.2f}"))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)
def departure_stats(tripquery: Query, fname: str) ‑> NoneType

Departures from->to, between, and from stops.

Expand source code
def departure_stats(tripquery: Query, fname: str) -> None:
    """Departures from->to, between, and from stops."""
    tripquery = tripquery.options(
        load_only('version_id'), load_only('line'), load_only('id'), load_only('course_id'), load_only('line_dir'),
        joinedload('course', innerjoin=True).load_only('version_id').load_only('line').load_only('id').load_only('line_dir')
        .joinedload('stops', innerjoin=True).load_only('version_id').load_only('line').load_only('course_id').load_only('line_dir').load_only('consec_stop_nr')
        .joinedload('stop', innerjoin=True).load_only('version_id').load_only('ifopt').load_only('name'))

    pairs: Dict[Tuple[location.Stop, location.Stop], int] = defaultdict(int)
    for trip in tripquery.all():
        for _stopi, cstop in enumerate(trip.course.stops):
            if cstop.consec_stop_nr != len(trip.course.stops):
                pairs[(cstop.stop, trip.course.stops[_stopi+1].stop)] += 1

    # todo: ggf. version "ignorieren", stops version-uebergreifend zuordnen?
    rows = [("type", "version", "from/A", "l-ifopt", "to/B", "r-ifopt", "count")]
    for (sf, st), dc in sorted(pairs.items(), key=lambda i: -i[1]):
        rows.append(("from-to", sf.version_id, sf.name, sf.ifopt, st.name, st.ifopt, dc))

    pairsbidi = {}
    for (sf, st), dc in pairs.items():
        if (sf, st) not in pairsbidi and (st, sf) not in pairsbidi:
            pairsbidi[(sf, st)] = dc
            if (st, sf) in pairs and (sf, st) != (st, sf):
                pairsbidi[(sf, st)] += pairs[(st, sf)]

    for (sf, st), dc in sorted(pairsbidi.items(), key=lambda i: -i[1]):
        rows.append(("between", sf.version_id, sf.name, sf.ifopt, st.name, st.ifopt, dc))

    stopdeps = {stop: sum(dc for ((sf, st), dc) in pairs.items() if stop == sf) for (stop, _) in pairs}
    for stop, dc in sorted(stopdeps.items(), key=lambda i: -i[1]):
        rows.append(("from", sf.version_id, stop.name, stop.ifopt, "", "", dc))

    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)
def departures(stop: location.Stop, day: date, fname: str, days: int = 1) ‑> NoneType

Departures from a given stop for a given date (range)

Expand source code
def departures(stop: location.Stop, day: date, fname: str, days: int = 1) -> None:
    """Departures from a given stop for a given date (range)"""
    servinglines = set((c.line, c.id) for c in stop.courses)
    deps: List[Tuple[datetime, schedule.TripStop]] = []
    for currdate in (day + timedelta(days=n) for n in range(days)):
        tripquery = schedule.Trip.query_for_date(stop._session, currdate) \
            .filter(
                (schedule.Trip.version_id == stop.version_id)
                & tuple_(schedule.Trip.line, schedule.Trip.course_id).in_(servinglines)
            ).options(
                load_only('version_id'), load_only('line'), load_only('id'), load_only('course_id'), load_only('line_dir'), load_only('timing_group'), load_only('departure_time'), load_only('arr_stop_id'),
                joinedload('course', innerjoin=True).load_only('version_id').load_only('line').load_only('id').load_only('line_dir').load_only('name'),
                joinedload('arr_stop').load_only('name')
            )
        trips = [(t, t.trip_stops(simple=True)) for t in tripquery.all()]
        # todo: eventuell mal das trip_stops sich sparen koennen.
        for trip, tripstops in trips:
            deps.extend(
                (datetime.combine(currdate, datetime.min.time()) + ts.dep_time, ts)
                for n, ts
                in filter(
                    lambda n_ts: n_ts[1].stop == stop and n_ts[0] != len(tripstops),
                    enumerate(tripstops, start=1)
                ))
    rows = [("date", "time", "plat", "linenum", "direction")]
    rows.extend(
        (
            deptime.strftime("%Y-%m-%d"), deptime.strftime("%H:%M:%S"),
            ts.stop_point.name if ts.stop_point is not None else "",
            ts.trip.course.name, ts.trip.arr_stop.name
        ) for deptime, ts in sorted(deps, key=lambda d: d[0]))
    with open(fname, 'w', encoding='utf-8') as f:
        writer(f, delimiter=";", lineterminator='\n').writerows(rows)