1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/python
import re
import os
import sys
import string
class CronJob:
    """A single scheduled job parsed from one crontab line.

    The constructor distinguishes the following kinds of line:

    1. ``name = value``                      -- environment variable (rejected)
    2. ``<schedule> <command>``              -- classic (vixie) cron schedule
    3. ``[!&]word(arg)[,word(arg)...] <schedule> <command>``
                                             -- fcron style schedule
    4. ``# text``                            -- comment (rejected)
    5. empty or whitespace only              -- rejected

    After a successful parse the instance carries:

    * ``type`` -- ``"vixie"`` or ``"fcron"``.
    * ``min``, ``hr``, ``dom``, ``mon``, ``dow`` -- one entry per schedule
      field; ``None`` when the field is ``*`` (unrestricted), otherwise the
      list of integer values at which the job runs.
    * ``cmd`` -- the command text, with a leading ``root`` user column removed.
    """

    # Inclusive (lowest, highest) value accepted for each schedule field.
    # Day-of-week accepts both 0 and 7 for Sunday.
    _FIELD_LIMITS = {
        "min": (0, 59),
        "hr": (0, 23),
        "dom": (1, 31),
        "mon": (1, 12),
        "dow": (0, 7),
    }

    _MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
               "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
    _WEEKDAYS = ("sunday", "monday", "tuesday", "wednesday",
                 "thursday", "friday", "saturday")

    _COMMENT_RE = re.compile(r"^\s*#(.*)")
    # (?=.) keeps the requirement that something other than a line break
    # directly follows the "=" sign.
    _VARIABLE_RE = re.compile(r"^\s*(\S+?)\s*=(?=.)\s*(.+)")
    _VIXIE_PREFIX_RE = re.compile(r"^(\*|\d+)")
    _FCRON_PREFIX_RE = re.compile(r"^[!&]\w+")
    # A "!" line that ends in ")" is treated as options only, with no job.
    _FCRON_OPTION_LINE_RE = re.compile(r"^!.+?\)\s*$")
    _VIXIE_FIELDS_RE = re.compile(
        r"^(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.+)")
    # Same as the vixie layout, preceded by one option token that is skipped.
    _FCRON_FIELDS_RE = re.compile(
        r"^\S+\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)\s+(.+)")
    _NUMBER_RE = re.compile(r"^\d+$")
    # "*" or "a-b", optionally followed by "/step".
    _RANGE_RE = re.compile(r"^(?:\*|(\d+)-(\d+))(?:/(\d+))?$")
    # Require whitespace after "root" so that commands which merely start
    # with those letters (e.g. "rootkit-scan") are left alone.
    _USER_RE = re.compile(r"^\s*root\s+")

    def __init__(self, str):
        """Build a job from one crontab line.

        Leading and trailing whitespace around a schedule line is ignored.

        Raises:
            NotACronJobError: with kind ``"EMPTY"``, ``"COMMENT"``,
                ``"VARIABLE"`` or ``"GARBAGE"`` when the line does not
                describe a job.
        """
        line = str.strip()
        if not line:
            raise NotACronJobError("EMPTY")

        comment = self._COMMENT_RE.search(str)
        if comment is not None:
            raise NotACronJobError("COMMENT", comment.group(1))

        variable = self._VARIABLE_RE.search(str)
        if variable is not None:
            raise NotACronJobError(
                "VARIABLE", variable.group(1), variable.group(2))

        looks_like_schedule = (self._VIXIE_PREFIX_RE.search(line)
                               or self._FCRON_PREFIX_RE.search(line))
        if not looks_like_schedule or self._FCRON_OPTION_LINE_RE.search(line):
            raise NotACronJobError("GARBAGE", str)

        self._parseLine(line)

    def _parseLine(self, str):
        """Split a schedule line into its fields and store them on self.

        Raises:
            NotACronJobError: kind ``"GARBAGE"`` when the line has fewer than
                five schedule fields plus a command, or a field is malformed.
        """
        if self._FCRON_PREFIX_RE.search(str):
            self.type = "fcron"
            fields = self._FCRON_FIELDS_RE.search(str)
        else:
            self.type = "vixie"
            fields = self._VIXIE_FIELDS_RE.search(str)
        if fields is None:
            raise NotACronJobError("GARBAGE", str)

        self.min = self._parseDateTime(fields.group(1), "min")
        self.hr = self._parseDateTime(fields.group(2), "hr")
        self.dom = self._parseDateTime(fields.group(3), "dom")
        self.mon = self._parseDateTime(fields.group(4), "mon")
        self.dow = self._parseDateTime(fields.group(5), "dow")
        self.cmd = self._parseCmd(fields.group(6))

    def _parseDateTime(self, dt, type):
        """Expand one schedule field into the values it selects.

        Args:
            dt: field text: ``*``, ``N``, ``A-B``, ``*/S``, ``A-B/S`` or a
                comma separated list of those.
            type: field name, one of ``min``, ``hr``, ``dom``, ``mon``,
                ``dow``; selects the valid value limits.

        Returns:
            ``None`` when the field is unrestricted (``*``, or a list that
            contains ``*``); otherwise a list of ints. Range bounds are
            inclusive.

        Raises:
            NotACronJobError: kind ``"GARBAGE"`` for unparsable text, a zero
                step, a reversed range, or a value outside the field limits.
            KeyError: when ``type`` is not a known field name.
        """
        if dt == "*":
            return None
        lowest, highest = self._FIELD_LIMITS[type]

        if "," in dt:
            values = []
            for part in dt.split(","):
                selected = self._parseDateTime(part, type)
                if selected is None:
                    # A "*" member makes the whole list unrestricted.
                    return None
                values.extend(selected)
            return values

        if self._NUMBER_RE.search(dt):
            first = last = int(dt)
            step = 1
        else:
            span = self._RANGE_RE.search(dt)
            if span is None:
                raise NotACronJobError("GARBAGE", dt)
            first_text, last_text, step_text = span.groups()
            if first_text is None:
                # "*/step": step through the whole valid range.
                first, last = lowest, highest
            else:
                first, last = int(first_text), int(last_text)
            step = 1 if step_text is None else int(step_text)

        if step < 1 or first > last or first < lowest or last > highest:
            raise NotACronJobError("GARBAGE", dt)
        return list(range(first, last + 1, step))

    def _parseCmd(self, cmd):
        """Return the command with a leading ``root`` user column removed."""
        return self._USER_RE.sub("", cmd)

    def _ordinal(self, day):
        """Return ``day`` with its English ordinal suffix, e.g. ``21st``."""
        if 10 <= day % 100 <= 20:
            suffix = "th"
        else:
            suffix = {1: "st", 2: "nd", 3: "rd"}.get(day % 10, "th")
        return "%d%s" % (day, suffix)

    def __str__(self):
        """Return an English description of when the command runs."""
        parts = ["Run %s" % self.cmd]

        if self.mon is not None:
            # Month values are 1-based; the name table is 0-based.
            names = [self._MONTHS[month - 1] for month in self.mon]
            parts.append(" in " + ",".join(names))

        if self.dom is not None:
            days = [self._ordinal(day) for day in self.dom]
            parts.append(" on " + ",".join(days) + " day")
            if self.mon is None:
                parts.append(" of every month")

        if self.dow is not None:
            # "% 7" folds the alternative Sunday value 7 onto index 0.
            names = [self._WEEKDAYS[day % 7] for day in self.dow]
            parts.append(" on " + ",".join(names))

        if self.hr is not None:
            hours = ",".join(["%d" % hour for hour in self.hr])
            if self.min is None:
                parts.append(" every minute of hour " + hours)
            elif len(self.hr) == 1 and len(self.min) == 1:
                parts.append(" at %02d:%02d" % (self.hr[0], self.min[0]))
            else:
                minutes = ",".join(["%d" % minute for minute in self.min])
                parts.append(" at %s minutes of hour %s" % (minutes, hours))
            if self.dow is None and self.dom is None:
                parts.append(" every day")
        elif self.min is not None:
            minutes = ",".join(["%d" % minute for minute in self.min])
            parts.append(" at %s minutes" % minutes + " of every hour ")
        else:
            parts.append(" every minute")

        return "".join(parts)
class NotACronJobError(Exception):
    """Raised by CronJob when a line does not contain valid cron schedule information.

    The first positional argument names the kind of line that was found and
    the remaining arguments carry its details:

    * ``("EMPTY",)``                 -- blank line
    * ``("COMMENT", text)``          -- comment line
    * ``("VARIABLE", name, value)``  -- environment variable assignment
    * ``("GARBAGE", text)``          -- anything else that is not a job
    """

    # (kind, number of detail arguments required, message template)
    _MESSAGES = (
        ("EMPTY", 0, "Empty Line"),
        ("COMMENT", 1, "A comment: %s"),
        ("VARIABLE", 2, "An environment variable: %s = %s"),
        ("GARBAGE", 1, "Uncronish thingamabob: %s"),
    )
    _FALLBACK = "If you don't know how to play with me, go to the other sandbox!"

    def __str__(self):
        """Return a human-readable description of the rejected line.

        An unknown kind, no arguments at all, or a known kind with too few
        detail arguments yields the generic fallback message instead of
        raising ``IndexError``.
        """
        if not self.args:
            return self._FALLBACK
        kind = self.args[0]
        details = self.args[1:]
        for name, needed, template in self._MESSAGES:
            if kind == name and len(details) >= needed:
                # Wrapping in a tuple keeps tuple-valued details from being
                # spread across the format specifiers.
                return template % tuple(details[:needed])
        return self._FALLBACK
if __name__ == "__main__":
    # Read crontab lines from standard input and print, for each one, either
    # a description of the job or the reason the line is not a job.
    # print(...) with a single argument and "except ... as" are valid on both
    # Python 2.6+ and Python 3.
    for line in sys.stdin:
        try:
            # Drop the line ending so that messages which echo the line do
            # not end up followed by a blank line.
            print(CronJob(line.rstrip("\r\n")))
        except NotACronJobError as err:
            print(err)