// File: mux/Server/Source/ServiceSpace/RelationService/MessageHandler.cpp
// Last modified: 2013-02-11 19:31:50 -06:00
// Size: 323 lines, 9.2 KiB (C++)

#define ACE_BUILD_SVC_DLL
#include "MessageHandler.h"
#include "TeamManager.h"
#include "ChatGroupManager.h"
#include "MessageTool.h"
#include "FriendManager.h"
#include "UnionManager.h"
#include "SocialRelationManager.h"
#include "../ServiceSpaceLib/ObjectMessageBlock.h"
#include "../ServiceSpaceLib/TcpService.h"
#include "../ServiceSpaceLib/GetOpts.h"
#include "../ServiceSpaceLib/StreamBuffer.h"
#include "../ServiceSpaceLib/utilib/split.h"
__SERVICE_SPACE_BEGIN_NS__
// Type value passed to build_message_header() for every message built by
// build_result_message().
const short MESSAGE_TYPE = 0;
// Definitions of the handler's static data members.
// Lock taken by handle_task_message() while a notification is processed.
RelationServiceMessageHandler::lock_type RelationServiceMessageHandler::gate_svr_lock;
// Gate sessions registered through handle_report_gate_id(), keyed by gate id.
RelationServiceMessageHandler::gate_map RelationServiceMessageHandler::gates;
// Protocol object used to encode outgoing results and decode incoming requests.
RelationServiceProtocol RelationServiceMessageHandler::protocol;
/**
 * Look up the TCP session registered for a gate server.
 *
 * @param gate_id  Gate id to search for in the static `gates` map.
 * @return The session stored for that id, or a default-constructed
 *         tcp_session when no such gate is registered.
 */
tcp_session RelationServiceMessageHandler::get_session(unsigned short gate_id)
{
    ACE_TRACE(ACE_TEXT("RelationServiceMessageHandler::get_session"));

    const gate_map::iterator found = gates.find(gate_id);
    if (found == gates.end())
        return tcp_session();

    return found->second;
}
/**
 * Reverse lookup: find the gate id under which a session is registered.
 *
 * @param session  Session to search for among the values of `gates`.
 * @return The id of the first matching entry, or (unsigned)-1 when the
 *         session is not registered.
 */
unsigned RelationServiceMessageHandler::get_gate_id(tcp_session & session)
{
    gate_map::iterator entry = gates.begin();
    // Linear scan: the map is keyed by gate id, not by session.
    while (entry != gates.end() && !(entry->second == session))
        ++entry;

    if (entry == gates.end())
        return (unsigned)-1;

    return entry->first;
}
/**
 * Parse the service command line, initialise the SocialRelationManager and
 * then the base handler.
 *
 * Recognised options:
 *   -h <ip>    IPv4 listen address, parsed with inet_addr()
 *   -p <port>  listen port
 *   -z <str>   database address, passed to SocialRelationManager::init()
 *
 * @param argc  Number of entries in argv.
 * @param argv  Service arguments.
 * @return -1 when SocialRelationManager::init() fails, otherwise the result
 *         of base::init().
 */
