Source code for taskw.utils

""" Various utilties """
from __future__ import print_function

import datetime
import re
from collections import OrderedDict
from distutils.version import LooseVersion
from operator import itemgetter

import dateutil.tz
import pytz
import six

# strftime pattern applied to date and datetime values by encode_task_value.
# The 'T' and the trailing 'Z' are literal characters; encode_task_value
# converts datetime values to UTC before formatting them with this pattern.
DATE_FORMAT = '%Y%m%dT%H%M%SZ'


# Ordered (raw, escaped) substitutions that encode_task applies to every
# string value.  The order is significant: the backslash is doubled first,
# so the backslash added later by the '/' rule is not doubled again.
encode_replacements = OrderedDict((
    ('\\', '\\\\'),
    ('"', '&dquot;'),
    ('[', '&open;'),
    (']', '&close;'),
    ('\n', ' '),
    ('/', '\\/'),
))

# Ordered (raw, escaped) substitutions that encode_task_value applies to
# string values when it is not encoding for a query.
encode_replacements_experimental = OrderedDict((
    ('"', '&dquot;'),
    ('[', '&open;'),
    (']', '&close;'),
))

# Inverse of encode_replacements (escaped -> raw), kept in the same order.
# The newline rule is left out: encoding turns '\n' into a plain space, and
# that space cannot be told apart from an ordinary one when decoding.
decode_replacements = OrderedDict(
    (escaped, raw)
    for raw, escaped in encode_replacements.items()
    # Note the trailing comma: ('\n') would be a parenthesised string and
    # turn this into a substring test instead of a membership test.
    if raw not in ('\n',)
)

# Characters that encode_task_value escapes with a leading backslash when it
# encodes a string for a query, in the order the escapes are applied.
logical_replacements = OrderedDict(
    (char, '\\' + char) for char in '?+()[]{}'
)


def encode_task_value(key, value, query=False):
    """ Encode a single task attribute value as a string.

    * ``None`` becomes the empty string.
    * ``datetime.datetime`` values are converted to UTC and formatted with
      `DATE_FORMAT`; naive values are taken to be in local time.
    * ``datetime.date`` values are formatted with `DATE_FORMAT` as they are.
    * Strings are escaped with `logical_replacements` when `query` is true,
      otherwise with `encode_replacements_experimental`.
    * Anything else is passed through ``str()``.

    `key` is only consulted when escaping for a query: the '?' escape is
    skipped unless the key contains a '.'.
    """
    if value is None:
        return ''

    if isinstance(value, datetime.datetime):
        moment = value
        if not moment.tzinfo:
            # Dates without timezone information are assumed to be in
            # local time.
            moment = moment.replace(tzinfo=dateutil.tz.tzlocal())
        # All times are converted to UTC before serializing.
        return moment.astimezone(pytz.utc).strftime(DATE_FORMAT)

    if isinstance(value, datetime.date):
        return value.strftime(DATE_FORMAT)

    if not isinstance(value, six.string_types):
        return str(value)

    if not query:
        for raw, escaped in six.iteritems(encode_replacements_experimental):
            value = value.replace(raw, escaped)
        return value

    # In some contexts, parentheses are interpreted for use in logical
    # expressions.  They must *sometimes* be escaped.
    for raw, escaped in six.iteritems(logical_replacements):
        # A key without a '.' is an exact match: leave '?' alone there.
        if raw == '?' and '.' not in key:
            continue
        value = value.replace(raw, escaped)
    return value
def encode_query(value, version, query=True):
    """ Turn filter criteria into a list of query argument strings.

    `value` is either a dict or an iterable of ``(key, value)`` pairs.

    * A list value is treated as a group of ``(key, value)`` pairs: each
      pair is encoded on its own (with ``query=False``) and the results
      are joined with the outer key, wrapped in ``( ... )``.
    * A key ending in ``.is`` is rendered as ``key == "value"`` (suffix
      removed) when `version` is at least 2.4.
    * Everything else is rendered as ``key:value``.

    Values are encoded with `encode_task_value`.
    """
    pairs = six.iteritems(value) if isinstance(value, dict) else value

    encoded = []
    for name, operand in pairs:
        if isinstance(operand, list):
            joiner = " %s " % name
            members = [
                encode_query([entry], version, query=False)[0]
                for entry in operand
            ]
            encoded.append("( %s )" % joiner.join(members))
            continue

        if name.endswith(".is") and version >= LooseVersion('2.4'):
            template, label = '%s == "%s"', name[:-3]
        else:
            template, label = '%s:%s', name
        encoded.append(
            template % (label, encode_task_value(name, operand, query=query))
        )
    return encoded
def clean_task(task):
    """ Clean a task by replacing any dangerous characters

    No replacement is performed at present: the very same `task` object is
    handed back unchanged.
    """
    return task
def encode_task_experimental(task):
    """ Convert a dict-like task to a list of ``key:"value"`` arguments

    Used for adding a task via `task add`.

    The task is copied first, so the caller's mapping is not modified.  A
    'tags' value is joined with commas and every value is run through
    `encode_task_value`.  Keys are emitted in sorted order; a key whose
    encoded value is empty is emitted as a bare ``key:``.
    """
    # First, clean a copy of the task:
    encoded = task.copy()
    if 'tags' in encoded:
        encoded['tags'] = ','.join(encoded['tags'])
    for name in encoded:
        encoded[name] = encode_task_value(name, encoded[name])

    # Then, format it as one argument per attribute:
    arguments = []
    for name, text in sorted(encoded.items(), key=itemgetter(0)):
        if text:
            arguments.append("%s:\"%s\"" % (name, text))
        else:
            arguments.append("%s:" % (name, ))
    return arguments
def encode_task(task):
    """ Convert a dict-like task to its string representation

    The task is copied first, so the caller's mapping is not modified.  A
    'tags' value is joined with commas, every string value is escaped with
    `encode_replacements`, and datetime values are formatted with
    `DATE_FORMAT` (no timezone conversion is applied here).

    Returns one line of the form ``[key:"value" key:"value"]`` terminated
    by a newline, with the keys in sorted order.
    """
    # First, clean the task:
    task = task.copy()
    if 'tags' in task:
        task['tags'] = ','.join(task['tags'])
    for k in task:
        value = task[k]
        if isinstance(value, six.string_types):
            for unsafe, safe in six.iteritems(encode_replacements):
                value = value.replace(unsafe, safe)
        elif isinstance(value, datetime.datetime):
            # Use the shared pattern so hours precede minutes, matching
            # what the rest of the module emits.
            value = value.strftime(DATE_FORMAT)
        task[k] = value

    # Then, format it as a string
    return "[%s]\n" % " ".join([
        "%s:\"%s\"" % (k, v)
        for k, v in sorted(task.items(), key=itemgetter(0))
    ])
def decode_task(line):
    """ Parse a single record (task) from a task database file.

    Every ``key:"value"`` pair found in `line` becomes an entry of the
    returned dict.  Escaped quotes and the sequences listed in
    `decode_replacements` are turned back into their raw characters, and a
    'tags' entry is split on commas into a list.

    >>> decode_task('[description:"Make a python API for taskwarrior"]')
    {'description': 'Make a python API for taskwarrior'}
    """
    pair_pattern = r'(\w+):"(.*?)(?<!\\)"'

    decoded = {}
    for name, raw in re.findall(pair_pattern, line):
        text = raw.replace('\\"', '"')  # unescape quotes
        for escaped, plain in six.iteritems(decode_replacements):
            text = text.replace(escaped, plain)
        decoded[name] = text

    if 'tags' in decoded:
        decoded['tags'] = decoded['tags'].split(',')
    return decoded
def make_annotation_comparable(annotation):
    """ Reduce an annotation to a form suitable for comparison.

    Taskwarrior transforms messages internally when storing them, so two
    annotations are compared on their word characters only: every
    non-word character and every underscore is removed.
    """
    noise = re.compile(r'[\W_]')
    return noise.sub('', annotation)
def get_annotation_value(annotation):
    """ Return the text of an annotation.

    An annotation is either a dictionary, whose 'description' entry is
    returned, or any other value (typically a string), which is returned
    as it is.
    """
    return (
        annotation['description']
        if isinstance(annotation, dict)
        else annotation
    )
def annotation_exists_in_list(authoritative, new):
    """ Tell whether `new` matches one of the `authoritative` annotations.

    Both sides are unwrapped with `get_annotation_value` and normalised
    with `make_annotation_comparable` before comparing, so each annotation
    may be a plain string or a dict carrying a 'description'.  Falsy
    entries of `authoritative` are ignored.

    Returns True when a match is found, False otherwise.
    """
    known = set(
        make_annotation_comparable(get_annotation_value(item))
        for item in authoritative
        if item
    )
    # Unwrap `new` as well, so a dict annotation is compared on its
    # description instead of being passed to the regex as a dict.
    candidate = make_annotation_comparable(get_annotation_value(new))
    return candidate in known
def merge_annotations(left, right):
    """ Add to `left` every annotation of `right` that it lacks.

    Presence is decided by `annotation_exists_in_list`.  `left` is
    extended in place and is also returned.
    """
    for annotation in right:
        if not annotation_exists_in_list(left, annotation):
            # Append the one missing annotation, not the whole `right` list.
            left.append(annotation)
    return left
def annotation_list_to_comparison_map(annotations):
    """ Index annotations by their comparable form.

    Returns a dict mapping `make_annotation_comparable(annotation)` to the
    annotation itself.  When several annotations share a comparable form,
    the last one wins.
    """
    return dict(
        (make_annotation_comparable(entry), entry)
        for entry in annotations
    )
def convert_dict_to_override_args(config, prefix=''):
    """ Converts a dictionary of override arguments into CLI arguments.

    * Each non-dictionary (leaf) value yields one ``rc.<path>=<value>``
      argument, where ``<path>`` is the dot-joined chain of keys leading
      to the leaf.
    * Nested dictionaries are descended into; they produce no argument
      of their own.
    * A value containing a space is wrapped in double quotes.

    `prefix` is the dotted key path accumulated so far (used when
    recursing).

    See `taskw.test.test_utils.TestUtils.test_convert_dict_to_override_args`
    for details.
    """
    overrides = []
    for name, setting in six.iteritems(config):
        if isinstance(setting, dict):
            nested_prefix = '.'.join([prefix, name]) if prefix else name
            overrides.extend(
                convert_dict_to_override_args(setting, prefix=nested_prefix)
            )
            continue

        text = six.text_type(setting)
        path = ['rc', prefix, name] if prefix else ['rc', name]
        rendered = '"%s"' % text if ' ' in text else text
        overrides.append('='.join(['.'.join(path), rendered]))
    return overrides
# Matches the bytes 0x00-0x08 and 0x0e-0x1f.  The range 0x09-0x0d (tab,
# line feed, vertical tab, form feed, carriage return) is not matched.
CTRLCHAR = re.compile(b"[\x00-\x08\x0e-\x1f]")


def clean_ctrl_chars(s):
    """ Strip most (but not all) control characters from a byte string.

    Every byte matched by `CTRLCHAR` is dropped; all other bytes,
    including tabs and line breaks, are kept.
    """
    return re.sub(CTRLCHAR, b'', s)