星期四, 十二月 14, 2006

构思:Parser 调度流程

构思:Parser 调度流程
tags:构思,简单的调度,调度,shell
PS:下面是自己的一段构思,伪代码,很伪... 呵呵 希望你能看得懂
目的是实现一个可以被监控的任务处理

程序
Worker 执行parser任务
Fairy 检测worker 是否正常运行中
数据
Lastupdate-yyyy-mm-dd.hh 任务流,即时追加中(url)

kill_worker.tmp 当前任务的id
Last_line.tmp 最后处理的行数
Logs/[job].err_lines.err 处理失败的数据

[raw_base]/[mpath].raw 数据源
[out_base]/[mpath].path 对应数据原始路径(url)
[out_base]/[mpath].rss 结果数据
Logs/[job].worker.err 错误日志
Logs/[job].worker.log 处理日志

Fairy 每10分钟启动一次
Max_pause_time=5min
If (!-f last_line.tmp) or (last_line.tmp’s change time -lt current time-$Max_pause_time) # 如果任务标记文件不存在或的最后更新时间 超过了限定的间隔
Then
cat kill_worker.tmp|sh # 假定工作进程宕机,将当前的工作进程杀掉
err_line=`cat last_line.tmp`; # 获取宕机时刻的任务
if `tail -1 logs/[jobs].err_lines.err`==$err_line # 如果同上一次宕机的任务相同
then
echo $err_line|awk ‘{sub(“.*”,$2++,$2);print}’> cat last_line.tmp; # 重置当前任务为下一个任务
echo $err_line >> logs/[jobs].err_lines.err # 记录当前出错的任务
worker [job] & # 启动下一次任务
fi

Worker 由Fairy 自动启动
$job=$1; # 获取任务
Pid=$$; # 获取当前进程的id
Echo kill $pid > worker_pid.tmp; # 设置kill操作
Start_line=`cat last_line.tmp|awk ‘{if(/$job/){print $2}else{print 1}` # 获取开始的行号
Line=1;
While read $line
If($line –gt $start_line)
Then
Continue;
Fi

Echo $job $Line > last_line.tmp; # 设置最后行号
Echo $line |php stream-parser.php ; # 开始单次任务
$Line++; # 递增
Done<$job # 设定任务流 > Logs/$job.worker.$pid.log # 记录日志
2> Logs/$job.worker.$pid.err # 记录错误日志


星期三, 十二月 13, 2006

code:管理shell中的子任务

管理shell中的子任务
tags:shell,kill,awk
---------------------------------------------------------------
PS:好久之前的代码了 不过正好用到 就找来发到这里了
---------------------------------------------------------------

http://fallseir.livejournal.com/58926.html
ps #打印运行的进程列表
kill #杀掉指定的进程
awk #操作字符数据
$$ #当前进程的PID
tmp_sh.sh # 测试shell脚本 在shell中 定时kill到超时的子进程


获取父进程的pid
$ ps lx|awk '/ps lx$/{print $4}' # 查看当前进程的父PID
ppid=`ps lx|awk '/ps lx$/{print $4}'`
$(date -u;sleep 120m;date -u;)& #创建一后台运行的任务
$ ps lx #查看当前运行的进程
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
0 503 8500 8499 16 0 4492 1408 wait Ss pts/1 0:00 -bash
1 503 8556 8500 16 0 4492 1408 wait S pts/1 0:00 -bash
0 503 8558 8556 16 0 3904 556 - S pts/1 0:00 sleep 120
0 503 8561 8500 17 0 4420 708 - R+ pts/1 0:00 ps lx
#查看根任务的子任务列表
$ ps lx |awk 'BEGIN{ps[8500]="root";}{if($4 in ps){ps[$3]=$13;}}END{for(i in ps){print i ":" ps[i];}}'

tmp_sh.sh # 测试shell脚本 在shell中 定时kill到超时的子进程
-----------------------------------------------------------------------------------------------
# fallseir.lee (fallseir@gmail.com ,http://feed.feedsky.com/fallseir)
# -20060620 17:08
# 获取当前的进程PID
ppid=$$;
##
# 将满足条件的子进程kill掉
##
function kill_subs(){
# 打印所有进程并输出满足条件的进程ID
ps lx|awk '
BEGIN{
ps['$ppid']="root";#设置父进程ID
}
##
# 判断当前进程是否为kill的进程
# pid 进程ID
# w 进程启动的shell命令名称
# t 进程执行的时间 hh:mm
##
function is_kill(pid,w,t){
split(t,arr_t); # 分离hh:mm时间的时间格式以便用于计算
# 测试规则 将所有存在的wget进程标记为kill进程
if(w=="wget" && (arr_t[0]>0 || arr_t[1]>=0)){
return 1;
}else{
return -1;
}
}
{
# 设定使用的参数
ppid=$4; # 当前行的父进程ID
pid=$3; # 当前行的进程ID
w=$13; # 当前行的shell命令
t=$12; # 当前行的shell命令执行时间

if(ppid in ps){ # 如果父进程ID在ps列队里,只有子进程别操作
ps[pid]=t; # 将当前进程id添加到ps列队
if(is_kill(pid,w,t)>0){ # 如果当前进程满足kill条件
ks[pid]=w " " t; # 加入到ks列队
}
}
}
END{
# 输出kill名令,kill掉ks列队里的PID
for(i in ks){
print "kill " i;
}
}'
}
##
# 判断是否有worker在工作
##
function is_working(){
# 打印当前进程列队并查看是否有当前进程的子进程中的worker数量
ps lx|awk '
BEGIN{
ps['$ppid']="root";
}
function is_work(w){
# 假设 worker 的 shel l命令为 wget 和 awk 命令
if(w == "wget" || w=="awk"){
return 1;
}else{
return -1;
}
}
{
ppid=$4;
pid=$3;
w=$13;
t=$12;
if(ppid in ps){
ps[pid]=t;
if(is_work(w)>0){ # 将 worker 放入 ws 列队
ws[pid]=w " " t;
}
}
}
END{
c=0;
for(i in ws){ #计算worker数量
c++;
}
print c; #打印worker数量
}'
}

## --- work ---
# -- 创建 worker 的工位
mkfifo $$.test # 以当前进程ID构建管道
exec 3<>$$.test # 将管道已读写方式连接到3号流
rm $$.test # 删除构建的管道,当需要的流建立后 之前建立的管道已无用
# 建立5个工作者的位置
for((i=0;i<5;i++))
do
echo i>&3;
done
# 在后台进行100个数量的wget任务
(for((i=0;i<=100;i++))
do
read t; # 获取一个工作的位置
(r=$(wget "http://feed.feedsky.com/fallseir" --timeout=3000 --spider 2>&1 |cat);
# 判断wget的执行结果是否成功
r=$(echo $r|awk 'BEGIN {r=-1;} {if(/200/){r=1;}} END{print r;}') ;
if( [ "$r" == "1" ] )
then
echo "# $t $i";
else
echo "- $t $i";
fi;
# 任务完成后归还工作位置
echo $t >&3;)& # 任务在后台进行
# 设定 for 范围中默认从3号流读取数据
done <&3)&
while [ "1"="1" ] # 建立 killer 监听进程
do
kills=`kill_subs`;
echo $kills; #输出要被kill的进程
$kills # 执行kill 操作 #kill_subs|sh;
if([ "`is_working`" -gt 0 ]) # 如果还有worker在工作 sleep 5 秒钟
then
#echo `is_working`
echo "killer sleep 5 s";
sleep 5s;
else # 如果没有 worker 在工作 退出killer 监听
#is_working;
echo "killer end `date -u`";
break;
fi
done

echo "wait sh end `date -u`";
wait; # 等待所有自进程结束
exec 3<&- # 关闭 3 号流
echo "end `date -u`";




--
[:p] --飞扬.轻狂 [fallseir.lee]
http://fallseir.livejournal.com
http://feed.feedsky.com/fallseir