Bug#781168: Workarounds for Google being evil with .ics feeds
On Wed, Mar 25, 2015 at 04:24:40PM +0100, Enrico Zini wrote:
> I am now working on a smart diff between ical files that should be able
> to tell when two .ics files mangled that way are still actually the
> same. I'll try to keep you posted.
Done! I'm attaching the script that I'm using at the moment.
Enrico
--
GPG key: 4096R/E7AD5568 2009-05-08 Enrico Zini <enrico@enricozini.org>
#!/usr/bin/python3
#
# Copyright © 2015 Enrico Zini <enrico@enricozini.org>
# This work is free. You can redistribute it and/or modify it under the
# terms of the Do What The Fuck You Want To Public License, Version 2,
# as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
#
# Work around Google being evil in ical feeds.
#
# This is the list of what I have observed Gmail doing to an ical feed to make
# it hard to sync with its contents efficiently:
#
# - HTTP Date header is always now
# - If-Modified-Since is not supported
# - DTSTAMP of each element is always now
# - VTIMEZONE entries appear in random order
# - ORGANIZER CN entries randomly change between full name and plus.google.com
# user ID
# - ATTENDEE entries randomly change between having a CN or not having it
# - TRIGGER entries change spontaneously
# - CREATED entries change spontaneously
import requests
import tempfile
import os
import re
import argparse
import time
class atomic_writer(object):
    """
    Atomically write to a file.

    A temporary file is created in the destination directory, exposed for
    writing via the context manager, and on a clean exit renamed over the
    target.  On error the temporary file is removed and the existing
    target is left untouched.
    """
    def __init__(self, fname, mode, osmode=0o644, sync=True, **kw):
        # fname: final destination path
        # mode: open() mode for the temporary file (e.g. "wt", "wb")
        # osmode: permission bits applied to the file before the rename
        # sync: if True, fdatasync() the contents before renaming
        # kw: extra keyword arguments forwarded to open()
        self.fname = fname
        self.osmode = osmode
        self.sync = sync
        # Create the temporary file in the same directory as the
        # destination, so the final os.rename() stays on one filesystem
        dirname = os.path.dirname(self.fname)
        self.fd, self.abspath = tempfile.mkstemp(dir=dirname, text="b" not in mode)
        self.outfd = open(self.fd, mode, closefd=True, **kw)
    def __enter__(self):
        # Hand out the writable file object
        return self.outfd
    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            # Success: flush buffers, optionally sync data to disk, set the
            # final permissions, then atomically replace the destination
            self.outfd.flush()
            if self.sync: os.fdatasync(self.fd)
            os.fchmod(self.fd, self.osmode)
            os.rename(self.abspath, self.fname)
        else:
            # Failure: discard the temporary file, keeping the old target
            os.unlink(self.abspath)
        self.outfd.close()
        return False
class Field:
    """
    One unfolded ical content line, split into a name and a value.

    The name keeps any property parameters (everything before the first
    colon); the value is everything after it.
    """
    def __init__(self, content):
        # Split only on the first colon: values may contain colons too
        self.name, self.value = content.split(":", 1)

    def __str__(self):
        """
        Serialize the field back to a single newline-terminated line.
        """
        return self.name + ":" + self.value + "\n"
def parse_ical(fd):
    """
    Parse an ical feed into a sequence of Field elements.

    fd is any iterable of lines (an open file or a list of strings).
    Folded lines are unfolded: per RFC 5545 §3.1 a line starting with a
    space *or a tab* continues the previous field (the original code only
    accepted a space).  Blank lines are skipped instead of raising
    IndexError/ValueError as the original `line[0]` indexing did.

    Raises RuntimeError if the feed starts with a continuation line.
    """
    lines = []
    for line in fd:
        if line[:1] in (" ", "\t"):
            # Continuation line: drop the single fold character and
            # append the rest to the current field
            if not lines:
                raise RuntimeError("feed starts with a continuation line")
            lines.append(line[1:])
        elif not line.strip():
            # Blank line: not valid ical content, ignore it
            continue
        else:
            # Start of a new field: emit the one we were accumulating
            if lines:
                yield Field("".join(lines))
            lines = [line]
    # Emit the last pending field, if any
    if lines:
        yield Field("".join(lines))
def drop_vtimezones(feed):
    """
    Yield the fields of feed, omitting the contents of VTIMEZONE blocks
    (including their BEGIN/END markers).
    """
    skipping = False
    for f in feed:
        is_tz_marker = f.value == "VTIMEZONE"
        if is_tz_marker and f.name == "BEGIN":
            # Entering a timezone block: suppress it from here on
            skipping = True
        elif is_tz_marker and f.name == "END":
            # Leaving the timezone block
            skipping = False
        elif not skipping:
            yield f
# Field names that Gmail rewrites on every fetch, making byte comparison
# of two downloads useless
re_nondet = re.compile(r"^(?:DTSTAMP|ORGANIZER|ATTENDEE|TRIGGER|CREATED)")

def remove_nondeterminism(content):
    """
    Return content reserialized without the fields whose names match
    re_nondet and without VTIMEZONE blocks, so that two downloads of the
    same feed can be compared for actual changes.
    """
    fields = drop_vtimezones(parse_ical(content.splitlines()))
    return "".join(str(f) for f in fields if not re_nondet.match(f.name))
def download(url, target):
    """
    Download a new version of an ical feed into target, leaving the
    existing file untouched if the feed has not actually changed (as
    judged by remove_nondeterminism).

    Returns True if target was written, False if it was left alone.

    Raises requests.HTTPError on an HTTP error response: without this
    check the original code would happily overwrite the feed with a
    4xx/5xx error page.
    """
    res = requests.get(url)
    # Never clobber a good feed with an error page
    res.raise_for_status()
    if os.path.exists(target):
        with open(target, "rt") as fd:
            old_content = fd.read()
        if remove_nondeterminism(res.text) == remove_nondeterminism(old_content):
            # Normalized content is identical: no update needed
            return False
    # Write atomically so readers never see a partial feed
    with atomic_writer(target, "wt") as fd:
        fd.write(res.text)
    return True
if __name__ == "__main__":
    # Command line: URL and destination are mandatory, logging optional
    ap = argparse.ArgumentParser(
        description="Download a gmail ics feed, leaving the destination untouched if it has not changed")
    ap.add_argument('url', help="url to download")
    ap.add_argument('dest', help="destination file name")
    ap.add_argument('--log', action="store", help="log actual updates to this file")
    args = ap.parse_args()
    updated = download(args.url, args.dest)
    # Append a timestamped line to the log only when something was written
    if updated and args.log:
        with open(args.log, "at") as fd:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S %Z")
            print("{}: updated {}".format(timestamp, args.dest), file=fd)
Reply to: