星期四, 十二月 14, 2006

构思:Parser 调度流程

构思:Parser 调度流程
tags:构思,简单的调度,调度,shell
PS:下面是自己的一段构思,伪代码,很伪... 呵呵 希望你能看得懂
目的是实现一个可以被监控的任务处理

程序
Worker 执行parser任务
Fairy 检测worker 是否正常运行中
数据
Lastupdate-yyyy-mm-dd.hh 任务流,即时追加中(url)

kill_worker.tmp 当前任务的id
Last_line.tmp 最后处理的行数
Logs/[job].err_lines.err 处理失败的数据

[raw_base]/[mpath].raw 数据源
[out_base]/[mpath].path 对应数据原始路径(url)
[out_base]/[mpath].rss 结果数据
Logs/[job].worker.err 错误日志
Logs/[job].worker.log 处理日志

Fairy 每10分钟启动一次
Max_pause_time=5min
If (!-f last_line.tmp) or (last_line.tmp’s change time -lt current time-$Max_pause_time) # 如果任务标记文件不存在或的最后更新时间 超过了限定的间隔
Then
cat kill_worker.tmp|sh # 假定工作进程宕机,将当前的工作进程杀掉
err_line=`cat last_line.tmp`; # 获取宕机时刻的任务
if `tail -1 logs/[jobs].err_lines.err`==$err_line # 如果同上一次宕机的任务相同
then
echo $err_line|awk ‘{sub(“.*”,$2++,$2);print}’> cat last_line.tmp; # 重置当前任务为下一个任务
echo $err_line >> logs/[jobs].err_lines.err # 记录当前出错的任务
worker [job] & # 启动下一次任务
fi

Worker 由Fairy 自动启动
$job=$1; # 获取任务
Pid=$$; # 获取当前进程的id
Echo kill $pid > worker_pid.tmp; # 设置kill操作
Start_line=`cat last_line.tmp|awk ‘{if(/$job/){print $2}else{print 1}` # 获取开始的行号
Line=1;
While read $line
If($line –gt $start_line)
Then
Continue;
Fi

Echo $job $Line > last_line.tmp; # 设置最后行号
Echo $line |php stream-parser.php ; # 开始单次任务
$Line++; # 递增
Done<$job # 设定任务流 > Logs/$job.worker.$pid.log # 记录日志
2> Logs/$job.worker.$pid.err # 记录错误日志


没有评论: