anope- supernets anope source code & configuration |
git clone git://git.acid.vegas/anope.git |
Log | Files | Refs | Archive | README |
db_redis.cpp (15310B)
1 /* 2 * 3 * (C) 2003-2022 Anope Team 4 * Contact us at team@anope.org 5 * 6 * Please read COPYING and README for further details. 7 */ 8 9 #include "module.h" 10 #include "modules/redis.h" 11 12 using namespace Redis; 13 14 class DatabaseRedis; 15 static DatabaseRedis *me; 16 17 class Data : public Serialize::Data 18 { 19 public: 20 std::map<Anope::string, std::stringstream *> data; 21 22 ~Data() 23 { 24 for (std::map<Anope::string, std::stringstream *>::iterator it = data.begin(), it_end = data.end(); it != it_end; ++it) 25 delete it->second; 26 } 27 28 std::iostream& operator[](const Anope::string &key) anope_override 29 { 30 std::stringstream* &stream = data[key]; 31 if (!stream) 32 stream = new std::stringstream(); 33 return *stream; 34 } 35 36 std::set<Anope::string> KeySet() const anope_override 37 { 38 std::set<Anope::string> keys; 39 for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) 40 keys.insert(it->first); 41 return keys; 42 } 43 44 size_t Hash() const anope_override 45 { 46 size_t hash = 0; 47 for (std::map<Anope::string, std::stringstream *>::const_iterator it = this->data.begin(), it_end = this->data.end(); it != it_end; ++it) 48 if (!it->second->str().empty()) 49 hash ^= Anope::hash_cs()(it->second->str()); 50 return hash; 51 } 52 }; 53 54 class TypeLoader : public Interface 55 { 56 Anope::string type; 57 public: 58 TypeLoader(Module *creator, const Anope::string &t) : Interface(creator), type(t) { } 59 60 void OnResult(const Reply &r) anope_override; 61 }; 62 63 class ObjectLoader : public Interface 64 { 65 Anope::string type; 66 int64_t id; 67 68 public: 69 ObjectLoader(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } 70 71 void OnResult(const Reply &r) anope_override; 72 }; 73 74 class IDInterface : public Interface 75 { 76 Reference<Serializable> o; 77 public: 78 IDInterface(Module *creator, Serializable *obj) : Interface(creator), o(obj) { } 79 80 void OnResult(const Reply &r) anope_override; 81 }; 82 83 class Deleter : public Interface 84 { 85 Anope::string type; 86 int64_t id; 87 public: 88 Deleter(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } 89 90 void OnResult(const Reply &r) anope_override; 91 }; 92 93 class Updater : public Interface 94 { 95 Anope::string type; 96 int64_t id; 97 public: 98 Updater(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } 99 100 void OnResult(const Reply &r) anope_override; 101 }; 102 103 class ModifiedObject : public Interface 104 { 105 Anope::string type; 106 int64_t id; 107 public: 108 ModifiedObject(Module *creator, const Anope::string &t, int64_t i) : Interface(creator), type(t), id(i) { } 109 110 void OnResult(const Reply &r) anope_override; 111 }; 112 113 class SubscriptionListener : public Interface 114 { 115 public: 116 SubscriptionListener(Module *creator) : Interface(creator) { } 117 118 void OnResult(const Reply &r) anope_override; 119 }; 120 121 class DatabaseRedis : public Module, public Pipe 122 { 123 SubscriptionListener sl; 124 std::set<Serializable *> updated_items; 125 126 public: 127 ServiceReference<Provider> redis; 128 129 DatabaseRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, DATABASE | VENDOR), sl(this) 130 { 131 me = this; 132 133 } 134 135 /* Insert or update an object */ 136 void InsertObject(Serializable *obj) 137 { 138 Serialize::Type *t = obj->GetSerializableType(); 139 140 /* If there is no id yet for this object, get one */ 141 if (!obj->id) 142 redis->SendCommand(new IDInterface(this, obj), "INCR id:" + t->GetName()); 143 else 144 { 145 Data data; 146 obj->Serialize(data); 147 148 if (obj->IsCached(data)) 149 return; 150 151 obj->UpdateCache(data); 152 153 std::vector<Anope::string> args; 154 args.push_back("HGETALL"); 155 args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id)); 156 157 /* Get object attrs to clear before updating */ 158 redis->SendCommand(new Updater(this, t->GetName(), obj->id), args); 159 } 160 } 161 162 void OnNotify() anope_override 163 { 164 for (std::set<Serializable *>::iterator it = this->updated_items.begin(), it_end = this->updated_items.end(); it != it_end; ++it) 165 { 166 Serializable *s = *it; 167 168 this->InsertObject(s); 169 } 170 171 this->updated_items.clear(); 172 } 173 174 void OnReload(Configuration::Conf *conf) anope_override 175 { 176 Configuration::Block *block = conf->GetModule(this); 177 this->redis = ServiceReference<Provider>("Redis::Provider", block->Get<const Anope::string>("engine", "redis/main")); 178 } 179 180 EventReturn OnLoadDatabase() anope_override 181 { 182 if (!redis) 183 { 184 Log(this) << "Unable to load database - unable to find redis provider"; 185 return EVENT_CONTINUE; 186 } 187 188 const std::vector<Anope::string> type_order = Serialize::Type::GetTypeOrder(); 189 for (unsigned i = 0; i < type_order.size(); ++i) 190 { 191 Serialize::Type *sb = Serialize::Type::Find(type_order[i]); 192 this->OnSerializeTypeCreate(sb); 193 } 194 195 while (!redis->IsSocketDead() && redis->BlockAndProcess()); 196 197 if (redis->IsSocketDead()) 198 { 199 Log(this) << "I/O error while loading redis database - is it online?"; 200 return EVENT_CONTINUE; 201 } 202 203 redis->Subscribe(&this->sl, "__keyspace@*__:hash:*"); 204 205 return EVENT_STOP; 206 } 207 208 void OnSerializeTypeCreate(Serialize::Type *sb) anope_override 209 { 210 if (!redis) 211 return; 212 213 std::vector<Anope::string> args; 214 args.push_back("SMEMBERS"); 215 args.push_back("ids:" + sb->GetName()); 216 217 redis->SendCommand(new TypeLoader(this, sb->GetName()), args); 218 } 219 220 void OnSerializableConstruct(Serializable *obj) anope_override 221 { 222 this->updated_items.insert(obj); 223 this->Notify(); 224 } 225 226 void OnSerializableDestruct(Serializable *obj) anope_override 227 { 228 Serialize::Type *t = obj->GetSerializableType(); 229 230 if (t == NULL) 231 { 232 /* This is probably the module providing the type unloading. 233 * 234 * The types get registered after the extensible container is 235 * registered so that unserialization on module load can insert 236 * into the extensible container. So, the type destructs prior to 237 * the extensible container, which then triggers this 238 */ 239 return; 240 } 241 242 std::vector<Anope::string> args; 243 args.push_back("HGETALL"); 244 args.push_back("hash:" + t->GetName() + ":" + stringify(obj->id)); 245 246 /* Get all of the attributes for this object */ 247 redis->SendCommand(new Deleter(this, t->GetName(), obj->id), args); 248 249 this->updated_items.erase(obj); 250 t->objects.erase(obj->id); 251 this->Notify(); 252 } 253 254 void OnSerializableUpdate(Serializable *obj) anope_override 255 { 256 this->updated_items.insert(obj); 257 this->Notify(); 258 } 259 }; 260 261 void TypeLoader::OnResult(const Reply &r) 262 { 263 if (r.type != Reply::MULTI_BULK || !me->redis) 264 { 265 delete this; 266 return; 267 } 268 269 for (unsigned i = 0; i < r.multi_bulk.size(); ++i) 270 { 271 const Reply *reply = r.multi_bulk[i]; 272 273 if (reply->type != Reply::BULK) 274 continue; 275 276 int64_t id; 277 try 278 { 279 id = convertTo<int64_t>(reply->bulk); 280 } 281 catch (const ConvertException &) 282 { 283 continue; 284 } 285 286 std::vector<Anope::string> args; 287 args.push_back("HGETALL"); 288 args.push_back("hash:" + this->type + ":" + stringify(id)); 289 290 me->redis->SendCommand(new ObjectLoader(me, this->type, id), args); 291 } 292 293 delete this; 294 } 295 296 void ObjectLoader::OnResult(const Reply &r) 297 { 298 Serialize::Type *st = Serialize::Type::Find(this->type); 299 300 if (r.type != Reply::MULTI_BULK || r.multi_bulk.empty() || !me->redis || !st) 301 { 302 delete this; 303 return; 304 } 305 306 Data data; 307 308 for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) 309 { 310 const Reply *key = r.multi_bulk[i], 311 *value = r.multi_bulk[i + 1]; 312 313 data[key->bulk] << value->bulk; 314 } 315 316 Serializable* &obj = st->objects[this->id]; 317 obj = st->Unserialize(obj, data); 318 if (obj) 319 { 320 obj->id = this->id; 321 obj->UpdateCache(data); 322 } 323 324 delete this; 325 } 326 327 void IDInterface::OnResult(const Reply &r) 328 { 329 if (!o || r.type != Reply::INT || !r.i) 330 { 331 delete this; 332 return; 333 } 334 335 Serializable* &obj = o->GetSerializableType()->objects[r.i]; 336 if (obj) 337 /* This shouldn't be possible */ 338 obj->id = 0; 339 340 o->id = r.i; 341 obj = o; 342 343 /* Now that we have the id, insert this object for real */ 344 anope_dynamic_static_cast<DatabaseRedis *>(this->owner)->InsertObject(o); 345 346 delete this; 347 } 348 349 void Deleter::OnResult(const Reply &r) 350 { 351 if (r.type != Reply::MULTI_BULK || !me->redis || r.multi_bulk.empty()) 352 { 353 delete this; 354 return; 355 } 356 357 /* Transaction start */ 358 me->redis->StartTransaction(); 359 360 std::vector<Anope::string> args; 361 args.push_back("DEL"); 362 args.push_back("hash:" + this->type + ":" + stringify(this->id)); 363 364 /* Delete hash object */ 365 me->redis->SendCommand(NULL, args); 366 367 args.clear(); 368 args.push_back("SREM"); 369 args.push_back("ids:" + this->type); 370 args.push_back(stringify(this->id)); 371 372 /* Delete id from ids set */ 373 me->redis->SendCommand(NULL, args); 374 375 for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) 376 { 377 const Reply *key = r.multi_bulk[i], 378 *value = r.multi_bulk[i + 1]; 379 380 args.clear(); 381 args.push_back("SREM"); 382 args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk); 383 args.push_back(stringify(this->id)); 384 385 /* Delete value -> object id */ 386 me->redis->SendCommand(NULL, args); 387 } 388 389 /* Transaction end */ 390 me->redis->CommitTransaction(); 391 392 delete this; 393 } 394 395 void Updater::OnResult(const Reply &r) 396 { 397 Serialize::Type *st = Serialize::Type::Find(this->type); 398 399 if (!st) 400 { 401 delete this; 402 return; 403 } 404 405 Serializable *obj = st->objects[this->id]; 406 if (!obj) 407 { 408 delete this; 409 return; 410 } 411 412 Data data; 413 obj->Serialize(data); 414 415 /* Transaction start */ 416 me->redis->StartTransaction(); 417 418 for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) 419 { 420 const Reply *key = r.multi_bulk[i], 421 *value = r.multi_bulk[i + 1]; 422 423 std::vector<Anope::string> args; 424 args.push_back("SREM"); 425 args.push_back("value:" + this->type + ":" + key->bulk + ":" + value->bulk); 426 args.push_back(stringify(this->id)); 427 428 /* Delete value -> object id */ 429 me->redis->SendCommand(NULL, args); 430 } 431 432 /* Add object id to id set for this type */ 433 std::vector<Anope::string> args; 434 args.push_back("SADD"); 435 args.push_back("ids:" + this->type); 436 args.push_back(stringify(obj->id)); 437 me->redis->SendCommand(NULL, args); 438 439 args.clear(); 440 args.push_back("HMSET"); 441 args.push_back("hash:" + this->type + ":" + stringify(obj->id)); 442 443 typedef std::map<Anope::string, std::stringstream *> items; 444 for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 445 { 446 const Anope::string &key = it->first; 447 std::stringstream *value = it->second; 448 449 args.push_back(key); 450 args.push_back(value->str()); 451 452 std::vector<Anope::string> args2; 453 454 args2.push_back("SADD"); 455 args2.push_back("value:" + this->type + ":" + key + ":" + value->str()); 456 args2.push_back(stringify(obj->id)); 457 458 /* Add to value -> object id set */ 459 me->redis->SendCommand(NULL, args2); 460 } 461 462 ++obj->redis_ignore; 463 464 /* Add object */ 465 me->redis->SendCommand(NULL, args); 466 467 /* Transaction end */ 468 me->redis->CommitTransaction(); 469 470 delete this; 471 } 472 473 void SubscriptionListener::OnResult(const Reply &r) 474 { 475 /* 476 * [May 15 13:59:35.645839 2013] Debug: pmessage 477 * [May 15 13:59:35.645866 2013] Debug: __keyspace@*__:anope:hash:* 478 * [May 15 13:59:35.645880 2013] Debug: __keyspace@0__:anope:hash:type:id 479 * [May 15 13:59:35.645893 2013] Debug: hset 480 */ 481 if (r.multi_bulk.size() != 4) 482 return; 483 484 size_t sz = r.multi_bulk[2]->bulk.find(':'); 485 if (sz == Anope::string::npos) 486 return; 487 488 const Anope::string &key = r.multi_bulk[2]->bulk.substr(sz + 1), 489 &op = r.multi_bulk[3]->bulk; 490 491 sz = key.rfind(':'); 492 if (sz == Anope::string::npos) 493 return; 494 495 const Anope::string &id = key.substr(sz + 1); 496 497 size_t sz2 = key.rfind(':', sz - 1); 498 if (sz2 == Anope::string::npos) 499 return; 500 const Anope::string &type = key.substr(sz2 + 1, sz - sz2 - 1); 501 502 Serialize::Type *s_type = Serialize::Type::Find(type); 503 504 if (s_type == NULL) 505 return; 506 507 uint64_t obj_id; 508 try 509 { 510 obj_id = convertTo<uint64_t>(id); 511 } 512 catch (const ConvertException &) 513 { 514 return; 515 } 516 517 if (op == "hset" || op == "hdel") 518 { 519 Serializable *s = s_type->objects[obj_id]; 520 521 if (s && s->redis_ignore) 522 { 523 --s->redis_ignore; 524 Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type << ", but I am ignoring it"; 525 } 526 else 527 { 528 Log(LOG_DEBUG) << "redis: notify: got modify for object id " << obj_id << " of type " << type; 529 530 std::vector<Anope::string> args; 531 args.push_back("HGETALL"); 532 args.push_back("hash:" + type + ":" + id); 533 534 me->redis->SendCommand(new ModifiedObject(me, type, obj_id), args); 535 } 536 } 537 else if (op == "del") 538 { 539 Serializable* &s = s_type->objects[obj_id]; 540 if (s == NULL) 541 return; 542 543 Log(LOG_DEBUG) << "redis: notify: deleting object id " << obj_id << " of type " << type; 544 545 Data data; 546 547 s->Serialize(data); 548 549 /* Transaction start */ 550 me->redis->StartTransaction(); 551 552 typedef std::map<Anope::string, std::stringstream *> items; 553 for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 554 { 555 const Anope::string &k = it->first; 556 std::stringstream *value = it->second; 557 558 std::vector<Anope::string> args; 559 args.push_back("SREM"); 560 args.push_back("value:" + type + ":" + k + ":" + value->str()); 561 args.push_back(id); 562 563 /* Delete value -> object id */ 564 me->redis->SendCommand(NULL, args); 565 } 566 567 std::vector<Anope::string> args; 568 args.push_back("SREM"); 569 args.push_back("ids:" + type); 570 args.push_back(stringify(s->id)); 571 572 /* Delete object from id set */ 573 me->redis->SendCommand(NULL, args); 574 575 /* Transaction end */ 576 me->redis->CommitTransaction(); 577 578 delete s; 579 s = NULL; 580 } 581 } 582 583 void ModifiedObject::OnResult(const Reply &r) 584 { 585 Serialize::Type *st = Serialize::Type::Find(this->type); 586 587 if (!st) 588 { 589 delete this; 590 return; 591 } 592 593 Serializable* &obj = st->objects[this->id]; 594 595 /* Transaction start */ 596 me->redis->StartTransaction(); 597 598 /* Erase old object values */ 599 if (obj) 600 { 601 Data data; 602 603 obj->Serialize(data); 604 605 typedef std::map<Anope::string, std::stringstream *> items; 606 for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 607 { 608 const Anope::string &key = it->first; 609 std::stringstream *value = it->second; 610 611 std::vector<Anope::string> args; 612 args.push_back("SREM"); 613 args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); 614 args.push_back(stringify(this->id)); 615 616 /* Delete value -> object id */ 617 me->redis->SendCommand(NULL, args); 618 } 619 } 620 621 Data data; 622 623 for (unsigned i = 0; i + 1 < r.multi_bulk.size(); i += 2) 624 { 625 const Reply *key = r.multi_bulk[i], 626 *value = r.multi_bulk[i + 1]; 627 628 data[key->bulk] << value->bulk; 629 } 630 631 obj = st->Unserialize(obj, data); 632 if (obj) 633 { 634 obj->id = this->id; 635 obj->UpdateCache(data); 636 637 /* Insert new object values */ 638 typedef std::map<Anope::string, std::stringstream *> items; 639 for (items::iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 640 { 641 const Anope::string &key = it->first; 642 std::stringstream *value = it->second; 643 644 std::vector<Anope::string> args; 645 args.push_back("SADD"); 646 args.push_back("value:" + st->GetName() + ":" + key + ":" + value->str()); 647 args.push_back(stringify(obj->id)); 648 649 /* Add to value -> object id set */ 650 me->redis->SendCommand(NULL, args); 651 } 652 653 std::vector<Anope::string> args; 654 args.push_back("SADD"); 655 args.push_back("ids:" + st->GetName()); 656 args.push_back(stringify(obj->id)); 657 658 /* Add to type -> id set */ 659 me->redis->SendCommand(NULL, args); 660 } 661 662 /* Transaction end */ 663 me->redis->CommitTransaction(); 664 665 delete this; 666 } 667 668 MODULE_INIT(DatabaseRedis)