""" Code to interact with taskwarrior
This module contains an abstract base class, TaskWarriorBase, and one
implementation of it for interacting with taskwarrior: TaskWarriorShellout,
which works by invoking the 'task' binary.
If it is determined that there is a binary 'task' on the system and that it is
of a sufficiently advanced version (see SUPPORTED_VERSION), then
TaskWarriorShellout will be made the default TaskWarrior class. If not,
importing this module raises UnsupportedVersionException.
"""
import abc
import codecs
import copy
import errno
import json
import logging
import os
import subprocess
from distutils.version import LooseVersion
import kitchen.text.converters
import six
from six import with_metaclass
from six.moves import filter
import taskw.utils
from taskw.exceptions import TaskwarriorError
from taskw.task import Task
from taskw.taskrc import TaskRc
logger = logging.getLogger(__name__)


def open(fname, mode):
    """ Open ``fname`` in ``mode`` through ``codecs.open`` with UTF-8.

    Note that this shadows the builtin ``open`` inside this module.
    """
    return codecs.open(fname, mode, "utf-8")


# Where the taskwarrior configuration file lives: the value of the TASKRC
# environment variable when it is set, otherwise ~/.taskrc.
TASKRC = os.getenv("TASKRC", "~/.taskrc")

# Oldest Taskwarrior version this module is willing to work with.
SUPPORTED_VERSION = "2.5"
class TaskWarriorBase(with_metaclass(abc.ABCMeta, object)):
    """ The task warrior

    Really though, a python object with methods allowing you to interact
    with a taskwarrior database.

    This class is abstract: subclasses must implement every method marked
    with ``abc.abstractmethod`` below.
    """

    def __init__(
        self,
        config_filename=TASKRC,
        config_overrides=None,
        marshal=False
    ):
        """ Remember the taskrc path and load it.

        :param config_filename: path of the taskrc file to load.
        :param config_overrides: not supported here; any truthy value
            raises ``NotImplementedError``.
        :param marshal: not supported here; any truthy value raises
            ``NotImplementedError``.
        """
        self.config_filename = config_filename
        self.config = TaskWarriorBase.load_config(config_filename)
        if marshal:
            raise NotImplementedError(
                "You must use TaskWarriorShellout to use 'marshal'"
            )
        if config_overrides:
            raise NotImplementedError(
                "You must use TaskWarriorShellout to use 'config_overrides'"
            )

    def _stub_task(self, description, tags=None, **kw):
        """ Given a description, stub out a task dict.

        :param description: task description; surrounding whitespace is
            stripped.
        :param tags: optional tags, stored under ``'tags'`` when not None.
        :param kw: any further task fields, copied into the dict as-is.
        :returns: a new ``dict`` describing the task.
        """
        # If whitespace is not removed here, TW will do it when we pass the
        # task to it.
        task = {"description": description.strip()}

        # ``tags`` is a named parameter, so a ``tags=`` keyword always binds
        # to it and can never show up inside ``kw``; no special handling of
        # ``kw['tags']`` is needed.
        if tags is not None:
            task['tags'] = tags

        task.update(kw)

        # Only UNIX timestamps are currently supported.
        if 'due' in kw:
            task['due'] = str(task['due'])

        return task

    def _extract_annotations_from_task(self, task):
        """ Removes annotations from a task and returns a list of annotations

        Two sources are consumed, and both are removed from ``task``:
        the ``'annotations'`` entry (dict items contribute their
        ``'description'``, anything else is used as-is) and every key that
        starts with ``'annotation_'``.
        """
        annotations = []

        if 'annotations' in task:
            for entry in task.pop('annotations'):
                if isinstance(entry, dict):
                    annotations.append(entry['description'])
                else:
                    annotations.append(entry)

        # Iterate over a snapshot of the keys because entries are deleted
        # while looping.
        for key in list(task.keys()):
            if key.startswith('annotation_'):
                annotations.append(task[key])
                del task[key]

        return annotations

    @abc.abstractmethod
    def load_tasks(self, command='all'):
        """ Load all tasks.

        Similar to TaskWarrior, a specific command may be specified:

            all - a list of all issues
            pending - a list of all pending issues
            completed - a list of all completed issues

        By default, the 'all' command is run.

        >>> w = TaskWarrior()
        >>> tasks = w.load_tasks()
        >>> tasks.keys()
        ['completed', 'pending']
        >>> type(tasks['pending'])
        <type 'list'>
        >>> type(tasks['pending'][0])
        <type 'dict'>
        """

    @abc.abstractmethod
    def task_add(self, description, tags=None, **kw):
        """ Add a new task.

        Takes any of the keywords allowed by taskwarrior like proj or prior.
        """

    @abc.abstractmethod
    def task_done(self, **kw):
        """ Mark the task selected by ``kw`` as done. """

    @abc.abstractmethod
    def task_delete(self, **kw):
        """ Mark the task selected by ``kw`` as deleted. """

    @abc.abstractmethod
    def _load_task(self, **kw):
        """ Look up the task selected by ``kw``. """

    @abc.abstractmethod
    def task_update(self, task):
        """ Persist the changes carried by ``task``. """

    @abc.abstractmethod
    def get_task(self, **kw):
        """ Return the task selected by ``kw``. """

    def filter_by(self, func):
        """ Apply ``filter(func, ...)`` to the result of ``load_tasks()``.

        NOTE(review): ``load_tasks()`` is documented above as returning a
        dict keyed by 'pending'/'completed', so ``func`` is called with
        those keys rather than with individual tasks -- confirm whether
        that is intended.
        """
        tasks = self.load_tasks()
        return filter(func, tasks)

    @classmethod
    def load_config(cls, config_filename=TASKRC, overrides=None):
        """ Load ~/.taskrc into a python dict

        >>> config = TaskWarrior.load_config()
        >>> config['data']['location']
        '/home/threebean/.task'
        >>> config['_forcecolor']
        'yes'

        :param config_filename: path of the taskrc file to read.
        :param overrides: passed through to ``TaskRc`` as ``overrides``.
        :returns: a ``TaskRc`` instance.
        """
        return TaskRc(config_filename, overrides=overrides)

    @abc.abstractmethod
    def task_start(self, **kw):
        """ Mark the task selected by ``kw`` as started. """

    @abc.abstractmethod
    def task_stop(self, **kw):
        """ Mark the task selected by ``kw`` as stopped. """