int RelationServiceMessageHandler::init(int argc, ACE_TCHAR * argv[])
{
    SS_DEBUG("\n\n\n=======RelationServiceMessageHandler * init()\n\n\n");
    GetOpts<ACE_TCHAR> opts(argc, argv);
    int err = 0;
    const ACE_TCHAR * options = ACE_TEXT(":h:p:z:");
    ACE_Get_Opt get_opt(argc, (ACE_TCHAR **)opts.argv, options);
    int c;
    std::string db_str;
    while ((c = get_opt()) != EOF)
    {
        switch (c)
        {
        case 'h':
            {
                // inet_addr() yields a 32-bit IPv4 address in network byte
                // order.  It must be held in a 4-byte object: with
                // `unsigned long` sizeof() is 8 on LP64 platforms, and
                // ACE_INET_Addr::set_address() rejects that length, silently
                // leaving the listen address unset.
                const unsigned int addr = ::inet_addr(get_opt.optarg);
                // encode = 0: the bytes are already in network order.
                listen_addr_.set_address((const char *)&addr, sizeof(addr), 0);
            }
            break;
        case 'p':
            listen_addr_.set_port_number(ACE_OS::atoi(get_opt.optarg));
            break;
        case 'z': // database IP address
            {
                db_str = get_opt.optarg;
                break;
            }
        }
    }

    err = SocialRelationManager::instance()->init(db_str);
    if (err != 0)
        return -1;

    err = base::init(argc, argv);
    return err;
}
int RelationServiceMessageHandler::open(void *args /*= 0*/)
{
int err = base::open(args);
if (-1 == err)
return -1;
/*this->register_handler(G_R_CreateChatGroup, &RelationServiceMessageHandler::handle_create_chat_group);
this->register_handler(G_R_DestroyChatGroup, &RelationServiceMessageHandler::handle_destroy_chat_group);
this->register_handler(G_R_AddChatGroupMember, &RelationServiceMessageHandler::handle_add_chat_group_member);
this->register_handler(G_R_RemoveChatGroupMember, &RelationServiceMessageHandler::handle_remove_chat_group_member);
this->register_handler(G_R_ChatGroupSpeek, &RelationServiceMessageHandler::handle_chat_group_speek);*/
#ifdef WIN32
TcpService * tcp = ACE_Dynamic_Service<TcpService>::instance("listener");
#else
const ACE_Service_Type * svc_rec;
if (ACE_Service_Repository::instance()->find("listener", &svc_rec) == -1)
return -1;
const ACE_Service_Type_Impl * type = svc_rec->type();
if (type == 0)
return -1;
ACE_Service_Object * obj = ACE_static_cast (ACE_Service_Object *, type->object());
TcpService * tcp = (TcpService *)obj;
#endif
if (tcp)
{
tcp->create_tcp_acceptor(listen_addr_, 0);
SS_INFO("\n\n\n----------------RelationService create_tcp_acceptor----------------\n\n\n");
}
else
err = -1;
return err;
}
/**
 * Remove a closed gate session from the `gates` registry.
 *
 * Only the first entry whose session equals `session` is erased; nothing
 * happens when the session is not registered.
 *
 * @param session  The session that was closed.
 * @return Always 0.
 */
int RelationServiceMessageHandler::handle_gate_session_closed(tcp_session & session)
{
    gate_map::iterator entry = gates.begin();
    while (entry != gates.end() && !(entry->second == session))
        ++entry;

    if (entry != gates.end())
    {
        SS_DEBUG(ACE_TEXT("gate[%s:%d] unregister id: %d\n"), session->remote_address().get_host_addr(),
                 session->remote_address().get_port_number(), entry->first);
        gates.erase(entry);
    }
    return 0;
}
/**
 * Extract the message id from a raw message.
 *
 * @param message  Pointer to the start of the message; handed to
 *                 parse_message_header() as-is.
 * @return The id reported by parse_message_header(), widened to int.
 */
int RelationServiceMessageHandler::get_message_id(const char * message)
{
    // The length and type outputs are required by the parser but unused here.
    short length;
    char kind;
    short msg_id;
    parse_message_header(message, length, kind, msg_id);
    return static_cast<int>(msg_id);
}
/**
 * Process one notification queued by the TCP service.
 *
 * The block is treated as an ObjectMessageBlock<TcpService::Tcp_Notify> and
 * handled according to notify.flag:
 *   - TCP_RECEIVED:       split the received data with StreamBuffer(), dispatch
 *                         the resulting blocks via handle_message(), and post
 *                         the next asynchronous read.
 *   - TCP_SESSION_BUILT:  log only.
 *   - TCP_SESSION_CLOSED: log and remove the session from `gates`.
 *
 * @param message  Notification block; released before returning (except on the
 *                 null early-out).
 * @return -1 when `message` is null. For TCP_RECEIVED, the StreamBuffer()
 *         result (0, 1 or -1 are the values handled here). For every other
 *         notification, -1, because `err` is only assigned on the
 *         TCP_RECEIVED path.
 */
