Claas' Blog

Download Lap Times from MyRcm

CLAAS VONDERSCHEN
-

Many venues hosting events for remote-controlled cars in my area publish their results on myrcm.ch. Unfortunately, the website does not offer a convenient way to export the lap times. Here is a quick Python script that needs nothing beyond the standard library (Python 3.11 or newer, because it imports typing.Self) and downloads an event’s lap times to one CSV file per run:

#!/usr/bin/python

import argparse
from urllib.parse import urlparse, urlencode, ParseResult
from urllib import request
import re
import os
import pathlib as p
import json
from typing import Optional, Self
from collections.abc import Callable
from datetime import timedelta
import xml.etree.ElementTree as ET

# Alias used in annotations for a parsed URL (the result type of urlparse).
URL = ParseResult

# Matches one report link in the event page's HTML: group 1 is the numeric
# reportKey, group 2 the quoted report name that follows it.
REGEX = r"reportKey=(\d+)', '([\w\d\s:-]+)'"

class ReportLocation:
  """URL of a single report together with the report's display name.

  Two instances are equal when they point at the same URL; the name is
  ignored for equality and hashing.
  """

  def __init__(self, location: URL, name: str):
    self.location = location
    self.name = name

  def __str__(self):
    return str({"location": self.location, "name": self.name})

  def __eq__(self, other):
    if not isinstance(other, ReportLocation):
      return NotImplemented
    return self.location == other.location

  def __hash__(self):
    # Must agree with __eq__: equal locations produce equal hashes.
    return hash(self.location)

class Lap:
  """A single timed lap driven by one driver."""

  def __init__(self, number: int, driver: str, duration: timedelta):
    self.number, self._driver, self._duration = number, driver, duration

  @property
  def driver(self):
    """Driver name with every comma removed."""
    return self._driver.replace(",", "")

  @property
  def lap_time(self) -> int:
    """Lap duration rounded to a whole number of milliseconds."""
    one_millisecond = timedelta(milliseconds=1)
    return round(self._duration / one_millisecond)

class Report:
  """Lap times of one report, identified by the report's name."""

  def __init__(self, name: str, laps: list[Lap]):
    self.name = name
    self.laps = laps

  def filter_laps_by_driver(self, drivers: list[str]):
    """Keep only the laps of the given drivers (in place).

    Names are compared case-insensitively; the order of the remaining
    laps is preserved.
    """
    wanted = {driver.lower() for driver in drivers}
    self.laps = [lap for lap in self.laps if lap.driver.lower() in wanted]

  def has_no_laps(self):
    """Return True when the report holds no laps."""
    return not self.laps

  @staticmethod
  def from_json(name: str, json: str) -> Optional["Report"]:
    """Placeholder for building a report from JSON.

    Not implemented: it ignores its arguments and always returns None.
    """
    return None

def download_start_html(args) -> str:
  """Fetch the page at ``args.report_location`` and return it as text.

  The response body is decoded as UTF-8.

  Raises:
    RuntimeError: if the response carries a status outside 200-299.
  """
  with request.urlopen(args.report_location) as response:
    status = getattr(response, "status", None)
    # Responses for non-HTTP URLs (e.g. file: or data:) carry no status code,
    # so only a status that is actually present can signal a failure.
    if status is not None and not 200 <= status <= 299:
      raise RuntimeError(f"failed to fetch {args.report_location} with status {status}")
    return response.read().decode("utf-8")

def find_reports(args, html: str) -> list[ReportLocation]:
  """Collect the report links found in the event page.

  Every REGEX match in ``html`` yields one location: the URL of
  ``args.report_location`` with its query replaced by ``reportKey=<key>``.
  Locations are returned in the order they first appear in the page, one
  per report key, so that enumerating the result is stable between runs.
  """
  base_url = urlparse(args.report_location)
  names_by_key = {}
  for key, name in re.findall(REGEX, html):
    # dicts keep insertion order; the first name seen for a key wins.
    names_by_key.setdefault(key, name)
  return [
    ReportLocation(base_url._replace(query=f"reportKey={key}"), name)
    for key, name in names_by_key.items()
  ]

def download_report(report_location: ReportLocation) -> tuple[str, str]:
  """Request one report as JSON and return ``(name, payload)``.

  The form fields ``cType=json`` and ``ajax=True`` are sent as the request
  body. ``payload`` is the first entry of the ``"DATA"`` list in the JSON
  response.
  NOTE(review): the payload is presumably an HTML string, since the caller
  searches it with regular expressions; confirm against a live response.

  Raises:
    RuntimeError: if the response carries a status outside 200-299.
  """
  payload = urlencode({"cType": "json", "ajax": True}).encode("ascii")
  url = report_location.location.geturl()
  req = request.Request(url, payload)
  with request.urlopen(req) as response:
    status = getattr(response, "status", None)
    # Responses for non-HTTP URLs carry no status code; only check real ones.
    if status is not None and not 200 <= status <= 299:
      raise RuntimeError(f"failed to fetch {url} with status {status}")
    body = json.loads(response.read().decode("utf-8"))
  return report_location.name, body["DATA"][0]

def parse_lap_time(time: str) -> timedelta:
  """Parse a lap time such as ``"18.123"`` or ``"1:02.345"``.

  The accepted form is ``[minutes:]seconds<sep>fraction`` where ``<sep>`` is
  ``.`` or ``,``. The fraction is a decimal fraction of a second, so
  ``"12.5"`` means 12.5 seconds; digits beyond microsecond precision are
  dropped. Surrounding whitespace is ignored, and anything following the
  time is ignored as well.

  Raises:
    ValueError: if ``time`` does not start with a lap time.
  """
  match = re.match(r"(?:(\d+):)?(\d+)[.,](\d+)", time.strip())
  if match is None:
    raise ValueError(f"failed to parse time {time}")
  minutes, seconds, fraction = match.groups()
  return timedelta(
    minutes=int(minutes) if minutes is not None else 0,
    seconds=int(seconds),
    # Pad or cut the fraction to six digits to read it as microseconds.
    microseconds=int(fraction.ljust(6, "0")[:6]))

