select ? ?/*+parallel(t,25)+*/?
成都創(chuàng)新互聯(lián)專注為客戶提供全方位的互聯(lián)網(wǎng)綜合服務(wù),包含不限于網(wǎng)站制作、成都做網(wǎng)站、啟東網(wǎng)絡(luò)推廣、重慶小程序開發(fā)、啟東網(wǎng)絡(luò)營銷、啟東企業(yè)策劃、啟東品牌公關(guān)、搜索引擎seo、人物專訪、企業(yè)宣傳片、企業(yè)代運(yùn)營等,從售前售中售后,我們都將竭誠為您服務(wù),您的肯定,是我們最大的嘉獎;成都創(chuàng)新互聯(lián)為所有大學(xué)生創(chuàng)業(yè)者提供啟東建站搭建服務(wù),24小時服務(wù)熱線:18980820575,官方網(wǎng)址:www.cdcxhl.com
一、Parallel
1. 用途
強(qiáng)行啟用并行度來執(zhí)行當(dāng)前SQL。這個在Oracle 9i之后的版本可以使用,之前的版本現(xiàn)在沒有環(huán)境進(jìn)行測試。也就是說,加上這個說明,可以強(qiáng)行啟用Oracle的多線程處理功能。舉例的話,就像電腦裝了多核的CPU,但大多情況下都不會完全多核同時啟用(2核以上的比較明顯),使用parallel說明,就會多核同時工作,來提高效率。
但本身啟動這個功能,也是要消耗資源與性能的。所有,一般都會在返回記錄數(shù)大于100萬時使用,效果也會比較明顯。
2. 語法
/*+parallel(table_short_name,cash_number)*/
這個可以加到insert、delete、update、select的后面來使用(和rule的用法差不多,有機(jī)會再分享rule的用法)
開啟parallel功能的語句是:
alter session enable parallel dml;
這個語句是DML語句哦,如果在程序中用,用execute的方法打開。
3. 實(shí)例說明
用ERP中的transaction來說明下吧。這個table記錄了所有的transaction,而且每天數(shù)據(jù)量也算相對比較大的(根據(jù)企業(yè)自身業(yè)務(wù)量而定)。假設(shè)我們現(xiàn)在要查看對比去年一年當(dāng)中每月的進(jìn)、銷情況,所以,一般都會寫成:
select to_char(transaction_date,'yyyymm') txn_month,
sum(
decode(
sign(transaction_quantity),1,transaction_quantity,0
)
) in_qty,
sum(
decode(
sign(transaction_quantity),-1,transaction_quantity,0
)
) out_qty
from mtl_material_transactions mmt
where transaction_date = add_months(
to_date(?
to_char(sysdate,'yyyy')||'0101','yyyymmdd'),
-12)
and transaction_date = add_months(
to_date(
to_char(sysdate,'yyyy')||'1231','yyyymmdd'),
-12)
group by to_char(transaction_date,'yyyymm')?
這個SQL執(zhí)行起來,如果transaction_date上面有加index的話,效率還算過的去;但如果沒有加index的話,估計就會半個小時內(nèi)都執(zhí)行不出來。這是就可以在select 后面加上parallel說明。例如:
select /*+parallel(mmt,10)*/
to_char(transaction_date,'yyyymm') txn_month,
...
這樣的話,會大大提高執(zhí)行效率。如果要將檢索出來的結(jié)果insert到另一個表tmp_count_tab的話,也可以寫成:
insert /*+parallel(t,10)*/
into tmp_count_tab
(
txn_month,
in_qty,
out_qty
)
select /*+parallel(mmt,10)*/
to_char(transaction_date,'yyyymm') txn_month,
...
插入的機(jī)制和檢索機(jī)制差不多,所以,在insert后面加parallel也會加速的。關(guān)于insert機(jī)制,這里暫不說了。
Parallel后面的數(shù)字,越大,執(zhí)行效率越高。不過,貌似跟server的配置還有oracle的配置有關(guān),增大到一定值,效果就不明顯了。所以,一般用8,10,12,16的比較常見。我試過用30,發(fā)現(xiàn)和16的效果一樣。不過,數(shù)值越大,占用的資源也會相對增大的。如果是在一些package、function or procedure中寫的話,還是不要寫那么大,免得占用太多資源被DBA開K。
4. Parallel也可以用于多表
多表的話,就是在第一后面,加入其他的就可以了。具體寫法如下:
/*+parallel(t,10) (b,10)*/
5. 小結(jié)
關(guān)于執(zhí)行效率,建議還是多按照index的方法來提高效果。Oracle有自帶的explan road的方法,在執(zhí)行之前,先看下執(zhí)行計劃路線,對寫好的SQL tuned之后再執(zhí)行。實(shí)在沒辦法了,再用parallel方法。Parallel比較邪惡,對開發(fā)者而言,不是好東西,會養(yǎng)成不好習(xí)慣,導(dǎo)致很多bad SQL不會暴漏,SQL Tuning的能力得不到提升。我有見過某些人create table后,從不create index或primary key,認(rèn)為寫SQL時加parallel就可以了。
Oracle JOB實(shí)現(xiàn)多線程插入
Sql代碼
--經(jīng)測試,大數(shù)據(jù)量的插入,多線程在普通磁盤執(zhí)行效率反而更慢,不如單insert語句,而在磁盤陣列硬件環(huán)境下執(zhí)行效率有很大的提升。
--創(chuàng)建表,模擬多線程插入(TT3-TT4)
DROP TABLE TT3;
DROP TABLE TT4;
CREATE TABLE TT4 AS SELECT * FROM DBA_OBJECTS WHERE 1=0;
CREATE TABLE TT3 AS SELECT * FROM DBA_OBJECTS;
--數(shù)據(jù)分批插入?yún)?shù)表
DROP TABLE JOB_PARMS;
CREATE TABLE JOB_PARMS
(
JOB NUMBER PRIMARY KEY,
LO_RID INT,
HI_RID INT
);
--創(chuàng)建插入的存儲過程
CREATE OR REPLACE PROCEDURE PROC_TEST(P_JOB IN NUMBER) IS
L_REC JOB_PARMS%ROWTYPE;
BEGIN
SELECT * INTO L_REC
FROM JOB_PARMS
WHERE JOB = P_JOB;
INSERT INTO TT4
SELECT A.OWNER,
A.OBJECT_NAME,
A.SUBOBJECT_NAME,
A.OBJECT_ID,
A.DATA_OBJECT_ID,
A.OBJECT_TYPE,
A.CREATED,
A.LAST_DDL_TIME,
A.TIMESTAMP,
A.STATUS,
A.TEMPORARY,
A.GENERATED,
A.SECONDARY
FROM (SELECT ROWNUM RN, TT3.* FROM TT3 WHERE ROWNUM = L_REC.HI_RID) A
WHERE A.RN = L_REC.LO_RID;
DELETE FROM JOB_PARMS WHERE JOB = P_JOB;
COMMIT;
END;
/
---DIY 并行調(diào)度程序塊
DECLARE
L_JOB NUMBER;
C_INDEX NUMBER;--插入的數(shù)量總數(shù)
S_INDEX INT:=0;--插入的開始index
E_INDEX INT:=0;--插入的結(jié)束index
CQ_INDEX INT:=20;--循環(huán)的次數(shù)
NUM_INCREASE INT:=0;--增量累加
V_I INT:=0;--計數(shù)器
BEGIN
SELECT COUNT(*) INTO C_INDEX FROM TT3;
NUM_INCREASE:= CEIL(C_INDEX/CQ_INDEX);
WHILE CQ_INDEX V_I
LOOP
V_I:=V_I+1;
S_INDEX:=1+NUM_INCREASE*(V_I-1);
IF(V_I = 20) THEN--當(dāng)?shù)扔谘h(huán)次數(shù)則修改結(jié)束的index
E_INDEX:= C_INDEX;
ELSE
E_INDEX:=NUM_INCREASE*V_I;
END IF;
DBMS_JOB.SUBMIT( L_JOB, 'PROC_TEST(JOB);');
INSERT INTO JOB_PARMS(JOB, LO_RID, HI_RID)
VALUES ( L_JOB, S_INDEX, E_INDEX );
END LOOP;
END;
/
問題如下:
ORACLE的多線程體現(xiàn)在DML上 在操作時, 如果見到/* +*/ (平時寫備注、評論塊的/**/符號中有加號, 那么則表明了使用Oracle Hint. ?/*+ parallel(表名,并發(fā)數(shù))*/ (有時候?qū)懽鰽ppend ?parallel,或者有時候直接寫Append) ?.
從開發(fā)的角度看:
ORACLE多線程可以提高某些語句查詢的速度(不是一定的,取決于你的核,和服務(wù)器, 我原本有一些材料可以圖示進(jìn)程數(shù)和速度的關(guān)系,可惜一時找不到, 如果需要可以再聯(lián)系)。具體使用時, 做幾個測試 看看速率提高多少。。
從數(shù)據(jù)庫整體來看:
多線程并不是優(yōu)化了你的查詢速率, 而是使用了更多數(shù)據(jù)庫的資源(其他用戶或者進(jìn)程的資源)換來你的語句速率的提高。 聯(lián)系一下你的DBA, 因?yàn)楹苡锌赡苣阌昧硕噙M(jìn)程后,從DBA的EM上會發(fā)現(xiàn)你資源在某時間段內(nèi)用的很高,甚至?xí)o出警告。
void* OracleProcess(GPS_DATA GpsRec) // 數(shù)據(jù)庫數(shù)據(jù)處理
{
interval = 0;
struct HashItem* pHash;
pHash = inithashtable(MAX_REC2);
char sql[384] = {0};
char temp[256] = {0};
char tName[10] = {0}; // 表名字
int i,k;
int j = TotalRec RATE;
double distance;
for(i=0; i j; i++)
{
sprintf(temp,"%s%f%f%f%d",gps_last[i].tid,gps_last[i].lon,gps_last[i].lat,gps_last[i].speed,gps_last[i].udate);
InsertHash(temp, pHash, MAX_REC2); // 插入最后GPS信息到hash
memset(temp,0x00,256);
}
for(i = 0; i TotalRec; i++)
{
for(k=0; kj; k++) // 查詢車機(jī)是否在冊
if(strcmp(GpsRec[i].tid,tid[k]) == 0)
break;
if(k j)
{
if(GpsRec[i].udate != 0.00)
{
distance = InfoUpdate(GpsRec,i); // 最新GPS數(shù)據(jù)更新
sprintf(temp,"%s%f%f%f%d",GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].udate);
if(GetHashTablePos(temp, pHash, MAX_REC2) == -1) // 查找hash是否存在
{
if (distance 0.0001)
{
sprintf(tName,"GPS_%d_Y",tf[k]);
InsertHash(temp, pHash, MAX_REC2); // 插入
sprintf(sql,"insert into %s (id,tm_id,lon,lat, speed, utc_time, udate,mileage,DIRECTION,DISTANCE) values (seq_gps.nextVal,'%s','%f','%f','%f','%d','%d','%f','%d','%f','%d')",
tName,GpsRec[i].tid,GpsRec[i].lon,GpsRec[i].lat,GpsRec[i].speed,GpsRec[i].utime,GpsRec[i].udate,GpsRec[i].mileage,GpsRec[i].dir,distance,interval);
printf("%s\n",sql);
oci_excu(oracle_env,(text *)sql,0); // 插入數(shù)據(jù)
memset(tName,0x00,10);
}
}
memset(sql,0x00,384);
memset(temp,0x00,256);
}
}
}
memset(GpsRec,0x00,sizeof(GpsRec));
free(pHash);
pthread_exit(NULL);
}
void TcpProcess(int tfd) // 處理TCP連接上的事務(wù)
{
struct timeval ntime;
int index = 0,times,ret;
int rlen = 0,rflag = 0;
char recvbuf[513] = {0};
bzero(recvbuf,513);
while(1)
{
ret = rlen = read(tfd,recvbuf,512);
if(rlen = 0)
break;
if((rlen%32) == 0) // 32長度為標(biāo)準(zhǔn)TCP信息
{
times = 0;
ret = 5;
while(ret--)
{
if(tflag[tfd] == tfd) // 已經(jīng)存在的socket
{
LOVENIX *info = (LOVENIX *)malloc(sizeof(LOVENIX));
memset(info,0x00,sizeof(LOVENIX));
if(recvbuf[times] == 0x58 || recvbuf[times] == 0x59)
ProtocolAnalysisLovenixTcp(recvbuf[times],info);
else if(recvbuf[times] == 0x24)
ProtocolAnalysisLovenixUdp(recvbuf[times],info);
sprintf(info-tid,"%s",seq[tfd]); // 合成車輛ID
DataProcess(info); // 處理GPS數(shù)據(jù)
free(info);
gettimeofday(ntime, NULL);
cntime[tfd] = ntime.tv_sec; // 更新時間
times += 32;
}
}
}
else if(rlen 32)
{
if(!rflag)
{
if((index = RegLovenix(tfd,recvbuf)) -1)
{
sprintf(seq[tfd],"%s",tid[index]); // 將對應(yīng)的socket設(shè)備ID保存
gettimeofday(ntime, NULL);
sfd[tfd] = tfd;
cntime[tfd] = ntime.tv_sec;
tflag[tfd] = tfd;
rflag = 1;
}
}
}
if(rlen 512); // 已經(jīng)讀完
break;
memset(recvbuf,0x00,rlen);
}
}
void *TcpServer(void *arg)
{
int port = (unsigned int) arg;
int efd,i;
struct timeval ntime;
int listener, nfds, n, listen_opt = 1, lisnum;
struct sockaddr_in my_addr, their_addr;
socklen_t len = sizeof(their_addr);
lisnum = MAXLISTEN;
for(i=0; iMAX_REC; i++)
{
sfd[i] = 0;
tflag[i] = 0;
}
if ((listener = socket(PF_INET, SOCK_STREAM, 0)) == -1) // 開啟 socket 監(jiān)聽
{
lprintf(lfd, FATAL, "TCP Socket error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP socket creat susscess!\n");
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, (void *) listen_opt,(int) sizeof(listen_opt)); // 設(shè)置端口多重邦定
setnonblocking(listener);
bzero(my_addr, sizeof(my_addr));
my_addr.sin_family = PF_INET;
my_addr.sin_port = htons(port);
my_addr.sin_addr.s_addr = INADDR_ANY;
if (bind(listener, (struct sockaddr *) my_addr, sizeof(struct sockaddr)) == -1)
{
lprintf(lfd, FATAL, "TCP bind error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP bind susscess!\n");
if (listen(listener, lisnum) == -1)
{
lprintf(lfd, FATAL, "TCP listen error!\n");
exit(1);
}
else
lprintf(lfd, INFO, "TCP listen susscess!\n");
kdpfd = epoll_create(MAXEPOLLSIZE); // 創(chuàng)建 epoll句柄,把監(jiān)聽socket加入到epoll集合里
ev.events = EPOLLIN | EPOLLET; // 注冊epoll 事件
ev.data.fd = listener;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, listener, ev) 0)
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
while (1)
{
sem_wait(sem_tcp); // 等待 sem_TCP
sem_wait(sem_tp); // 將tp值減一
nfds = epoll_wait(kdpfd, events, MAXEPOLLSIZE, 1); // 等待有事件發(fā)生
if (nfds == -1)
lprintf(lfd, FATAL,"EPOLL_WAIT error!\n");
for (n = 0; n nfds; ++n) // 處理epoll所有事件
{
if (events[n].data.fd == listener) // 如果是連接事件
{
if ((efd = accept(listener, (struct sockaddr *) their_addr,len)) 0)
{
lprintf(lfd, FATAL, "accept error!\n");
continue;
}
else
lprintf(lfd, INFO, "Client from :%s\tSocket ID:%d\n", inet_ntoa(their_addr.sin_addr) ,efd);
setnonblocking(efd); // 設(shè)置新連接為非阻塞模式
ev.events = EPOLLIN | EPOLLET; // 注冊新連接
ev.data.fd = efd;
if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, efd, ev) 0) // 將新連接加入EPOLL的監(jiān)聽隊列
lprintf(lfd, FATAL, "EPOLL_CTL_ADD error!\n");
else
{
gettimeofday(ntime, NULL);
cntime[efd] = ntime.tv_sec;
sfd[efd] = efd;
}
}
else if (events[n].events EPOLLIN)
tpool_add_work(pool, TcpProcess, (void*)events[n].data.fd); // 讀取分析TCP信息
else
{
close(events[n].data.fd);
epoll_ctl(kdpfd, EPOLL_CTL_DEL, events[n].data.fd, ev);
}
}
sem_post(sem_cm);
sem_post(sem_udp);
}
close(listener);
}
int DataProcess(LOVENIX *info) // 處理GPS數(shù)據(jù)
{
if(sflag == 0 (CacheRec != TotalRec)) // 緩存1可用且沒有滿
{
gps_cache[CacheRec].lat = info-lat;
gps_cache[CacheRec].mileage = info-mileage;
gps_cache[CacheRec].lon = info-lon;
gps_cache[CacheRec].speed = atod(info-speed, strlen(info-speed))*0.514444444*3.6;
gps_cache[CacheRec].udate = atoi(info-udate);
gps_cache[CacheRec].utime = atoi(info-utime);
gps_cache[CacheRec].dir = atoi(info-dir);
sprintf(gps_cache[CacheRec].tid ,"%s",info-tid);
CacheRec++;
// printf("CacheRec %d\tTotalRec %d \t sflag:%d\n",CacheRec,TotalRec,sflag);
if(CacheRec == TotalRec)
{
sflag = 1;
pthread_attr_init(attr); // 初始化屬性值,均設(shè)為默認(rèn)值
pthread_attr_setscope(attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED); // 設(shè)置線程為分離屬性
if (pthread_create(thread, attr,(void*) OracleProcess,(void*)gps_cache)) // 創(chuàng)建數(shù)據(jù)處理線程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
CacheRec = 0;
}
}
else if(sflag == 1 (Cache1Rec != TotalRec)) // 緩存2可用且沒有滿
{
gps_cache1[Cache1Rec].mileage = info-mileage;
gps_cache1[Cache1Rec].lat = info-lat;
gps_cache1[Cache1Rec].lon = info-lon;
gps_cache1[Cache1Rec].speed = atod(info-speed, strlen(info-speed))*0.514444444*3.6;
gps_cache1[Cache1Rec].udate = atoi(info-udate);
gps_cache1[Cache1Rec].utime = atoi(info-utime);
gps_cache1[Cache1Rec].dir = atoi(info-dir);
sprintf(gps_cache1[Cache1Rec].tid ,"%s",info-tid);
Cache1Rec++;
if(Cache1Rec == TotalRec)
{
sflag = 0;
pthread_attr_init(attr); // 初始化屬性值,均設(shè)為默認(rèn)值
pthread_attr_setscope(attr, PTHREAD_SCOPE_SYSTEM);
pthread_attr_setdetachstate(attr, PTHREAD_CREATE_DETACHED); // 設(shè)置線程為分離屬性
if (pthread_create(thread, attr,(void*) OracleProcess,(void*)gps_cache1)) // 創(chuàng)建數(shù)據(jù)處理線程
lprintf(lfd, FATAL, "oracle pthread_creat error!\n");
Cache1Rec = 0;
}
}
else
{
lprintf(lfd, FATAL, "No cache to use!\n");
return (0);
}
return (1);
}
windows里所有oracle事務(wù)統(tǒng)一由一個oracle.exe進(jìn)程管理,pmon、smon等表現(xiàn)為oracle.exe內(nèi)的線程。
linux系統(tǒng)里pmon、smon都是獨(dú)立的進(jìn)程。
線程是進(jìn)程的組成部分,進(jìn)程中的資源由多個線程共享。你可以把它們想象成:
進(jìn)程是老大,手上有的是鈔票、棍棒;
線程是他的小弟們,身無分文;
老大提供棍棒給小弟們出去辦事,辦事需要錢的時候由老大分配,小弟們搶來的鈔票歸老大統(tǒng)一支配。