anope

- supernets anope source code & configuration
git clone git://git.acid.vegas/anope.git
Log | Files | Refs | Archive | README

db_redis.cpp (15310B)

      1 /*
      2  *
      3  * (C) 2003-2022 Anope Team
      4  * Contact us at team@anope.org
      5  *
      6  * Please read COPYING and README for further details.
      7  */
      8 
      9 #include "module.h"
     10 #include "modules/redis.h"
     11 
     12 using namespace Redis;
     13 
     14 class DatabaseRedis;
     15 static DatabaseRedis *me;
     16 
     17 class Data : public Serialize::Data
     18 {
     19  public:
     20 	std::map<Anope::string, std::stringstream *> data;
     21 
     22 	~Data()
     23 	{
     24 		for (std::map<Anope::string, std::stringstream *>::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it)
     25 			delete it->second;
     26 	}
     27 
     28 	std::iostream& operator[](const Anope::string &key) anope_override
     29 	{
     30 		std::stringstream* &stream = data[key];
     31 		if (!stream)
     32 			stream = new std::stringstream();
     33 		return *stream;
     34 	}
     35 
     36 	std::set<Anope::string> KeySet() const anope_override
     37 	{
     38 		std::set<Anope::string> keys;
     39 		for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
     40 			keys.insert(it->first);
     41 		return keys;
     42 	}
     43 
     44 	size_t Hash() const anope_override
     45 	{
     46 		size_t hash = 0;
     47 		for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it)
     48 			if (!it->second->str().empty())
     49 				hash ^= Anope::hash_cs()(it->second->str());
     50 		return hash;
     51 	}
     52 };
     53 
     54 class TypeLoader : public Interface
     55 {
     56 	Anope::string type;
     57  public:
     58 	TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { }
     59 
     60 	void OnResult(const Reply &r) anope_override;
     61 };
     62 
     63 class ObjectLoader : public Interface
     64 {
     65 	Anope::string type;
     66 	int64_t id;
     67 
     68  public:
     69 	ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
     70 
     71 	void OnResult(const Reply &r) anope_override;
     72 };
     73 
     74 class IDInterface : public Interface
     75 {
     76 	Reference<Serializable> o;
     77  public:
     78 	IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { }
     79 
     80 	void OnResult(const Reply &r) anope_override;
     81 };
     82 
     83 class Deleter : public Interface
     84 {
     85 	Anope::string type;
     86 	int64_t id;
     87  public:
     88 	Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
     89 
     90 	void OnResult(const Reply &r) anope_override;
     91 };
     92 
     93 class Updater : public Interface
     94 {
     95 	Anope::string type;
     96 	int64_t id;
     97  public:
     98 	Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
     99 
    100 	void OnResult(const Reply &r) anope_override;
    101 };
    102 
    103 class ModifiedObject : public Interface
    104 {
    105 	Anope::string type;
    106 	int64_t id;
    107  public:
    108 	ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { }
    109 
    110 	void OnResult(const Reply &r) anope_override;
    111 };
    112 
    113 class SubscriptionListener : public Interface
    114 {
    115  public:
    116 	SubscriptionListener(Module *creator) : Interface(creator) { }
    117 
    118 	void OnResult(const Reply &r) anope_override;
    119 };
    120 
    121 class DatabaseRedis : public Module, public Pipe
    122 {
    123 	SubscriptionListener sl;
    124 	std::set<Serializable *> updated_items;
    125 
    126  public:
    127 	ServiceReference<Provider> redis;
    128 
    129 	DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR), sl(this)
    130 	{
    131 		me = this;
    132 
    133 	}
    134 
    135 	/* Insert or update an object */
    136 	void InsertObject(Serializable *obj)
    137 	{
    138 		Serialize::Type *t = obj->GetSerializableType();
    139 
    140 		/* If there is no id yet for this object, get one */
    141 		if (!obj->id)
    142 			redis->SendCommand(new IDInterface(this, obj), "INCR id:" + t->GetName());
    143 		else
    144 		{
    145 			Data data;
    146 			obj->Serialize(data);
    147 
    148 			if (obj->IsCached(data))
    149 				return;
    150 
    151 			obj->UpdateCache(data);
    152 
    153 			std::vector<Anope::string> args;
    154 			args.push_back("HGETALL");
    155 			args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id));
    156 
    157 			/* Get object attrs to clear before updating */
    158 			redis->SendCommand(new Updater(this, t->GetName(), obj->id), args);
    159 		}
    160 	}
    161 
    162 	void OnNotify() anope_override
    163 	{
    164 		for (std::set<Serializable *>::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it)
    165 		{
    166 			Serializable *s = *it;
    167 
    168 			this->InsertObject(s);
    169 		}
    170 
    171 		this->updated_items.clear();
    172 	}
    173 
    174 	void OnReload(Configuration::Conf *conf) anope_override
    175 	{
    176 		Configuration::Block *block = conf->GetModule(this);
    177 		this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<const Anope::string>("engine", "redis/main"));
    178 	}
    179 
    180 	EventReturn OnLoadDatabase() anope_override
    181 	{
    182 		if (!redis)
    183 		{
    184 			Log(this) << "Unable to load database - unable to find redis provider";
    185 			return EVENT_CONTINUE;
    186 		}
    187 
    188 		const std::vector<Anope::string> type_order = Serialize::Type::GetTypeOrder();
    189 		for (unsigned i = 0; i < type_order.size(); ++i)
    190 		{
    191 			Serialize::Type *sb = Serialize::Type::Find(type_order[i]);
    192 			this->OnSerializeTypeCreate(sb);
    193 		}
    194 
    195 		while (!redis->IsSocketDead() && redis->BlockAndProcess());
    196 
    197 		if (redis->IsSocketDead())
    198 		{
    199 			Log(this) << "I/O error while loading redis database - is it online?";
    200 			return EVENT_CONTINUE;
    201 		}
    202 
    203 		redis->Subscribe(&this->sl, "__keyspace@*__:hash:*");
    204 
    205 		return EVENT_STOP;
    206 	}
    207 
    208 	void OnSerializeTypeCreate(Serialize::Type *sb) anope_override
    209 	{
    210 		if (!redis)
    211 			return;
    212 
    213 		std::vector<Anope::string> args;
    214 		args.push_back("SMEMBERS");
    215 		args.push_back("ids:" + sb->GetName());
    216 
    217 		redis->SendCommand(new TypeLoader(this, sb->GetName()), args);
    218 	}
    219 
    220 	void OnSerializableConstruct(Serializable *obj) anope_override
    221 	{
    222 		this->updated_items.insert(obj);
    223 		this->Notify();
    224 	}
    225 
    226 	void OnSerializableDestruct(Serializable *obj) anope_override
    227 	{
    228 		Serialize::Type *t = obj->GetSerializableType();
    229 
    230 		if (t == NULL)
    231 		{
    232 			/* This is probably the module providing the type unloading.
    233 			 *
    234 			 * The types get registered after the extensible container is
    235 			 * registered so that unserialization on module load can insert
    236 			 * into the extensible container. So, the type destructs prior to
    237 			 * the extensible container, which then triggers this
    238 			 */
    239 			return;
    240 		}
    241 
    242 		std::vector<Anope::string> args;
    243 		args.push_back("HGETALL");
    244 		args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id));
    245 
    246 		/* Get all of the attributes for this object */
    247 		redis->SendCommand(new Deleter(this, t->GetName(), obj->id), args);
    248 
    249 		this->updated_items.erase(obj);
    250 		t->objects.erase(obj->id);
    251 		this->Notify();
    252 	}
    253 
    254 	void OnSerializableUpdate(Serializable *obj) anope_override
    255 	{
    256 		this->updated_items.insert(obj);
    257 		this->Notify();
    258 	}
    259 };
    260 
    261 void TypeLoader::OnResult(const Reply &r)
    262 {
    263 	if (r.type != Reply::MULTI_BULK || !me->redis)
    264 	{
    265 		delete this;
    266 		return;
    267 	}
    268 
    269 	for (unsigned i = 0; i < r.multi_bulk.size(); ++i)
    270 	{
    271 		const Reply *reply = r.multi_bulk[i];
    272 
    273 		if (reply->type != Reply::BULK)
    274 			continue;
    275 
    276 		int64_t id;
    277 		try
    278 		{
    279 			id = convertTo<int64_t>(reply->bulk);
    280 		}
    281 		catch (const ConvertException &)
    282 		{
    283 			continue;
    284 		}
    285 
    286 		std::vector<Anope::string> args;
    287 		args.push_back("HGETALL");
    288 		args.push_back("hash:" + this->type + ":" + stringify(id));
    289 
    290 		me->redis->SendCommand(new ObjectLoader(me, this->type, id), args);
    291 	}
    292 
    293 	delete this;
    294 }
    295 
    296 void ObjectLoader::OnResult(const Reply &r)
    297 {
    298 	Serialize::Type *st = Serialize::Type::Find(this->type);
    299 
    300 	if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st)
    301 	{
    302 		delete this;
    303 		return;
    304 	}
    305 
    306 	Data data;
    307 
    308 	for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
    309 	{
    310 		const Reply *key = r.multi_bulk[i],
    311 			*value = r.multi_bulk[i + 1];
    312 
    313 		data[key->bulk] << value->bulk;
    314 	}
    315 
    316 	Serializable* &obj = st->objects[this->id];
    317 	obj = st->Unserialize(obj, data);
    318 	if (obj)
    319 	{
    320 		obj->id = this->id;
    321 		obj->UpdateCache(data);
    322 	}
    323 
    324 	delete this;
    325 }
    326 
    327 void IDInterface::OnResult(const Reply &r)
    328 {
    329 	if (!o || r.type != Reply::INT || !r.i)
    330 	{
    331 		delete this;
    332 		return;
    333 	}
    334 
    335 	Serializable* &obj = o->GetSerializableType()->objects[r.i];
    336 	if (obj)
    337 		/* This shouldn't be possible */
    338 		obj->id = 0;
    339 
    340 	o->id = r.i;
    341 	obj = o;
    342 
    343 	/* Now that we have the id, insert this object for real */
    344 	anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o);
    345 
    346 	delete this;
    347 }
    348 
    349 void Deleter::OnResult(const Reply &r)
    350 {
    351 	if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty())
    352 	{
    353 		delete this;
    354 		return;
    355 	}
    356 
    357 	/* Transaction start */
    358 	me->redis->StartTransaction();
    359 
    360 	std::vector<Anope::string> args;
    361 	args.push_back("DEL");
    362 	args.push_back("hash:" + this->type + ":" + stringify(this->id));
    363 
    364 	/* Delete hash object */
    365 	me->redis->SendCommand(NULL, args);
    366 
    367 	args.clear();
    368 	args.push_back("SREM");
    369 	args.push_back("ids:" + this->type);
    370 	args.push_back(stringify(this->id));
    371 
    372 	/* Delete id from ids set */
    373 	me->redis->SendCommand(NULL, args);
    374 
    375 	for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
    376 	{
    377 		const Reply *key = r.multi_bulk[i],
    378 			*value = r.multi_bulk[i + 1];
    379 
    380 		args.clear();
    381 		args.push_back("SREM");
    382 		args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk);
    383 		args.push_back(stringify(this->id));
    384 
    385 		/* Delete value -> object id */
    386 		me->redis->SendCommand(NULL, args);
    387 	}
    388 
    389 	/* Transaction end */
    390 	me->redis->CommitTransaction();
    391 
    392 	delete this;
    393 }
    394 
    395 void Updater::OnResult(const Reply &r)
    396 {
    397 	Serialize::Type *st = Serialize::Type::Find(this->type);
    398 
    399 	if (!st)
    400 	{
    401 		delete this;
    402 		return;
    403 	}
    404 
    405 	Serializable *obj = st->objects[this->id];
    406 	if (!obj)
    407 	{
    408 		delete this;
    409 		return;
    410 	}
    411 
    412 	Data data;
    413 	obj->Serialize(data);
    414 
    415 	/* Transaction start */
    416 	me->redis->StartTransaction();
    417 
    418 	for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
    419 	{
    420 		const Reply *key = r.multi_bulk[i],
    421 			*value = r.multi_bulk[i + 1];
    422 
    423 		std::vector<Anope::string> args;
    424 		args.push_back("SREM");
    425 		args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk);
    426 		args.push_back(stringify(this->id));
    427 
    428 		/* Delete value -> object id */
    429 		me->redis->SendCommand(NULL, args);
    430 	}
    431 
    432 	/* Add object id to id set for this type */
    433 	std::vector<Anope::string> args;
    434 	args.push_back("SADD");
    435 	args.push_back("ids:" + this->type);
    436 	args.push_back(stringify(obj->id));
    437 	me->redis->SendCommand(NULL, args);
    438 
    439 	args.clear();
    440 	args.push_back("HMSET");
    441 	args.push_back("hash:" + this->type + ":" + stringify(obj->id));
    442 
    443 	typedef std::map<Anope::string, std::stringstream *> items;
    444 	for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    445 	{
    446 		const Anope::string &key = it->first;
    447 		std::stringstream *value = it->second;
    448 
    449 		args.push_back(key);
    450 		args.push_back(value->str());
    451 
    452 		std::vector<Anope::string> args2;
    453 
    454 		args2.push_back("SADD");
    455 		args2.push_back("value:" + this->type + ":" + key + ":" + value->str());
    456 		args2.push_back(stringify(obj->id));
    457 
    458 		/* Add to value -> object id set */
    459 		me->redis->SendCommand(NULL, args2);
    460 	}
    461 
    462 	++obj->redis_ignore;
    463 
    464 	/* Add object */
    465 	me->redis->SendCommand(NULL, args);
    466 
    467 	/* Transaction end */
    468 	me->redis->CommitTransaction();
    469 
    470 	delete this;
    471 }
    472 
    473 void SubscriptionListener::OnResult(const Reply &r)
    474 {
    475 	/*
    476 	 * [May 15 13:59:35.645839 2013] Debug: pmessage
    477 	 * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:*
    478 	 * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id
    479 	 * [May 15 13:59:35.645893 2013] Debug: hset
    480 	 */
    481 	if (r.multi_bulk.size() != 4)
    482 		return;
    483 
    484 	size_t sz = r.multi_bulk[2]->bulk.find(':');
    485 	if (sz == Anope::string::npos)
    486 		return;
    487 
    488 	const Anope::string &key = r.multi_bulk[2]->bulk.substr(sz + 1),
    489 				&op = r.multi_bulk[3]->bulk;
    490 
    491 	sz = key.rfind(':');
    492 	if (sz == Anope::string::npos)
    493 		return;
    494 
    495 	const Anope::string &id = key.substr(sz + 1);
    496 
    497 	size_t sz2 = key.rfind(':', sz - 1);
    498 	if (sz2 == Anope::string::npos)
    499 		return;
    500 	const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1);
    501 
    502 	Serialize::Type *s_type = Serialize::Type::Find(type);
    503 
    504 	if (s_type == NULL)
    505 		return;
    506 
    507 	uint64_t obj_id;
    508 	try
    509 	{
    510 		obj_id = convertTo<uint64_t>(id);
    511 	}
    512 	catch (const ConvertException &)
    513 	{
    514 		return;
    515 	}
    516 
    517 	if (op == "hset" || op == "hdel")
    518 	{
    519 		Serializable *s = s_type->objects[obj_id];
    520 
    521 		if (s && s->redis_ignore)
    522 		{
    523 			--s->redis_ignore;
    524 			Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it";
    525 		}
    526 		else
    527 		{
    528 			Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type;
    529 
    530 			std::vector<Anope::string> args;
    531 			args.push_back("HGETALL");
    532 			args.push_back("hash:" + type + ":" + id);
    533 
    534 			me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args);
    535 		}
    536 	}
    537 	else if (op == "del")
    538 	{
    539 		Serializable* &s = s_type->objects[obj_id];
    540 		if (s == NULL)
    541 			return;
    542 
    543 		Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type;
    544 
    545 		Data data;
    546 
    547 		s->Serialize(data);
    548 
    549 		/* Transaction start */
    550 		me->redis->StartTransaction();
    551 
    552 		typedef std::map<Anope::string, std::stringstream *> items;
    553 		for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    554 		{
    555 			const Anope::string &k = it->first;
    556 			std::stringstream *value = it->second;
    557 
    558 			std::vector<Anope::string> args;
    559 			args.push_back("SREM");
    560 			args.push_back("value:" + type + ":" + k + ":" + value->str());
    561 			args.push_back(id);
    562 
    563 			/* Delete value -> object id */
    564 			me->redis->SendCommand(NULL, args);
    565 		}
    566 
    567 		std::vector<Anope::string> args;
    568 		args.push_back("SREM");
    569 		args.push_back("ids:" + type);
    570 		args.push_back(stringify(s->id));
    571 
    572 		/* Delete object from id set */
    573 		me->redis->SendCommand(NULL, args);
    574 
    575 		/* Transaction end */
    576 		me->redis->CommitTransaction();
    577 
    578 		delete s;
    579 		s = NULL;
    580 	}
    581 }
    582 
    583 void ModifiedObject::OnResult(const Reply &r)
    584 {
    585 	Serialize::Type *st = Serialize::Type::Find(this->type);
    586 
    587 	if (!st)
    588 	{
    589 		delete this;
    590 		return;
    591 	}
    592 
    593 	Serializable* &obj = st->objects[this->id];
    594 
    595 	/* Transaction start */
    596 	me->redis->StartTransaction();
    597 
    598 	/* Erase old object values */
    599 	if (obj)
    600 	{
    601 		Data data;
    602 
    603 		obj->Serialize(data);
    604 
    605 		typedef std::map<Anope::string, std::stringstream *> items;
    606 		for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    607 		{
    608 			const Anope::string &key = it->first;
    609 			std::stringstream *value = it->second;
    610 
    611 			std::vector<Anope::string> args;
    612 			args.push_back("SREM");
    613 			args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
    614 			args.push_back(stringify(this->id));
    615 
    616 			/* Delete value -> object id */
    617 			me->redis->SendCommand(NULL, args);
    618 		}
    619 	}
    620 
    621 	Data data;
    622 
    623 	for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2)
    624 	{
    625 		const Reply *key = r.multi_bulk[i],
    626 			*value = r.multi_bulk[i + 1];
    627 
    628 		data[key->bulk] << value->bulk;
    629 	}
    630 
    631 	obj = st->Unserialize(obj, data);
    632 	if (obj)
    633 	{
    634 		obj->id = this->id;
    635 		obj->UpdateCache(data);
    636 
    637 		/* Insert new object values */
    638 		typedef std::map<Anope::string, std::stringstream *> items;
    639 		for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it)
    640 		{
    641 			const Anope::string &key = it->first;
    642 			std::stringstream *value = it->second;
    643 
    644 			std::vector<Anope::string> args;
    645 			args.push_back("SADD");
    646 			args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str());
    647 			args.push_back(stringify(obj->id));
    648 
    649 			/* Add to value -> object id set */
    650 			me->redis->SendCommand(NULL, args);
    651 		}
    652 
    653 		std::vector<Anope::string> args;
    654 		args.push_back("SADD");
    655 		args.push_back("ids:" + st->GetName());
    656 		args.push_back(stringify(obj->id));
    657 
    658 		/* Add to type -> id set */
    659 		me->redis->SendCommand(NULL, args);
    660 	}
    661 
    662 	/* Transaction end */
    663 	me->redis->CommitTransaction();
    664 
    665 	delete this;
    666 }
    667 
    668 MODULE_INIT(DatabaseRedis)