def parse_report_response(name: str, body: str) -> Optional[Report]:
  """Extract the lap-time table from a report body.

  The table is the first ``<table>`` directly following the
  ``Rundenzeiten`` heading. Header cells give the driver of each column,
  each body row is one lap (numbered from 1), and each non-empty cell is
  that driver's time for the lap. The first cell of every row is skipped.
  NOTE(review): the first column presumably holds the lap number; confirm
  against the site's markup.

  Returns:
    The parsed report, or None when the body contains no lap-time table.
  """
  table_match = re.search(r"<h4 id=\"title\">Rundenzeiten</h4>(<table[\w\W]*?</table>)", body)
  if table_match is None:
    return None
  table_str = table_match.group(1)
  # The width and class attributes are removed before the XML parser sees the
  # markup. NOTE(review): presumably they are not well-formed XML; confirm.
  table_str = re.sub(r"width=\".*?\"", "", table_str)
  table_str = re.sub(r"class=\".*?\"", "", table_str)
  root = ET.fromstring(table_str)
  # Key every driver by the real position of its header cell, so an empty
  # header cell cannot shift the drivers of the columns after it.
  drivers = {
    column: header.text
    for column, header in enumerate(root.findall("./thead/tr/th"))
    if header.text
  }
  laps = []
  for lap_number, row in enumerate(root.findall("./tr"), start=1):
    for column, cell in enumerate(row.findall("./td")):
      # Skip the first column and any column without a driver header.
      if column == 0 or column not in drivers:
        continue
      # A time may be wrapped in <b>; read the text from there if present.
      bold = cell.find("./b")
      if bold is not None:
        cell = bold
      if cell.text is None or not cell.text.strip():
        continue
      laps.append(Lap(lap_number, drivers[column], parse_lap_time(cell.text.strip())))
  return Report(name, laps)


def filter_report(args, report: Report) -> bool:
  """Apply the driver filter to ``report`` and say whether to keep it.

  With no drivers requested (``args.driver`` is None or empty) the report
  is left untouched and kept. Otherwise the report's laps are narrowed to
  the requested drivers in place, and the report is kept only if at least
  one lap remains.
  """
  if not args.driver:
    return True
  report.filter_laps_by_driver(args.driver)
  return not report.has_no_laps()


def filename(args, i: int, report: Report) -> str:
  """Return the CSV file name for the i-th written report.

  With ``args.count_filenames`` set the name is ``run_<i + offset>.csv``;
  otherwise it is the report name with every character other than word
  characters and ``-`` replaced by ``_``.
  """
  if not args.count_filenames:
    sanitized = re.sub(r"[^\w-]", "_", report.name)
    return sanitized + ".csv"
  run_number = i + args.offset
  return f"run_{run_number}.csv"

def write_report(args, report: Report, filename: str):
  """Write the report's laps as CSV into the current working directory.

  The file starts with the header ``lap_num,driver,lap_time ms`` followed
  by one line per lap, encoded as UTF-8. Nothing is written, and a notice
  is printed to stderr, when the target would end up outside the working
  directory or when it already exists and ``args.force`` is not set.
  """
  import sys

  cwd = p.Path(os.getcwd()).resolve()
  # Resolve before checking containment so that ".." parts in the file name
  # cannot slip past the check.
  out_path = (cwd / filename).resolve()
  if cwd not in out_path.parents:
    print(f"skipping {filename}: target is outside the working directory", file=sys.stderr)
    return
  if out_path.exists() and not args.force:
    print(f"skipping {filename}: file already exists (use --force to overwrite)", file=sys.stderr)
    return
  with open(out_path, "w", encoding="utf-8") as file:
    file.write("lap_num,driver,lap_time ms\n")
    file.writelines(f"{lap.number},{lap.driver},{lap.lap_time}\n" for lap in report.laps)


def main(args):
  """Download every report linked from the start page and write each as CSV.

  Reports are handled one at a time: download, parse, filter, write.
  Reports without a lap table and reports rejected by the driver filter
  are skipped and do not consume a file index.
  """
  start_html = download_start_html(args)
  written_index = 0
  for location in find_reports(args, start_html):
    name, body = download_report(location)
    report = parse_report_response(name, body)
    if report is None:
      continue
    if not filter_report(args, report):
      continue
    write_report(args, report, filename(args, written_index, report))
    written_index += 1

if __name__ == "__main__":
  # Command-line entry point: describe the options, parse them, run main.
  parser = argparse.ArgumentParser(
    prog="download_myrcm",
    description="download lap times from myrcm.ch",
  )
  parser.add_argument("report_location", help="url to myrcm report")
  parser.add_argument(
    "-c", "--count_filenames",
    action="store_true",
    help="use run_%%i.csv as filenames",
  )
  parser.add_argument(
    "-o", "--offset",
    type=int,
    default=0,
    help="add offset to i, if count_filenames is active",
  )
  parser.add_argument(
    "-d", "--driver",
    type=str,
    nargs="*",
    help="retain specified drivers. lowercase exactly matched",
  )
  parser.add_argument(
    "-f", "--force",
    action="store_true",
    default=False,
    help="overwrite existing run files",
  )
  args = parser.parse_args()
  main(args)

Depending on your local venue, you might need to apply some modifications to the regex patterns.

TAGS: