未加星标

并发多子JOB执行分发数据

字体大小 | |
[数据库(综合) 所属分类 数据库(综合) | 发布者 店小二04 | 时间 2016 | 作者 红领巾 ] 0人收藏点击收藏

ORACLE的JOB调用存储过程的逻辑方式的优势是:快。但是单一的JOB执行某一项任务可能还是不能满足性能需求。比如数据处理,可能在10几分钟需要处理的数据量是百万级。下面介绍如何并发多子JOB执行分发数据。

PROCEDURE job_0125(a_job_counts IN INTEGER, a_counts_per_job IN INTEGER, a_timeout IN INTEGER) IS
e_business EXCEPTION;
v_job_name job_error_log.job_name%TYPE := 'job_0125';
v_sid NUMBER;
v_serial# NUMBER;
v_sqlcode job_error_log.error_no%TYPE;
v_sqlerrm job_error_log.error_message%TYPE;
v_error_comment job_error_log.error_comment%TYPE;
CURSOR v_job_running_cur IS
SELECT t.sid, t.serial#, t.job_id, t.create_time
FROM pscp_job_running_log t
WHERE t.job_status = '0'
AND t.job_name = 'job_0125'
AND t.create_time >= SYSDATE - 1;
v_job_running_record v_job_running_cur%ROWTYPE;
v_active_job_counts INTEGER := 0;
v_block_job_counts INTEGER := 0;
v_job_counts INTEGER;
v_group_id VARCHAR2(100);
v_group3_id VARCHAR2(50);
v_today DATE;
v_job_id NUMBER;
BEGIN
BEGIN
--获取当前的SID和SERIAL#
SELECT sid, serial#
INTO v_sid, v_serial#
FROM v$session
WHERE sid = (SELECT MAX(sid) FROM v$session WHERE audsid = userenv('SESSIONID'));
EXCEPTION
WHEN NO_DATA_FOUND THEN
v_sqlcode := '-0000';
v_sqlerrm := 'JOB启动异常:无法得到' || v_job_name || '的SID与SERIAL#';
v_error_comment := v_job_name || '启动异常';
RAISE e_business;
END;
-- 检查JOB输入参数
IF a_job_counts < 1 OR a_job_counts > 20 THEN
v_sqlcode := '-0000';
v_sqlerrm := 'JOB启动异常:JOB创建数量必须在1~20之间(包含1与20)';
v_error_comment := v_job_name || '启动异常';
RAISE e_business;
END IF;
IF a_counts_per_job < 100 OR a_counts_per_job > 30000 THEN
v_sqlcode := '-0000';
v_sqlerrm := 'JOB启动异常:每个JOB处理数据的数量必须在100~10000之间(包含100与10000)';
v_error_comment := v_job_name || '启动异常';
RAISE e_business;
END IF;
IF a_timeout < 40 OR a_timeout > 480 THEN
v_sqlcode := '-0000';
v_sqlerrm := 'JOB启动异常:JOB运行超时时间必须在60~480分钟之间(包含60与480)';
v_error_comment := v_job_name || '启动异常';
RAISE e_business;
END IF;
-- 分析当前JOB的运行情况
OPEN v_job_running_cur;
LOOP
FETCH v_job_running_cur
INTO v_job_running_record;
EXIT WHEN v_job_running_cur%NOTFOUND;
IF (SYSDATE - v_job_running_record.create_time) >=
a_timeout / (24 * 60) THEN
v_block_job_counts := v_block_job_counts + 1;
INSERT INTO job_error_log
(error_no,
error_message,
job_name,
job_user,
job_date,
error_comment,
sid,
serial#)
VALUES
('-0000',
'JOB运行异常:' || v_job_name || '创建的子JOB(' ||
v_job_running_record.job_id || ')运行时间超时',
v_job_name,
USER,
SYSDATE,
v_job_name || '创建的子JOB运行异常',
v_job_running_record.SID,
v_job_running_record.serial#);
ELSE
v_active_job_counts := v_active_job_counts + 1;
END IF;
END LOOP;
CLOSE v_job_running_cur;
-- 立即写入JOB异常
IF v_block_job_counts > 0 THEN
COMMIT;
END IF;
-- 计算创建JOB的数量
v_job_counts := a_job_counts - (v_block_job_counts + v_active_job_counts);
-- 判断是否需要启动新的子JOB
IF v_job_counts > 0 THEN
FOR v_index IN 1 .. v_job_counts LOOP
-- 为JOB分配GROUP_ID,并更新处理状态为已分配
SELECT pscpdata.seq_group3_id.NEXTVAL INTO v_group3_id FROM DUAL;
v_group_id := '2011' || TO_CHAR(TRUNC(SYSDATE), 'YYYYMMDD') || v_group3_id;
v_today := SYSDATE;
UPDATE pscp_sms_wait_send t
SET t.disposal_status = '1', t.group_id = v_group_id
WHERE (t.disposal_status = '0' or t.disposal_status is null)
AND arrange_date <= SYSDATE + 1 / 24 / 60
AND arrange_date >= trunc(SYSDATE - 2)
AND ROWNUM <= a_counts_per_job;
IF SQL%FOUND THEN
INSERT INTO pscp_job_running_log(group_id, username, create_time, job_status, job_name)
VALUES(v_group_id, USER, v_today, '0', 'job_0125');
DBMS_JOB.submit(job => v_job_id, what => 'PSCPCDE.pscp_sms_wait_to_sent_muti(' || v_group_id || ');', next_date => SYSDATE, INTERVAL => NULL);
COMMIT;
ELSE
ROLLBACK;
RETURN;
END IF;
END LOOP;
END IF;
EXCEPTION
WHEN OTHERS THEN
ROLLBACK;
IF v_sqlcode IS NULL THEN
v_sqlcode := SQLCODE;
v_sqlerrm := SUBSTR(SQLERRM, 1, 200);
END IF;
INSERT INTO job_error_log
(error_no,
error_message,
job_name,
job_user,
job_date,
error_comment,
sid,
serial#)
VALUES
(v_sqlcode,
v_sqlerrm,
v_job_name,
USER,
SYSDATE,
v_error_comment,
v_sid,
v_serial#);
COMMIT;
END job_0125;
END pkg_pscp_job_scheduler;
CREATE OR REPLACE PROCEDURE pscp_sms_wait_to_sent_muti(a_group_id IN VARCHAR2) IS
counter NUMBER := 0; --统计数,当移动的数据达到10000条时,提交一次事务
sent_message_id integer; --发送表的message_id
v_today DATE;
v_sqlcode job_error_log.error_no%TYPE;
v_sqlerrm job_error_log.error_message%TYPE;
CURSOR v_job_info_cur IS
SELECT /*+rule*/
s.SID, s.serial#, s.logon_time, dj.job
FROM v$session s, dba_jobs_running djr, dba_jobs dj
WHERE djr.job = dj.job
AND s.SID = djr.SID
AND dj.what = 'PSCPCDE.PKG_PMCP_TO_PSCP_CONTROL.P_DATA_MANIPULATION(' ||
a_group_id || ');';
v_job_info_record v_job_info_cur%ROWTYPE;
BEGIN
-- 修改JOB状态为运行中
v_today := SYSDATE;
UPDATE pscp_job_running_log t
SET t.sid = v_job_info_record.sid,
t.serial# = v_job_info_record.serial#,
t.logon_time = v_job_info_record.logon_time,
t.job_id = v_job_info_record.job,
t.start_time = v_today,
t.end_time = v_today
WHERE t.group_id = a_group_id;
COMMIT;
--查询本次的待发数据--
FOR cur IN (SELECT new_message_id, sms_type FROM pscp_sms_wait_send t WHERE t.group_id = a_group_id AND t.disposal_status = '1' AND arrange_date <= SYSDATE + 1 / 24 / 60 AND arrange_date >= trunc(SYSDATE - 2)) LOOP
BEGIN
select SEQ_MESSAGE_SENT_ID.NEXTVAL into sent_message_id from dual;
INSERT INTO pscp_sms_sent
(sent_message_id,
template_id,
version_id,
vsender_id,
send_user,
request_id,
mobile_number,
sms_content,
sms_priority,
arrive_date,
arrange_date,
expire_date,
mosequence_id,
src_id,
sender_series_id,
gateway_channel,
localgateway_date,
native_series_id,
template_name,
sms_task_id,
sms_task_num,
sms_benefit_serie,
confirm_template_id,
business_type_id,
party_no,
client_no,
group_task_no,
sms_operation_status,
url,
sms_type,
site_no,
linkid,
receive_msg_id,
TOTALNUMBER,
forbid_start_date,
forbid_stop_date,
stat_date)
(SELECT sent_message_id, template_id, version_id, vsender_id, send_user, request_id, mobile_number, sms_content, sms_priority, trunc(SYSDATE), arrange_date, expire_date, mosequence_id, src_id, sender_series_id, gateway_channel, localgateway_date, native_series_id, template_name, sms_task_id, sms_task_num, sms_benefit_serie, confirm_template_id, business_type_id, party_no, client_no, group_task_no, sms_operation_status, url, sms_type, site_no, linkid, receive_msg_id, TOTALNUMBER, forbid_start_date, forbid_stop_date, arrive_date AS stat_date
FROM pscp_sms_wait_send
WHERE new_message_id = cur.new_message_id);
DELETE FROM pscp_sms_wait_send
WHERE new_message_id = cur.new_message_id;
if cur.sms_type = 2 then
update pscp_tpl_mms_sent wt
set wt.mms_sent_id = sent_message_id
where mms_sent_id = cur.new_message_id;
end if;
--备份出现异常,给源表一个标识,继续下一条得处理
EXCEPTION
WHEN OTHERS THEN
BEGIN
v_sqlcode := SQLCODE;
v_sqlerrm := SUBSTR(SQLERRM, 1, 200);
--插入违例表
INSERT INTO pscp_violate_history t(t.violate_id, t.template_name, t.request_id, t.violate_desp, t.sms_content, t.send_user, t.sms_benefit_serie, t.record_date, t.mobile_number, t.arrive_date, t.expire_date, t.sender_series_id, t.result_type, t.sms_send_template, t.tpl_version_id, t.sms_type, t.vsender_id)SELECT seq_violate_id.NEXTVAL, TEMPLATE_NAME, request_id, v_sqlerrm, sms_content, send_user, SMS_BENEFIT_SERIE, SYSDATE, mobile_number, arrange_date, expire_date, sender_series_id, 3, TEMPLATE_ID, VERSION_ID, sms_type, VSENDER_ID FROM pscp_sms_wait_send WHERE new_message_id = cur.new_message_id;
DELETE FROM pscp_sms_wait_send pss
WHERE pss.new_message_id = cur.new_message_id;
EXCEPTION
WHEN OTHERS THEN
v_sqlerrm := SUBSTR(SQLERRM, 1, 200);DELETE FROM pscp_sms_wait_send pss WHERE pss.new_message_id = cur.new_message_id;INSERT INTO pscp_db_log("prc_name", "log_desc", "log_time")VALUES ('pscp_sms_wait_to_sent', v_sqlerrm, SYSDATE);
END;
END;
---1000条提交一次
counter := counter + 1;
if mod(counter, 10000) = 0 then
commit;
end if;
END LOOP;
-- 更新JOB状态为处理完成
UPDATE pscp_job_running_log t
SET t.end_time = SYSDATE, t.job_status = '1'
WHERE t.group_id = a_group_id;
COMMIT;
EXCEPTION
WHEN OTHERS THEN
v_sqlerrm := SUBSTR(SQLERRM, 1, 200);
INSERT INTO pscp_db_log("prc_name", "log_desc", "log_time")
VALUES
('pscp_sms_wait_to_sent', v_sqlerrm, SYSDATE);
COMMIT;
RAISE;
END pscp_sms_wait_to_sent_muti;

本文数据库(综合)相关术语:系统安全软件

主题: SQL数据RIATIUBSUDUCUUASAGE
分页:12
转载请注明
本文标题:并发多子JOB执行分发数据
本站链接:http://www.codesec.net/view/480638.html
分享请点击:


1.凡CodeSecTeam转载的文章,均出自其它媒体或其他官网介绍,目的在于传递更多的信息,并不代表本站赞同其观点和其真实性负责;
2.转载的文章仅代表原创作者观点,与本站无关。其原创性以及文中陈述文字和内容未经本站证实,本站对该文以及其中全部或者部分内容、文字的真实性、完整性、及时性,不作出任何保证或承若;
3.如本站转载稿涉及版权等问题,请作者及时联系本站,我们会及时处理。
登录后可拥有收藏文章、关注作者等权限...
技术大类 技术大类 | 数据库(综合) | 评论(0) | 阅读(58)