anope

- supernets anope source code & configuration
git clone git://git.acid.vegas/anope.git
Log | Files | Refs | Archive | README

m_redis.cpp (13740B)

      1 /*
      2  *
      3  * (C) 2003-2022 Anope Team
      4  * Contact us at team@anope.org
      5  *
      6  * Please read COPYING and README for further details.
      7  */
      8 
      9 #include "module.h"
     10 #include "modules/redis.h"
     11 
     12 using namespace Redis;
     13 
     14 class MyRedisService;
     15 
     16 class RedisSocket : public BinarySocket, public ConnectionSocket
     17 {
     18 	size_t ParseReply(Reply &r, const char *buf, size_t l);
     19  public:
     20 	MyRedisService *provider;
     21 	std::deque<Interface *> interfaces;
     22 	std::map<Anope::string, Interface *> subinterfaces;
     23 
     24 	RedisSocket(MyRedisService *pro, bool v6) : Socket(-1, v6), provider(pro) { }
     25 
     26 	~RedisSocket();
     27 
     28 	void OnConnect() anope_override;
     29 	void OnError(const Anope::string &error) anope_override;
     30 
     31 	bool Read(const char *buffer, size_t l) anope_override;
     32 };
     33 
     34 class Transaction : public Interface
     35 {
     36  public:
     37 	std::deque<Interface *> interfaces;
     38 
     39 	Transaction(Module *creator) : Interface(creator) { }
     40 
     41 	~Transaction()
     42 	{
     43 		for (unsigned i = 0; i < interfaces.size(); ++i)
     44 		{
     45 			Interface *inter = interfaces[i];
     46 
     47 			if (!inter)
     48 				continue;
     49 
     50 			inter->OnError("Interface going away");
     51 		}
     52 	}
     53 
     54 	void OnResult(const Reply &r) anope_override
     55 	{
     56 		/* This is a multi bulk reply of the results of the queued commands
     57 		 * in this transaction
     58 		 */
     59 
     60 		Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results";
     61 
     62 		for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
     63 		{
     64 			const Reply *reply = r.multi_bulk[i];
     65 
     66 			if (interfaces.empty())
     67 				break;
     68 
     69 			Interface *inter = interfaces.front();
     70 			interfaces.pop_front();
     71 
     72 			if (inter)
     73 				inter->OnResult(*reply);
     74 		}
     75 	}
     76 };
     77 
     78 class MyRedisService : public Provider
     79 {
     80  public:
     81 	Anope::string host;
     82 	int port;
     83 	unsigned db;
     84 
     85 	RedisSocket *sock, *sub;
     86 
     87 	Transaction ti;
     88 	bool in_transaction;
     89 
     90 	MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), sock(NULL), sub(NULL),
     91 		ti(c), in_transaction(false)
     92 	{
     93 		sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
     94 		sock->Connect(host, port);
     95 
     96 		sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
     97 		sub->Connect(host, port);
     98 	}
     99 
    100 	~MyRedisService()
    101 	{
    102 		if (sock)
    103 		{
    104 			sock->flags[SF_DEAD] = true;
    105 			sock->provider = NULL;
    106 		}
    107 
    108 		if (sub)
    109 		{
    110 			sub->flags[SF_DEAD] = true;
    111 			sub->provider = NULL;
    112 		}
    113 	}
    114 
    115  private:
    116 	inline void Pack(std::vector<char> &buffer, const char *buf, size_t sz = 0)
    117 	{
    118 		if (!sz)
    119 			sz = strlen(buf);
    120 
    121 		size_t old_size = buffer.size();
    122 		buffer.resize(old_size + sz);
    123 		std::copy(buf, buf + sz, buffer.begin() + old_size);
    124 	}
    125 
    126 	void Send(RedisSocket *s, Interface *i, const std::vector<std::pair<const char *, size_t> > &args)
    127 	{
    128 		std::vector<char> buffer;
    129 
    130 		Pack(buffer, "*");
    131 		Pack(buffer, stringify(args.size()).c_str());
    132 		Pack(buffer, "\r\n");
    133 
    134 		for (unsigned j = 0; j < args.size(); ++j)
    135 		{
    136 			const std::pair<const char *, size_t> &pair = args[j];
    137 
    138 			Pack(buffer, "$");
    139 			Pack(buffer, stringify(pair.second).c_str());
    140 			Pack(buffer, "\r\n");
    141 
    142 			Pack(buffer, pair.first, pair.second);
    143 			Pack(buffer, "\r\n");
    144 		}
    145 
    146 		if (buffer.empty())
    147 			return;
    148 
    149 		s->Write(&buffer[0], buffer.size());
    150 		if (in_transaction)
    151 		{
    152 			ti.interfaces.push_back(i);
    153 			s->interfaces.push_back(NULL); // For the +Queued response
    154 		}
    155 		else
    156 			s->interfaces.push_back(i);
    157 	}
    158 
    159  public:
    160 	bool IsSocketDead() anope_override
    161 	{
    162 		return this->sock && this->sock->flags[SF_DEAD];
    163 	}
    164 
    165 	void SendCommand(RedisSocket *s, Interface *i, const std::vector<Anope::string> &cmds)
    166 	{
    167 		std::vector<std::pair<const char *, size_t> > args;
    168 		for (unsigned j = 0; j < cmds.size(); ++j)
    169 			args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length()));
    170 		this->Send(s, i, args);
    171 	}
    172 
    173 	void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str)
    174 	{
    175 		std::vector<Anope::string> args;
    176 		spacesepstream(str).GetTokens(args);
    177 		this->SendCommand(s, i, args);
    178 	}
    179 
    180 	void Send(Interface *i, const std::vector<std::pair<const char *, size_t> > &args)
    181 	{
    182 		if (!sock)
    183 		{
    184 			sock = new RedisSocket(this, host.find(':') != Anope::string::npos);
    185 			sock->Connect(host, port);
    186 		}
    187 
    188 		this->Send(sock, i, args);
    189 	}
    190 
    191 	void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) anope_override
    192 	{
    193 		std::vector<std::pair<const char *, size_t> > args;
    194 		for (unsigned j = 0; j < cmds.size(); ++j)
    195 			args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length()));
    196 		this->Send(i, args);
    197 	}
    198 
    199 	void SendCommand(Interface *i, const Anope::string &str) anope_override
    200 	{
    201 		std::vector<Anope::string> args;
    202 		spacesepstream(str).GetTokens(args);
    203 		this->SendCommand(i, args);
    204 	}
    205 
    206  public:
    207 	bool BlockAndProcess() anope_override
    208 	{
    209 		if (!this->sock->ProcessWrite())
    210 			this->sock->flags[SF_DEAD] = true;
    211 		this->sock->SetBlocking(true);
    212 		if (!this->sock->ProcessRead())
    213 			this->sock->flags[SF_DEAD] = true;
    214 		this->sock->SetBlocking(false);
    215 		return !this->sock->interfaces.empty();
    216 	}
    217 
    218 	void Subscribe(Interface *i, const Anope::string &pattern) anope_override
    219 	{
    220 		if (sub == NULL)
    221 		{
    222 			sub = new RedisSocket(this, host.find(':') != Anope::string::npos);
    223 			sub->Connect(host, port);
    224 		}
    225 
    226 		std::vector<Anope::string> args;
    227 		args.push_back("PSUBSCRIBE");
    228 		args.push_back(pattern);
    229 		this->SendCommand(sub, NULL, args);
    230 
    231 		sub->subinterfaces[pattern] = i;
    232 	}
    233 
    234 	void Unsubscribe(const Anope::string &pattern) anope_override
    235 	{
    236 		if (sub)
    237 			sub->subinterfaces.erase(pattern);
    238 	}
    239 
    240 	void StartTransaction() anope_override
    241 	{
    242 		if (in_transaction)
    243 			throw CoreException();
    244 
    245 		this->SendCommand(NULL, "MULTI");
    246 		in_transaction = true;
    247 	}
    248 
    249 	void CommitTransaction() anope_override
    250 	{
    251 		/* The result of the transaction comes back to the reply of EXEC as a multi bulk.
    252 		 * The reply to the individual commands that make up the transaction when executed
    253 		 * is a simple +QUEUED
    254 		 */
    255 		in_transaction = false;
    256 		this->SendCommand(&this->ti, "EXEC");
    257 	}
    258 };
    259 
    260 RedisSocket::~RedisSocket()
    261 {
    262 	if (provider)
    263 	{
    264 		if (provider->sock == this)
    265 			provider->sock = NULL;
    266 		else if (provider->sub == this)
    267 			provider->sub = NULL;
    268 	}
    269 
    270 	for (unsigned i = 0; i < interfaces.size(); ++i)
    271 	{
    272 		Interface *inter = interfaces[i];
    273 
    274 		if (!inter)
    275 			continue;
    276 
    277 		inter->OnError("Interface going away");
    278 	}
    279 }
    280 
    281 void RedisSocket::OnConnect()
    282 {
    283 	Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : "");
    284 
    285 	this->provider->SendCommand(NULL, "CLIENT SETNAME Anope");
    286 	this->provider->SendCommand(NULL, "SELECT " + stringify(provider->db));
    287 
    288 	if (this != this->provider->sub)
    289 	{
    290 		this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA");
    291 	}
    292 }
    293 
    294 void RedisSocket::OnError(const Anope::string &error)
    295 {
    296 	Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error;
    297 }
    298 
    299 size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l)
    300 {
    301 	size_t used = 0;
    302 
    303 	if (!l)
    304 		return used;
    305 
    306 	if (r.type == Reply::MULTI_BULK)
    307 		goto multi_bulk_cont;
    308 
    309 	switch (*buffer)
    310 	{
    311 		case '+':
    312 		{
    313 			Anope::string reason(buffer, 1, l - 1);
    314 			size_t nl = reason.find("\r\n");
    315 			Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl);
    316 			if (nl != Anope::string::npos)
    317 			{
    318 				r.type = Reply::OK;
    319 				used = 1 + nl + 2;
    320 			}
    321 			break;
    322 		}
    323 		case '-':
    324 		{
    325 			Anope::string reason(buffer, 1, l - 1);
    326 			size_t nl = reason.find("\r\n");
    327 			Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl);
    328 			if (nl != Anope::string::npos)
    329 			{
    330 				r.type = Reply::NOT_OK;
    331 				used = 1 + nl + 2;
    332 			}
    333 			break;
    334 		}
    335 		case ':':
    336 		{
    337 			Anope::string ibuf(buffer, 1, l - 1);
    338 			size_t nl = ibuf.find("\r\n");
    339 			if (nl != Anope::string::npos)
    340 			{
    341 				try
    342 				{
    343 					r.i = convertTo<int64_t>(ibuf.substr(0, nl));
    344 				}
    345 				catch (const ConvertException &) { }
    346 
    347 				r.type = Reply::INT;
    348 				used = 1 + nl + 2;
    349 			}
    350 			break;
    351 		}
    352 		case '$':
    353 		{
    354 			Anope::string reply(buffer + 1, l - 1);
    355 			/* This assumes one bulk can always fit in our recv buffer */
    356 			size_t nl = reply.find("\r\n");
    357 			if (nl != Anope::string::npos)
    358 			{
    359 				int len;
    360 				try
    361 				{
    362 					len = convertTo<int>(reply.substr(0, nl));
    363 					if (len >= 0)
    364 					{
    365 						if (1 + nl + 2 + len + 2 <= l)
    366 						{
    367 							used = 1 + nl + 2 + len + 2;
    368 							r.bulk = reply.substr(nl + 2, len);
    369 							r.type = Reply::BULK;
    370 						}
    371 					}
    372 					else
    373 					{
    374 						used = 1 + nl + 2 + 2;
    375 						r.type = Reply::BULK;
    376 					}
    377 				}
    378 				catch (const ConvertException &) { }
    379 			}
    380 			break;
    381 		}
    382 		multi_bulk_cont:
    383 		case '*':
    384 		{
    385 			if (r.type != Reply::MULTI_BULK)
    386 			{
    387 				Anope::string reply(buffer + 1, l - 1);
    388 				size_t nl = reply.find("\r\n");
    389 				if (nl != Anope::string::npos)
    390 				{
    391 					r.type = Reply::MULTI_BULK;
    392 					try
    393 					{
    394 						r.multi_bulk_size = convertTo<int>(reply.substr(0, nl));
    395 					}
    396 					catch (const ConvertException &) { }
    397 
    398 					used = 1 + nl + 2;
    399 				}
    400 				else
    401 					break;
    402 			}
    403 			else if (r.multi_bulk_size >= 0 && r.multi_bulk.size() == static_cast<unsigned>(r.multi_bulk_size))
    404 			{
    405 				/* This multi bulk is already complete, so check the sub bulks */
    406 				for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
    407 					if (r.multi_bulk[i]->type == Reply::MULTI_BULK)
    408 						ParseReply(*r.multi_bulk[i], buffer + used, l - used);
    409 				break;
    410 			}
    411 
    412 			for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i)
    413 			{
    414 				Reply *reply = new Reply();
    415 				size_t u = ParseReply(*reply, buffer + used, l - used);
    416 				if (!u)
    417 				{
    418 					Log(LOG_DEBUG) << "redis: ran out of data to parse";
    419 					delete reply;
    420 					break;
    421 				}
    422 				r.multi_bulk.push_back(reply);
    423 				used += u;
    424 			}
    425 			break;
    426 		}
    427 		default:
    428 			Log(LOG_DEBUG) << "redis: unknown reply " << *buffer;
    429 	}
    430 
    431 	return used;
    432 }
    433 
    434 bool RedisSocket::Read(const char *buffer, size_t l)
    435 {
    436 	static std::vector<char> save;
    437 	std::vector<char> copy;
    438 
    439 	if (!save.empty())
    440 	{
    441 		std::copy(buffer, buffer + l, std::back_inserter(save));
    442 
    443 		copy = save;
    444 
    445 		buffer = &copy[0];
    446 		l = copy.size();
    447 	}
    448 
    449 	while (l)
    450 	{
    451 		static Reply r;
    452 
    453 		size_t used = this->ParseReply(r, buffer, l);
    454 		if (!used)
    455 		{
    456 			Log(LOG_DEBUG) << "redis: used == 0 ?";
    457 			r.Clear();
    458 			break;
    459 		}
    460 		else if (used > l)
    461 		{
    462 			Log(LOG_DEBUG) << "redis: used > l ?";
    463 			r.Clear();
    464 			break;
    465 		}
    466 
    467 		/* Full result is not here yet */
    468 		if (r.type == Reply::MULTI_BULK && static_cast<unsigned>(r.multi_bulk_size) != r.multi_bulk.size())
    469 		{
    470 			buffer += used;
    471 			l -= used;
    472 			break;
    473 		}
    474 
    475 		if (this == provider->sub)
    476 		{
    477 			if (r.multi_bulk.size() == 4)
    478 			{
    479 				/* pmessage
    480 				 * pattern subscribed to
    481 				 * __keyevent@0__:set
    482 				 * key
    483 				 */
    484 				std::map<Anope::string, Interface *>::iterator it = this->subinterfaces.find(r.multi_bulk[1]->bulk);
    485 				if (it != this->subinterfaces.end())
    486 					it->second->OnResult(r);
    487 			}
    488 		}
    489 		else
    490 		{
    491 			if (this->interfaces.empty())
    492 			{
    493 				Log(LOG_DEBUG) << "redis: no interfaces?";
    494 			}
    495 			else
    496 			{
    497 				Interface *i = this->interfaces.front();
    498 				this->interfaces.pop_front();
    499 
    500 				if (i)
    501 				{
    502 					if (r.type != Reply::NOT_OK)
    503 						i->OnResult(r);
    504 					else
    505 						i->OnError(r.bulk);
    506 				}
    507 			}
    508 		}
    509 
    510 		buffer += used;
    511 		l -= used;
    512 
    513 		r.Clear();
    514 	}
    515 
    516 	if (l)
    517 	{
    518 		save.resize(l);
    519 		std::copy(buffer, buffer + l, save.begin());
    520 	}
    521 	else
    522 		std::vector<char>().swap(save);
    523 
    524 	return true;
    525 }
    526 
    527 
    528 class ModuleRedis : public Module
    529 {
    530 	std::map<Anope::string, MyRedisService *> services;
    531 
    532  public:
    533 	ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR)
    534 	{
    535 	}
    536 
    537 	~ModuleRedis()
    538 	{
    539 		for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it)
    540 		{
    541 			MyRedisService *p = it->second;
    542 
    543 			delete p->sock;
    544 			p->sock = NULL;
    545 			delete p->sub;
    546 			p->sub = NULL;
    547 
    548 			delete p;
    549 		}
    550 	}
    551 
    552 	void OnReload(Configuration::Conf *conf) anope_override
    553 	{
    554 		Configuration::Block *block = conf->GetModule(this);
    555 		std::vector<Anope::string> new_services;
    556 
    557 		for (int i = 0; i < block->CountBlock("redis"); ++i)
    558 		{
    559 			Configuration::Block *redis = block->GetBlock("redis", i);
    560 
    561 			const Anope::string &n = redis->Get<const Anope::string>("name"),
    562 						&ip = redis->Get<const Anope::string>("ip");
    563 			int port = redis->Get<int>("port");
    564 			unsigned db = redis->Get<unsigned>("db");
    565 
    566 			delete services[n];
    567 			services[n] = new MyRedisService(this, n, ip, port, db);
    568 			new_services.push_back(n);
    569 		}
    570 
    571 		for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end();)
    572 		{
    573 			Provider *p = it->second;
    574 			++it;
    575 
    576 			if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end())
    577 				delete it->second;
    578 		}
    579 	}
    580 
    581 	void OnModuleUnload(User *, Module *m) anope_override
    582 	{
    583 		for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it)
    584 		{
    585 			MyRedisService *p = it->second;
    586 
    587 			if (p->sock)
    588 				for (unsigned i = p->sock->interfaces.size(); i > 0; --i)
    589 				{
    590 					Interface *inter = p->sock->interfaces[i - 1];
    591 
    592 					if (inter && inter->owner == m)
    593 					{
    594 						inter->OnError(m->name + " being unloaded");
    595 						p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1);
    596 					}
    597 				}
    598 
    599 			if (p->sub)
    600 				for (unsigned i = p->sub->interfaces.size(); i > 0; --i)
    601 				{
    602 					Interface *inter = p->sub->interfaces[i - 1];
    603 
    604 					if (inter && inter->owner == m)
    605 					{
    606 						inter->OnError(m->name + " being unloaded");
    607 						p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1);
    608 					}
    609 				}
    610 
    611 			for (unsigned i = p->ti.interfaces.size(); i > 0; --i)
    612 			{
    613 				Interface *inter = p->ti.interfaces[i - 1];
    614 
    615 				if (inter && inter->owner == m)
    616 				{
    617 					inter->OnError(m->name + " being unloaded");
    618 					p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1);
    619 				}
    620 			}
    621 		}
    622 	}
    623 };
    624 
    625 MODULE_INIT(ModuleRedis)