class TaskWarriorShellout(TaskWarriorBase):
    """ Interacts with taskwarrior by invoking shell commands.

    This is currently the supported version and should be considered stable.

    See https://github.com/ralphbean/taskw/pull/15 for discussion
    and https://github.com/ralphbean/taskw/issues/30 for more.
    """

    # Baseline configuration overrides sent with every ``task`` invocation
    # (see get_configuration_override_args).
    DEFAULT_CONFIG_OVERRIDES = {
        'json': {
            'array': 'TRUE'
        },
        'verbose': 'nothing',
        'confirmation': 'no',
        'dependency': {
            'confirmation': 'no',
        },
    }

    # Characters replaced by '?' in taskwarrior's output: bell, backspace,
    # form feed and escape.
    _CONTROL_CHARACTERS = ('\a', '\b', '\f', '\x1b')

    def __init__(
        self,
        config_filename=TASKRC,
        config_overrides=None,
        marshal=False,
    ):
        """ Set up a shell-out client.

        :param config_filename: path of the taskrc file; passed to ``task``
            as ``rc:<path>`` on every call.
        :param config_overrides: optional dict of extra configuration
            overrides layered on top of ``DEFAULT_CONFIG_OVERRIDES``.
        :param marshal: when true, tasks are returned as
            ``taskw.task.Task`` objects instead of plain dicts.
        """
        super(TaskWarriorShellout, self).__init__(config_filename)
        self.config_overrides = config_overrides if config_overrides else {}
        self._marshal = marshal
        self.config = TaskRc(config_filename, overrides=config_overrides)

        # task_add() parses the uuid of a new task out of stdout, which
        # needs verbose=new-uuid.  Work on a private copy so the dict
        # shared by the whole class is not mutated from an instance.
        defaults = copy.deepcopy(self.DEFAULT_CONFIG_OVERRIDES)
        defaults['verbose'] = 'new-uuid'
        self.DEFAULT_CONFIG_OVERRIDES = defaults

    def get_configuration_override_args(self):
        """ Return the override arguments to put on the command line.

        The defaults are combined with ``self.config_overrides`` (the
        latter wins per top-level key) and converted with
        ``taskw.utils.convert_dict_to_override_args``.
        """
        config_overrides = dict(self.DEFAULT_CONFIG_OVERRIDES)
        config_overrides.update(self.config_overrides)
        return taskw.utils.convert_dict_to_override_args(config_overrides)

    def _decode_output(self, raw):
        """ Turn bytes produced by taskwarrior into text.

        The configured ``encoding`` (default utf-8) is tried first; if
        that fails kitchen is asked to do its best.
        """
        # Everything going into and coming out of taskwarrior *should* be
        # utf-8, but there are weird edge cases where something totally
        # unusual made it in.. so we need to be able to handle (or at least
        # try to handle) whatever.  Kitchen tries its best.
        try:
            return raw.decode(self.config.get('encoding', 'utf-8'))
        except UnicodeDecodeError:
            return kitchen.text.converters.to_unicode(raw)

    def _execute(self, *args):
        """ Execute a given taskwarrior command with arguments

        Returns a 2-tuple of stdout and stderr (respectively).

        :raises OSError: when the ``task`` executable cannot be found (or
            any other OSError raised while spawning it).
        :raises TaskwarriorError: when ``task`` exits with a non-zero
            return code.
        """
        command = (
            [
                'task',
                'rc:%s' % self.config_filename,
            ] +
            self.get_configuration_override_args() +
            [six.text_type(arg) for arg in args]
        )

        # subprocess is expecting bytestrings only, so nuke unicode if
        # present and remove control characters
        for index, part in enumerate(command):
            if isinstance(part, six.text_type):
                command[index] = taskw.utils.clean_ctrl_chars(
                    part.encode('utf-8')
                )

        try:
            proc = subprocess.Popen(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            stdout, stderr = proc.communicate()
        except OSError as e:
            if e.errno == errno.ENOENT:
                raise OSError("Unable to find the 'task' command-line tool.")
            raise

        if proc.returncode != 0:
            raise TaskwarriorError(command, stderr, stdout, proc.returncode)

        # We should get bytes from the outside world.  Turn those into
        # unicode as soon as we can.
        stdout = self._decode_output(stdout)
        stderr = self._decode_output(stderr)

        # strip any crazy terminal escape characters like bells, backspaces,
        # and form feeds
        for char in self._CONTROL_CHARACTERS:
            stdout = stdout.replace(char, '?')
            stderr = stderr.replace(char, '?')

        return stdout, stderr

    def _get_json(self, *args):
        """ Run ``task`` with ``args`` and parse its stdout as JSON. """
        return json.loads(self._execute(*args)[0])

    def _get_task_objects(self, *args):
        """ Run ``task`` with ``args`` and wrap the parsed JSON result.

        A JSON object yields a single task; anything else is treated as
        an iterable of tasks and yields a list.
        """
        parsed = self._get_json(*args)
        if isinstance(parsed, dict):
            return self._get_task_object(parsed)
        return [self._get_task_object(item) for item in parsed]

    def _get_task_object(self, obj):
        """ Wrap ``obj`` in a ``Task`` when marshalling, else return it. """
        if self._marshal:
            return Task(obj, udas=self.config.get_udas())
        return obj

    def _stub_task(self, description, tags=None, **kw):
        """ Given a description, stub out a task dict.

        Returns a ``Task`` built with ``Task.from_stub`` when marshalling
        is enabled, otherwise a plain dict.
        """
        # If whitespace is not removed here, TW will do it when we pass the
        # task to it.
        task = {"description": description.strip()}

        # ``tags`` is a named parameter, so a ``tags=`` keyword always binds
        # to it and can never show up inside ``kw``.
        if tags is not None:
            task['tags'] = tags

        task.update(kw)

        if self._marshal:
            return Task.from_stub(task, udas=self.config.get_udas())

        return task

    @classmethod
    def is_supported_version(cls):
        """ Returns true if minimum runtime requirements are met """
        # Hack to get docs to build on https://readthedocs.org/projects/taskw/
        # See https://docs.readthedocs.io/en/latest/faq.html#how-do-i-change-behavior-for-read-the-docs
        on_rtd = os.environ.get('READTHEDOCS') == 'True'
        if on_rtd:
            return True

        try:
            return cls.get_version() >= LooseVersion(SUPPORTED_VERSION)
        except OSError:
            # OSError is raised if subprocess.Popen fails to find
            # the executable.
            return False

    @classmethod
    def get_version(cls):
        """ Return the version reported by ``task --version``.

        :returns: a ``LooseVersion``.
        :raises OSError: when the ``task`` executable cannot be found (or
            any other OSError raised while spawning it).
        """
        try:
            raw_version = subprocess.Popen(
                ['task', '--version'],
                stdout=subprocess.PIPE
            ).communicate()[0]
        except OSError as e:
            # Compare the error code rather than the (locale dependent)
            # message, the same way _execute does.
            if e.errno == errno.ENOENT:
                raise OSError("Unable to find the 'task' command-line tool.")
            raise

        # Strip surrounding whitespace so that a trailing newline does not
        # end up as an extra version component.
        return LooseVersion(raw_version.decode().strip())

    def sync(self, init=False):
        """ Run ``task sync``, or ``task sync init`` when ``init`` is True.
        """
        if init is True:
            self._execute('sync', 'init')
        else:
            self._execute('sync')

    def load_tasks(self, command='all'):
        """ Returns a dictionary of tasks for a list of command.

        Keys are the names produced by ``Command.files(command)``; each
        value is the export of the tasks with that status.
        """
        results = {
            db: self._get_task_objects('status:%s' % db, 'export')
            for db in Command.files(command)
        }

        # 'waiting' tasks are returned separately from 'pending' tasks
        # Here we merge the waiting list back into the pending list.
        if 'pending' in results:
            results['pending'].extend(
                self._get_task_objects('status:waiting', 'export'))

        return results

    def filter_tasks(self, filter_dict):
        """ Return a filtered list of tasks from taskwarrior.

        Filter dict should be a dictionary mapping filter constraints
        with their values.  For example, to return only pending tasks,
        you could use::

            {'status': 'pending'}

        Or, to return tasks that have the word "Abjad" in their description
        that are also pending::

            {
                'status': 'pending',
                'description.contains': 'Abjad',
            }

        Filters can be quite complex, and are documented on Taskwarrior's
        website.
        """
        query_args = taskw.utils.encode_query(filter_dict, self.get_version())
        return self._get_task_objects(
            'export',
            *query_args
        )

    def get_task(self, **kw):
        """ Return ``(id, task)`` for the task selected by ``kw``.

        ``id`` is None unless the task has a pending status; ``task`` is
        an empty dict when nothing matched.
        """
        task_id, task = self._load_task(**kw)

        # The ID going back only makes sense if the task is pending.
        if 'status' in task and Status.is_pending(task['status']):
            return task_id, task

        return None, task

    def _load_task(self, **kwargs):
        """ Export the task selected by the single keyword in ``kwargs``.

        :returns: ``(task['id'], task)`` for the first match, or
            ``(None, {})`` when nothing matched.
        :raises KeyError: when more than one keyword is given.
        """
        if len(kwargs) > 1:
            raise KeyError(
                "Only one keyword argument may be specified"
            )

        search = []
        for key, value in six.iteritems(kwargs):
            if key not in ['id', 'uuid', 'description']:
                search.append(
                    '%s:%s' % (
                        key,
                        value,
                    )
                )
            elif key == 'description' and '(bw)' in value:
                # NOTE(review): the test is "contains '(bw)'" but the first
                # four characters are dropped regardless of where the
                # marker sits -- presumably a leading '(bw)' prefix is
                # expected; confirm.
                search.append(
                    value[4:]
                )
            else:
                search = [value]

        task = self._get_task_objects('export', *search)

        if task:
            if isinstance(task, list):
                # Multiple items returned from search, return just the 1st
                task = task[0]
            return task['id'], task

        return None, dict()

    def task_add(self, description, tags=None, **kw):
        """ Add a new task.

        Takes any of the keywords allowed by taskwarrior like proj or prior.

        :returns: the task as read back from taskwarrior after creation.
        :raises KeyError: when the created task cannot be read back.
        """
        task = self._stub_task(description, tags, **kw)

        # Check if there are annotations, if so remove them from the
        # task and add them after we've added the task.
        annotations = self._extract_annotations_from_task(task)

        # Strip out argued uuid
        if 'uuid' in task:
            del task['uuid']

        if self._marshal:
            args = taskw.utils.encode_task_experimental(task.serialized())
        else:
            args = taskw.utils.encode_task_experimental(task)

        stdout, stderr = self._execute('add', *args)

        # In 2.4 and later, you cannot specify whatever uuid you want
        # when adding a task.  Instead, you have to specify rc.verbose=new-uuid
        # and then parse the assigned uuid out from stdout.
        task['uuid'] = stdout.strip().split()[-1].strip('.')

        _, added_task = self.get_task(uuid=task['uuid'])

        # Check if 'uuid' is in the task we just added.
        if 'uuid' not in added_task:
            raise KeyError(
                'Error encountered while creating task;'
                'STDOUT: %s; STDERR: %s' % (
                    stdout,
                    stderr,
                )
            )

        if annotations:
            for annotation in annotations:
                self.task_annotate(added_task, annotation)
            # Read the task back once more so the result carries the
            # annotations that were just attached.
            _, added_task = self.get_task(uuid=added_task['uuid'])

        return added_task

    def task_annotate(self, task, annotation):
        """ Annotates a task. """
        self._execute(
            task['uuid'],
            'annotate',
            '--',
            annotation
        )
        _, annotated_task = self.get_task(uuid=task['uuid'])
        return annotated_task

    def task_denotate(self, task, annotation):
        """ Removes an annotation from a task. """
        self._execute(
            task['uuid'],
            'denotate',
            '--',
            annotation
        )
        _, denotated_task = self.get_task(uuid=task['uuid'])
        return denotated_task

    def task_done(self, **kw):
        """ Marks a pending task as done.

        :raises KeyError: when no keyword is given.
        :raises ValueError: when the task is not pending.
        """
        if not kw:
            raise KeyError('No key was passed.')

        _, task = self.get_task(**kw)

        if not Status.is_pending(task['status']):
            raise ValueError("Task is not pending.")

        # Address the task by uuid, like every other mutator here.
        self._execute(task['uuid'], 'done')
        return self.get_task(uuid=task['uuid'])[1]

    def task_update(self, task):
        """ Push the changes in ``task`` to taskwarrior.

        ``task`` is either a plain dict or a ``taskw.task.Task``; it must
        carry a ``'uuid'``.

        :returns: the ``(id, task)`` tuple from ``get_task`` after the
            update.
        :raises KeyError: when ``task`` has no ``'uuid'``.
        """
        if 'uuid' not in task:
            raise KeyError('Task must have a UUID.')

        # 'Legacy' causes us to handle this task as if it were an
        # old-style task -- just a standard dictionary
        legacy = True

        if isinstance(task, Task):
            # Let's pre-serialize taskw.task.Task instances
            task_uuid = six.text_type(task['uuid'])
            task = task.serialized_changes(keep=True)
            legacy = False
        else:
            task_uuid = task['uuid']

        _, original_task = self.get_task(uuid=task_uuid)

        if 'id' in task:
            del task['id']

        task_to_modify = copy.deepcopy(task)
        task_to_modify.pop('uuid', None)
        task_to_modify.pop('id', None)

        # Only handle annotation differences if this is an old-style
        # task, or if the task itself says annotations have changed.
        annotations_to_delete = set()
        annotations_to_create = set()
        ttm_annotations = {}
        original_annotations = {}
        if legacy or 'annotations' in task_to_modify:
            # Check if there are annotations, if so, look if they are
            # in the existing task, otherwise annotate the task to add them.
            ttm_annotations = taskw.utils.annotation_list_to_comparison_map(
                self._extract_annotations_from_task(task_to_modify)
            )
            original_annotations = (
                taskw.utils.annotation_list_to_comparison_map(
                    self._extract_annotations_from_task(original_task)
                )
            )

            new_annotations = set(ttm_annotations.keys())
            existing_annotations = set(original_annotations.keys())

            annotations_to_delete = existing_annotations - new_annotations
            annotations_to_create = new_annotations - existing_annotations

        # Annotations are applied separately below; never send them along
        # with the 'modify' call.
        task_to_modify.pop('annotations', None)

        modification = taskw.utils.encode_task_experimental(task_to_modify)
        # Only try to modify the task if there are changes to post here
        # (changes *might* just be in annotations).
        if modification:
            self._execute(task_uuid, 'modify', *modification)

        if legacy or annotations_to_delete or annotations_to_create:
            # Merge both maps so the text of every key, whether it is
            # being created or deleted, can be looked up in one place.
            ttm_annotations.update(original_annotations)
            for annotation_key in annotations_to_create:
                self.task_annotate(
                    original_task,
                    ttm_annotations[annotation_key]
                )
            for annotation_key in annotations_to_delete:
                self.task_denotate(
                    original_task,
                    ttm_annotations[annotation_key]
                )

        return self.get_task(uuid=task_uuid)

    def task_delete(self, **kw):
        """ Marks a task as deleted.

        :raises ValueError: when the task is already deleted.
        """
        _, task = self.get_task(**kw)

        if task['status'] == Status.DELETED:
            raise ValueError("Task is already deleted.")

        # get_task() only hands back a numeric id for pending tasks, so
        # address the task by uuid instead.
        self._execute(task['uuid'], 'delete')
        return self.get_task(uuid=task['uuid'])[1]

    def task_start(self, **kw):
        """ Marks a task as started. """
        _, task = self.get_task(**kw)

        # See task_delete() for why the uuid is used rather than the id.
        self._execute(task['uuid'], 'start')
        return self.get_task(uuid=task['uuid'])[1]

    def task_stop(self, **kw):
        """ Marks a task as stopped. """
        _, task = self.get_task(**kw)

        # See task_delete() for why the uuid is used rather than the id.
        self._execute(task['uuid'], 'stop')
        return self.get_task(uuid=task['uuid'])[1]

    def task_info(self, **kw):
        """ Return the output of ``task <uuid> info`` for a task.

        stderr is returned when it is non-empty, otherwise stdout.
        """
        _, task = self.get_task(**kw)

        # See task_delete() for why the uuid is used rather than the id.
        out, err = self._execute(task['uuid'], 'info')
        if err:
            return err
        return out
class DataFile(object):
    """ Names of the data files and how to build their file names. """

    PENDING = 'pending'
    COMPLETED = 'completed'

    @classmethod
    def filename(cls, name):
        """ Return ``name`` with the ``.data`` suffix appended. """
        data_file = "%s.data" % name
        return data_file
class Command(object):
    """ The commands understood by ``load_tasks`` and what they map to. """

    PENDING = 'pending'
    COMPLETED = 'completed'
    ALL = 'all'

    @classmethod
    def files(cls, command):
        """ Return the list of data file names covered by ``command``.

        Raises ``ValueError`` when ``command`` is not one of the known
        commands.
        """
        known_commands = {
            Command.PENDING: [DataFile.PENDING],
            Command.COMPLETED: [DataFile.COMPLETED],
            Command.ALL: [DataFile.PENDING, DataFile.COMPLETED]
        }

        if command not in known_commands:
            message = "Unknown command, %s. Command must be one of %s." % (
                command, known_commands.keys())
            raise ValueError(message)

        return known_commands[command]
class Status(object):
    """ The status values a task can carry. """

    PENDING = 'pending'
    COMPLETED = 'completed'
    DELETED = 'deleted'
    WAITING = 'waiting'

    @classmethod
    def is_pending(cls, status):
        """ Tell whether ``status`` counts as pending.

        Both 'pending' and 'waiting' are treated as pending states.
        """
        if status == Status.PENDING:
            return True
        return status == Status.WAITING

    @classmethod
    def to_file(cls, status):
        """ Map ``status`` to the data file name holding such tasks.

        Raises ``KeyError`` for a status that is not listed.
        """
        file_by_status = {
            Status.PENDING: DataFile.PENDING,
            Status.WAITING: DataFile.PENDING,
            Status.COMPLETED: DataFile.COMPLETED,
            Status.DELETED: DataFile.COMPLETED
        }
        return file_by_status[status]
class UnsupportedVersionException(Exception):
    """ Raised when no sufficiently recent taskwarrior is available. """
# The shell-out implementation used to be called "experimental".  It no
# longer is, but the old name is kept as an alias for backwards
# compatibility.  It will eventually be removed.
TaskWarriorExperimental = TaskWarriorShellout

# Pick the default implementation based on what is available on the system;
# refuse to import at all when taskwarrior is missing or too old.
if not TaskWarriorShellout.is_supported_version():
    raise UnsupportedVersionException(
        "'taskw' requires version {} of taskwarrior or later.".format(
            SUPPORTED_VERSION
        )
    )

TaskWarrior = TaskWarriorShellout