int RelationServiceMessageHandler::handle_task_message(ACE_Message_Block * message)
{
if (!message)
return -1;
// Guard object on gate_svr_lock; lives until this function returns.
utilib::pool_guard<lock_type> guard(gate_svr_lock);
ObjectMessageBlock<TcpService::Tcp_Notify> * notif_message = (ObjectMessageBlock<TcpService::Tcp_Notify> *)message;
TcpService::Tcp_Notify & notify = notif_message->obj();
int err = -1;
if (notify.flag == TcpService::Tcp_Notify::TCP_RECEIVED)
{
ACE_Message_Block * real_message = notify.message;
// NOTE(review): judging by the branches below, 0 appears to mean the chain
// holds only complete messages, 1 that the last block must be read further,
// and -1 a malformed stream -- confirm against StreamBuffer().
err = StreamBuffer(real_message, 0, 2, false);
if (err == 0)
{
// Dispatch the head block, then every continuation block in order.
handle_message(notify.session, real_message->rd_ptr(), real_message->length());
while (real_message->cont())
{
handle_message(notify.session, real_message->cont()->rd_ptr(), real_message->cont()->length());
real_message = real_message->cont();
}
}
else if (err == 1)
{
// Dispatch every block except the last one in the chain.
while (real_message->cont())
{
handle_message(notify.session, real_message->rd_ptr(), real_message->length());
ACE_Message_Block * tmp = real_message;
real_message = real_message->cont();
// Once the last block is reached, unlink it from its predecessor.
if (!real_message->cont())
{
tmp->cont(0);
}
}
// Single-block chain: the block being handed to post_asynch_read() is
// notify.message itself, so clear notify's pointer to it.
if (notify.message == real_message)
notify.message = 0;
// Continue reading using the remaining (last) block.
notify.session->post_asynch_read(real_message);
}
else if (err == -1)
{
SS_ERROR(ACE_TEXT("[ERROR]error message from [%s:%d]\n"), notify.session->remote_address().get_host_addr(),
notify.session->remote_address().get_port_number());
}
// For every result other than 1, post a read without a carried-over block.
if (err != 1)
{
notify.session->post_asynch_read();
}
}
else if (notify.flag == TcpService::Tcp_Notify::TCP_SESSION_BUILT && check_session(notify.session))
{
SS_DEBUG(ACE_TEXT("gate session[%s:%d] built\n"), notify.session->remote_address().get_host_addr(),
notify.session->remote_address().get_port_number());
}
else if (notify.flag == TcpService::Tcp_Notify::TCP_SESSION_CLOSED && check_session(notify.session))
{
SS_DEBUG(ACE_TEXT("gate session[%s:%d] closed\n"), notify.session->remote_address().get_host_addr(),
notify.session->remote_address().get_port_number());
// Drop the first registry entry that refers to the closed session.
for(gate_map::iterator it = gates.begin(); it != gates.end(); ++it)
{
if (it->second == notify.session)
{
gates.erase(it);
break;
}
}
}
// Release the notification block itself.
message->release();
return err;
}
/**
 * Route one message received from a session.
 *
 * Gate-id reports (ReportGateSvrIdRequest) are handled locally by
 * handle_report_gate_id(); every other message is forwarded to
 * SocialRelationManager::HandleMessage().
 *
 * @param session  Session the message arrived on.
 * @param message  Start of the message.
 * @param len      Length of the message in bytes.
 * @return The result of whichever handler was invoked.
 */
int RelationServiceMessageHandler::handle_message(tcp_session & session, char * message, unsigned len)
{
    const bool is_gate_report =
        (get_message_id(message) == RelationServiceNS::ReportGateSvrIdRequest::wCmd);

    if (!is_gate_report)
        return SocialRelationManager::instance()->HandleMessage(session, message, len);

    return handle_report_gate_id(session, message, len);
}
/**
 * Encode a result structure into a newly allocated message block.
 *
 * The first 5 bytes of the block are reserved for the header, the payload is
 * encoded after them, and the header is then written with the total length
 * (payload + 5), MESSAGE_TYPE and the message id.
 *
 * @param data   Structure to encode; passed to protocol.EnCode().
 * @param msgId  Message id, used both for encoding and in the header.
 * @return The filled block, or 0 when encoding fails (the block is released
 *         in that case).
 */
ACE_Message_Block * RelationServiceMessageHandler::build_result_message(void * data, int msgId)
{
    ACE_Message_Block * block = new MessageBlock();

    // Skip over the header area so the payload is encoded right after it.
    block->wr_ptr(5);
    size_t encoded = protocol.EnCode(msgId, data, block->wr_ptr(), block->space());

    if (encoded != static_cast<size_t>(-1))
    {
        block->wr_ptr(encoded);
        build_message_header(block->rd_ptr(), short(encoded + 5), MESSAGE_TYPE, msgId);
        return block;
    }

    SS_ERROR(ACE_TEXT("protocol encode error, message id: %d\n"), msgId);
    block->release();
    return 0;
}
/**
 * Build a result message and queue it for asynchronous write on a session.
 *
 * @param session  Destination session.
 * @param data     Structure to encode; see build_result_message().
 * @param msgId    Message id of the result.
 * @return 0 when the write was posted, and also when check_session() rejects
 *         the session (nothing is sent); -1 when the message could not be
 *         built or is shorter than the 5-byte header.
 */
int RelationServiceMessageHandler::send_result_message(ss::tcp_session &session, void *data, int msgId)
{
    if (!check_session(session))
        return 0;

    ACE_Message_Block * outgoing = build_result_message(data, msgId);
    if (outgoing == 0)
        return -1;

    if (outgoing->length() < 5)
    {
        SS_ERROR(ACE_TEXT("RelationServiceMessageHandler::send_result_message | sent message length is less 5\n")) ;
        outgoing->release();
        return -1;
    }

    SS_DEBUG(ACE_TEXT("RelationServiceMessageHandler::send_result_message | sent message id is %d\n"), msgId);
    session->post_asynch_write(outgoing);
    return 0;
}
/**
 * Handle a ReportGateSvrIdRequest: register the sending session under the
 * gate id it reports.
 *
 * If the id is already present in `gates`, the request is answered with
 * R_G_RESULT_GATE_ID_DIRTY and the session's peer is closed. Otherwise the
 * session is stored, R_G_RESULT_SUCCEED is returned, and a
 * QueryExpNeededWhenLvlUpRequest is sent to the gate as a follow-up.
 *
 * @param session  Session the request arrived on.
 * @param message  Start of the message (header included).
 * @param len      Total message length in bytes, header included.
 * @return -1 when the session is rejected by check_session(), `message` is
 *         null, the message is shorter than its header, or decoding fails;
 *         0 otherwise (including the dirty-id case).
 */
int RelationServiceMessageHandler::handle_report_gate_id(tcp_session & session, const char * message, unsigned len)
{
    // Number of header bytes that precede the encoded payload.
    const unsigned header_size = 5;

    if (!check_session(session) || !message)
        return -1;

    // Without this check `len - header_size` would wrap around (unsigned) and
    // the decoder would be handed a huge bogus payload length.
    if (len < header_size)
    {
        SS_ERROR(ACE_TEXT("gate id report shorter than message header: %u bytes\n"), len);
        return -1;
    }

    const char * payload = message + header_size;
    RelationServiceNS::ReportGateSvrIdRequest req;
    if (-1 == protocol.DeCode(RelationServiceNS::ReportGateSvrIdRequest::wCmd, &req, sizeof(req),
                              const_cast<char *>(payload), len - header_size))
    {
        SS_ERROR(ACE_TEXT("protocol decode error\n"));
        return -1;
    }

    bool close_session = false;
    RelationServiceNS::ReportGateSvrIdResult result;
    result.requestSeq = req.requestSeq;

    gate_map::iterator it = gates.find(req.gateId);
    if (it != gates.end())
    {
        // The id is already taken by a registered session: refuse it.
        SS_ERROR(ACE_TEXT("dirty gate id:%d from [%s:%d]\n"), req.gateId, session->remote_address().get_host_addr(),
                 session->remote_address().get_port_number());
        result.returnCode = RelationServiceNS::R_G_RESULT_GATE_ID_DIRTY;
        close_session = true;
    }
    else
    {
        gates[req.gateId] = session;
        result.returnCode = RelationServiceNS::R_G_RESULT_SUCCEED;
        SS_DEBUG(ACE_TEXT("gate[%s:%d] register id: %d\n"), session->remote_address().get_host_addr(),
                 session->remote_address().get_port_number(), req.gateId);
    }

    // The result is sent in both cases, before any close.
    send_result_message(session, &result, RelationServiceNS::ReportGateSvrIdResult::wCmd);

    if (close_session)
    {
        session->peer().close();
    }
    else
    {
        // Named distinctly so it no longer shadows the decoded request above.
        // NOTE(review): sent as default-initialized; confirm the request has
        // no fields that need to be filled in.
        RelationServiceNS::QueryExpNeededWhenLvlUpRequest exp_query;
        send_result_message(session, &exp_query, RelationServiceNS::QueryExpNeededWhenLvlUpRequest::wCmd);
    }
    return 0;
}
/**
 * Exported C-linkage factory for the relation-service message handler.
 *
 * @return A handler allocated with `new`. How it is later destroyed is not
 *         visible in this file.
 */
extern "C" ACE_Svc_Export RelationServiceMessageHandler * CreateMessageHandler()
{
    RelationServiceMessageHandler * handler = new RelationServiceMessageHandler();
    return handler;
}
// ACE service-factory declaration and definition macros for
// RelationServiceMessageHandler.
ACE_SVC_FACTORY_DECLARE(RelationServiceMessageHandler);
ACE_SVC_FACTORY_DEFINE(RelationServiceMessageHandler);
__SERVICE_SPACE_END_NS__