时间:2022-04-27 10:28:09 | 栏目:Mysql | 点击:次
使用mysql_udf与curl库完成http_post通信模块(mysql_udf,multi_curl,http,post)
这个模块其目前主要用于xoyo江湖的sns与kingsoft_xoyo自主研发的TCSQL数据库做数据同步,当有feed插入sns数据库,使用触 发器调用该模块,向tcsql数据库发送同步数据。也可以使用该模块与其它使用socket接口的数据库或程序做转发与同步。
http_post模块主要使用mysql_udf接口,与curl库两部分技术。
mysql_udf是mysql为c语言提供的一个接口,通过这个接口,用户可以自定义mysql的函数,通过调用这些mysql函数,调用相应的c语言 模块来执行特定功能,实现mysql数据与外部应用的交互。curl库是一个比较常用的应用层网络协议库,主要用到的是其中的curl_multi异步通 信api,用来进行网络传输。
首先参考mysql官方提供的udf_example.c文件,建立3个主要的接口函数,分别是初始化函数,执行函数与析构函数。
在mysql_udf接口中,主函数体中是不允许使用new或malloc动态分配内存,所以如果需要申请内存空间,必须用xxxx_init()函数申 请并将申请的地址赋给initid->ptr指针,然后在主函数体中使用,并在xxxx_deinit析构函数体中释放。另外对于 mysql_udf接口的调用好像当并发量超过一定程度,如果是使用动态分配内存,会出现double free的错误,为了避免这个错误,所以在我的程序里使用静态空间与动态申请空间相结合的方式,这样如果数据较小,并发量较大,不会出现double free错误。对于静态申请空间,最大约在160000~170000byte左右,我这里使用的160000,当mysql传送的数据大于这个数的时 候,才动态申请内存。初始化函数体如下:
if(args->arg_count == 2 && args->args[1]!=NULL)
{
int flexibleLength = strlen(args->args[1]);
if(flexibleLength > 160000)
{
int allocLength = 200 + flexibleLength;
if (!(initid->ptr=(char*) malloc(allocLength) ) )
{
strcpy(message,"Couldn't allocate memory in http_post_init");
return 1;
}
return 0;
}
else
{
initid->ptr=NULL;
}
}
return 0;
}
主函数如下:
char sendArray[160000] = "\0";//can not move this into the if
if(initid->ptr == NULL)
{
//char sendArray[160000] = "\0";//error
sendBuffer=sendArray;
}
else
{
sendBuffer = initid->ptr;
TRY_TIMES=100;
}
strcpy(sendBuffer,args->args[1]);
curl = curl_easy_init();
multi_handle = curl_multi_init();
if(curl && multi_handle)
{
/* what URL that receives this POST */
curl_easy_setopt(curl, CURLOPT_URL,args->args[0]);
curl_easy_setopt(curl, CURLOPT_HTTPPOST, 1);
curl_easy_setopt(curl,CURLOPT_POSTFIELDS,sendBuffer);
curl_multi_add_handle(multi_handle, curl);
while(CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi_handle,\ &still_running));
while(still_running && times< TRY_TIMES)
{
int rc; //select() return code
int maxfd;
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep); //get file descriptors from the transfers
curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep,\ &maxfd);
rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
switch(rc)
{
case -1://select error
break;
case 0:
default: // timeout
while(CURLM_CALL_MULTI_PERFORM !== curl_multi_perform(multi_handle, &still_running));
break;
}
times++;
}//end while
curl_multi_remove_handle(multi_handle,curl);
curl_multi_cleanup(multi_handle);//always cleanup
curl_easy_cleanup(curl);
if(times>=TRY_TIMES)
{
return 1;
}
return 0;
}//end if
return 1;
}
对于easy发送完数据后,会阻塞等待服务器的response,如果没 有返回,就会一直阻塞,当然可以设置一个timeout,但如果这个时间设小了,easy发送大数据的时候就会中断,设太大了影响时间效率,另外当接收端 不发送response的时候,easy库即使发送完了数据,也会阻塞等待,有些时候对于发送端来讲不需要等待接收端的respons,当发送完毕就可以 结束了,这个时候easy就不适用。所以最后选择multi库。
如程序所示,首先得初始化,并设置easy句柄为post模式,指定需要post的数据,如下:
curl = curl_easy_init();
multi_handle = curl_multi_init();
curl_easy_setopt(curl, CURLOPT_URL,args->args[0]);
curl_easy_setopt(curl, CURLOPT_HTTPPOST, 1);
curl_easy_setopt(curl,CURLOPT_POSTFIELDS,sendBuffer);
由于要使用multi模式,必须也要初始化一个easy模式,并将这个easy模式的句柄放入所谓的multi函数执行栈:
curl_multi_add_handle(multi_handle, curl);
使用curl_multi_perform(multi_handle, &still_running),来进行异步传输,但如果该函数返回的不是CURLM_CALL_MULTI_PERFORM,则需要重新执行。直到循环while(CURLM_CALL_MULTI_PERFORM == curl_multi_perform(multi_handle, &still_running));结束。此时如果刚才函数体中的still_running被置为1,表明连接建立,正在发送数据。需要配合select机制来进行数据发送。
函数 curl_multi_fdset(multi_handle, &fdread, &fdwrite, &fdexcep, &maxfd);会将最大的描述符写入maxfd,
然后用select进行等待:rc = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
最后如果select返回值不为-1(error)0(timeout)时候再次进行异步传输,即执行curl_multi_perform函数,直到
still_running为0,程序结束退出。
这里设置了一个最大执行次数的限制,如果服务器出现了问题,不能发送response,则still_running不会变为0,程序会死循环,
所以,设置一个最大循环次数TRY_TIMES,防止这种情况发生。但是这个次数设小了,数据可能没有发送完,就退出了,如设置太大了,程序发送完了,服务器没有response就会多执行多余循环。所以这个TRY_TIMES需要根据数据的大小和网络状况来设置,比正常
传输数据的次数略长。这里我小数据的时候循环设次数25,大数据循环设为100.
最后是析构函数体:
编译执行过程如下:
将程序保存为http_post.c编译如下(请根据机器上的mysql路径进行调整):
在网络助手中可以看到如下结果: