anope- supernets anope source code & configuration |
git clone git://git.acid.vegas/anope.git |
Log | Files | Refs | Archive | README |
m_redis.cpp (13740B)
1 /* 2 * 3 * (C) 2003-2022 Anope Team 4 * Contact us at team@anope.org 5 * 6 * Please read COPYING and README for further details. 7 */ 8 9 #include "module.h" 10 #include "modules/redis.h" 11 12 using namespace Redis; 13 14 class MyRedisService; 15 16 class RedisSocket : public BinarySocket, public ConnectionSocket 17 { 18 size_t ParseReply(Reply &r, const char *buf, size_t l); 19 public: 20 MyRedisService *provider; 21 std::deque<Interface *> interfaces; 22 std::map<Anope::string, Interface *> subinterfaces; 23 24 RedisSocket(MyRedisService *pro, bool v6) : Socket(-1, v6), provider(pro) { } 25 26 ~RedisSocket(); 27 28 void OnConnect() anope_override; 29 void OnError(const Anope::string &error) anope_override; 30 31 bool Read(const char *buffer, size_t l) anope_override; 32 }; 33 34 class Transaction : public Interface 35 { 36 public: 37 std::deque<Interface *> interfaces; 38 39 Transaction(Module *creator) : Interface(creator) { } 40 41 ~Transaction() 42 { 43 for (unsigned i = 0; i < interfaces.size(); ++i) 44 { 45 Interface *inter = interfaces[i]; 46 47 if (!inter) 48 continue; 49 50 inter->OnError("Interface going away"); 51 } 52 } 53 54 void OnResult(const Reply &r) anope_override 55 { 56 /* This is a multi bulk reply of the results of the queued commands 57 * in this transaction 58 */ 59 60 Log(LOG_DEBUG_2) << "redis: transaction complete with " << r.multi_bulk.size() << " results"; 61 62 for (unsigned i = 0; i < r.multi_bulk.size(); ++i) 63 { 64 const Reply *reply = r.multi_bulk[i]; 65 66 if (interfaces.empty()) 67 break; 68 69 Interface *inter = interfaces.front(); 70 interfaces.pop_front(); 71 72 if (inter) 73 inter->OnResult(*reply); 74 } 75 } 76 }; 77 78 class MyRedisService : public Provider 79 { 80 public: 81 Anope::string host; 82 int port; 83 unsigned db; 84 85 RedisSocket *sock, *sub; 86 87 Transaction ti; 88 bool in_transaction; 89 90 MyRedisService(Module *c, const Anope::string &n, const Anope::string &h, int p, unsigned d) : Provider(c, n), host(h), port(p), db(d), sock(NULL), sub(NULL), 91 ti(c), in_transaction(false) 92 { 93 sock = new RedisSocket(this, host.find(':') != Anope::string::npos); 94 sock->Connect(host, port); 95 96 sub = new RedisSocket(this, host.find(':') != Anope::string::npos); 97 sub->Connect(host, port); 98 } 99 100 ~MyRedisService() 101 { 102 if (sock) 103 { 104 sock->flags[SF_DEAD] = true; 105 sock->provider = NULL; 106 } 107 108 if (sub) 109 { 110 sub->flags[SF_DEAD] = true; 111 sub->provider = NULL; 112 } 113 } 114 115 private: 116 inline void Pack(std::vector<char> &buffer, const char *buf, size_t sz = 0) 117 { 118 if (!sz) 119 sz = strlen(buf); 120 121 size_t old_size = buffer.size(); 122 buffer.resize(old_size + sz); 123 std::copy(buf, buf + sz, buffer.begin() + old_size); 124 } 125 126 void Send(RedisSocket *s, Interface *i, const std::vector<std::pair<const char *, size_t> > &args) 127 { 128 std::vector<char> buffer; 129 130 Pack(buffer, "*"); 131 Pack(buffer, stringify(args.size()).c_str()); 132 Pack(buffer, "\r\n"); 133 134 for (unsigned j = 0; j < args.size(); ++j) 135 { 136 const std::pair<const char *, size_t> &pair = args[j]; 137 138 Pack(buffer, "$"); 139 Pack(buffer, stringify(pair.second).c_str()); 140 Pack(buffer, "\r\n"); 141 142 Pack(buffer, pair.first, pair.second); 143 Pack(buffer, "\r\n"); 144 } 145 146 if (buffer.empty()) 147 return; 148 149 s->Write(&buffer[0], buffer.size()); 150 if (in_transaction) 151 { 152 ti.interfaces.push_back(i); 153 s->interfaces.push_back(NULL); // For the +Queued response 154 } 155 else 156 s->interfaces.push_back(i); 157 } 158 159 public: 160 bool IsSocketDead() anope_override 161 { 162 return this->sock && this->sock->flags[SF_DEAD]; 163 } 164 165 void SendCommand(RedisSocket *s, Interface *i, const std::vector<Anope::string> &cmds) 166 { 167 std::vector<std::pair<const char *, size_t> > args; 168 for (unsigned j = 0; j < cmds.size(); ++j) 169 args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length())); 170 this->Send(s, i, args); 171 } 172 173 void SendCommand(RedisSocket *s, Interface *i, const Anope::string &str) 174 { 175 std::vector<Anope::string> args; 176 spacesepstream(str).GetTokens(args); 177 this->SendCommand(s, i, args); 178 } 179 180 void Send(Interface *i, const std::vector<std::pair<const char *, size_t> > &args) 181 { 182 if (!sock) 183 { 184 sock = new RedisSocket(this, host.find(':') != Anope::string::npos); 185 sock->Connect(host, port); 186 } 187 188 this->Send(sock, i, args); 189 } 190 191 void SendCommand(Interface *i, const std::vector<Anope::string> &cmds) anope_override 192 { 193 std::vector<std::pair<const char *, size_t> > args; 194 for (unsigned j = 0; j < cmds.size(); ++j) 195 args.push_back(std::make_pair(cmds[j].c_str(), cmds[j].length())); 196 this->Send(i, args); 197 } 198 199 void SendCommand(Interface *i, const Anope::string &str) anope_override 200 { 201 std::vector<Anope::string> args; 202 spacesepstream(str).GetTokens(args); 203 this->SendCommand(i, args); 204 } 205 206 public: 207 bool BlockAndProcess() anope_override 208 { 209 if (!this->sock->ProcessWrite()) 210 this->sock->flags[SF_DEAD] = true; 211 this->sock->SetBlocking(true); 212 if (!this->sock->ProcessRead()) 213 this->sock->flags[SF_DEAD] = true; 214 this->sock->SetBlocking(false); 215 return !this->sock->interfaces.empty(); 216 } 217 218 void Subscribe(Interface *i, const Anope::string &pattern) anope_override 219 { 220 if (sub == NULL) 221 { 222 sub = new RedisSocket(this, host.find(':') != Anope::string::npos); 223 sub->Connect(host, port); 224 } 225 226 std::vector<Anope::string> args; 227 args.push_back("PSUBSCRIBE"); 228 args.push_back(pattern); 229 this->SendCommand(sub, NULL, args); 230 231 sub->subinterfaces[pattern] = i; 232 } 233 234 void Unsubscribe(const Anope::string &pattern) anope_override 235 { 236 if (sub) 237 sub->subinterfaces.erase(pattern); 238 } 239 240 void StartTransaction() anope_override 241 { 242 if (in_transaction) 243 throw CoreException(); 244 245 this->SendCommand(NULL, "MULTI"); 246 in_transaction = true; 247 } 248 249 void CommitTransaction() anope_override 250 { 251 /* The result of the transaction comes back to the reply of EXEC as a multi bulk. 252 * The reply to the individual commands that make up the transaction when executed 253 * is a simple +QUEUED 254 */ 255 in_transaction = false; 256 this->SendCommand(&this->ti, "EXEC"); 257 } 258 }; 259 260 RedisSocket::~RedisSocket() 261 { 262 if (provider) 263 { 264 if (provider->sock == this) 265 provider->sock = NULL; 266 else if (provider->sub == this) 267 provider->sub = NULL; 268 } 269 270 for (unsigned i = 0; i < interfaces.size(); ++i) 271 { 272 Interface *inter = interfaces[i]; 273 274 if (!inter) 275 continue; 276 277 inter->OnError("Interface going away"); 278 } 279 } 280 281 void RedisSocket::OnConnect() 282 { 283 Log() << "redis: Successfully connected to " << provider->name << (this == this->provider->sub ? " (sub)" : ""); 284 285 this->provider->SendCommand(NULL, "CLIENT SETNAME Anope"); 286 this->provider->SendCommand(NULL, "SELECT " + stringify(provider->db)); 287 288 if (this != this->provider->sub) 289 { 290 this->provider->SendCommand(this, NULL, "CONFIG SET notify-keyspace-events KA"); 291 } 292 } 293 294 void RedisSocket::OnError(const Anope::string &error) 295 { 296 Log() << "redis: Error on " << provider->name << (this == this->provider->sub ? " (sub)" : "") << ": " << error; 297 } 298 299 size_t RedisSocket::ParseReply(Reply &r, const char *buffer, size_t l) 300 { 301 size_t used = 0; 302 303 if (!l) 304 return used; 305 306 if (r.type == Reply::MULTI_BULK) 307 goto multi_bulk_cont; 308 309 switch (*buffer) 310 { 311 case '+': 312 { 313 Anope::string reason(buffer, 1, l - 1); 314 size_t nl = reason.find("\r\n"); 315 Log(LOG_DEBUG_2) << "redis: status ok: " << reason.substr(0, nl); 316 if (nl != Anope::string::npos) 317 { 318 r.type = Reply::OK; 319 used = 1 + nl + 2; 320 } 321 break; 322 } 323 case '-': 324 { 325 Anope::string reason(buffer, 1, l - 1); 326 size_t nl = reason.find("\r\n"); 327 Log(LOG_DEBUG) << "redis: status error: " << reason.substr(0, nl); 328 if (nl != Anope::string::npos) 329 { 330 r.type = Reply::NOT_OK; 331 used = 1 + nl + 2; 332 } 333 break; 334 } 335 case ':': 336 { 337 Anope::string ibuf(buffer, 1, l - 1); 338 size_t nl = ibuf.find("\r\n"); 339 if (nl != Anope::string::npos) 340 { 341 try 342 { 343 r.i = convertTo<int64_t>(ibuf.substr(0, nl)); 344 } 345 catch (const ConvertException &) { } 346 347 r.type = Reply::INT; 348 used = 1 + nl + 2; 349 } 350 break; 351 } 352 case '$': 353 { 354 Anope::string reply(buffer + 1, l - 1); 355 /* This assumes one bulk can always fit in our recv buffer */ 356 size_t nl = reply.find("\r\n"); 357 if (nl != Anope::string::npos) 358 { 359 int len; 360 try 361 { 362 len = convertTo<int>(reply.substr(0, nl)); 363 if (len >= 0) 364 { 365 if (1 + nl + 2 + len + 2 <= l) 366 { 367 used = 1 + nl + 2 + len + 2; 368 r.bulk = reply.substr(nl + 2, len); 369 r.type = Reply::BULK; 370 } 371 } 372 else 373 { 374 used = 1 + nl + 2 + 2; 375 r.type = Reply::BULK; 376 } 377 } 378 catch (const ConvertException &) { } 379 } 380 break; 381 } 382 multi_bulk_cont: 383 case '*': 384 { 385 if (r.type != Reply::MULTI_BULK) 386 { 387 Anope::string reply(buffer + 1, l - 1); 388 size_t nl = reply.find("\r\n"); 389 if (nl != Anope::string::npos) 390 { 391 r.type = Reply::MULTI_BULK; 392 try 393 { 394 r.multi_bulk_size = convertTo<int>(reply.substr(0, nl)); 395 } 396 catch (const ConvertException &) { } 397 398 used = 1 + nl + 2; 399 } 400 else 401 break; 402 } 403 else if (r.multi_bulk_size >= 0 && r.multi_bulk.size() == static_cast<unsigned>(r.multi_bulk_size)) 404 { 405 /* This multi bulk is already complete, so check the sub bulks */ 406 for (unsigned i = 0; i < r.multi_bulk.size(); ++i) 407 if (r.multi_bulk[i]->type == Reply::MULTI_BULK) 408 ParseReply(*r.multi_bulk[i], buffer + used, l - used); 409 break; 410 } 411 412 for (int i = r.multi_bulk.size(); i < r.multi_bulk_size; ++i) 413 { 414 Reply *reply = new Reply(); 415 size_t u = ParseReply(*reply, buffer + used, l - used); 416 if (!u) 417 { 418 Log(LOG_DEBUG) << "redis: ran out of data to parse"; 419 delete reply; 420 break; 421 } 422 r.multi_bulk.push_back(reply); 423 used += u; 424 } 425 break; 426 } 427 default: 428 Log(LOG_DEBUG) << "redis: unknown reply " << *buffer; 429 } 430 431 return used; 432 } 433 434 bool RedisSocket::Read(const char *buffer, size_t l) 435 { 436 static std::vector<char> save; 437 std::vector<char> copy; 438 439 if (!save.empty()) 440 { 441 std::copy(buffer, buffer + l, std::back_inserter(save)); 442 443 copy = save; 444 445 buffer = ©[0]; 446 l = copy.size(); 447 } 448 449 while (l) 450 { 451 static Reply r; 452 453 size_t used = this->ParseReply(r, buffer, l); 454 if (!used) 455 { 456 Log(LOG_DEBUG) << "redis: used == 0 ?"; 457 r.Clear(); 458 break; 459 } 460 else if (used > l) 461 { 462 Log(LOG_DEBUG) << "redis: used > l ?"; 463 r.Clear(); 464 break; 465 } 466 467 /* Full result is not here yet */ 468 if (r.type == Reply::MULTI_BULK && static_cast<unsigned>(r.multi_bulk_size) != r.multi_bulk.size()) 469 { 470 buffer += used; 471 l -= used; 472 break; 473 } 474 475 if (this == provider->sub) 476 { 477 if (r.multi_bulk.size() == 4) 478 { 479 /* pmessage 480 * pattern subscribed to 481 * __keyevent@0__:set 482 * key 483 */ 484 std::map<Anope::string, Interface *>::iterator it = this->subinterfaces.find(r.multi_bulk[1]->bulk); 485 if (it != this->subinterfaces.end()) 486 it->second->OnResult(r); 487 } 488 } 489 else 490 { 491 if (this->interfaces.empty()) 492 { 493 Log(LOG_DEBUG) << "redis: no interfaces?"; 494 } 495 else 496 { 497 Interface *i = this->interfaces.front(); 498 this->interfaces.pop_front(); 499 500 if (i) 501 { 502 if (r.type != Reply::NOT_OK) 503 i->OnResult(r); 504 else 505 i->OnError(r.bulk); 506 } 507 } 508 } 509 510 buffer += used; 511 l -= used; 512 513 r.Clear(); 514 } 515 516 if (l) 517 { 518 save.resize(l); 519 std::copy(buffer, buffer + l, save.begin()); 520 } 521 else 522 std::vector<char>().swap(save); 523 524 return true; 525 } 526 527 528 class ModuleRedis : public Module 529 { 530 std::map<Anope::string, MyRedisService *> services; 531 532 public: 533 ModuleRedis(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR) 534 { 535 } 536 537 ~ModuleRedis() 538 { 539 for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it) 540 { 541 MyRedisService *p = it->second; 542 543 delete p->sock; 544 p->sock = NULL; 545 delete p->sub; 546 p->sub = NULL; 547 548 delete p; 549 } 550 } 551 552 void OnReload(Configuration::Conf *conf) anope_override 553 { 554 Configuration::Block *block = conf->GetModule(this); 555 std::vector<Anope::string> new_services; 556 557 for (int i = 0; i < block->CountBlock("redis"); ++i) 558 { 559 Configuration::Block *redis = block->GetBlock("redis", i); 560 561 const Anope::string &n = redis->Get<const Anope::string>("name"), 562 &ip = redis->Get<const Anope::string>("ip"); 563 int port = redis->Get<int>("port"); 564 unsigned db = redis->Get<unsigned>("db"); 565 566 delete services[n]; 567 services[n] = new MyRedisService(this, n, ip, port, db); 568 new_services.push_back(n); 569 } 570 571 for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end();) 572 { 573 Provider *p = it->second; 574 ++it; 575 576 if (std::find(new_services.begin(), new_services.end(), p->name) == new_services.end()) 577 delete it->second; 578 } 579 } 580 581 void OnModuleUnload(User *, Module *m) anope_override 582 { 583 for (std::map<Anope::string, MyRedisService *>::iterator it = services.begin(); it != services.end(); ++it) 584 { 585 MyRedisService *p = it->second; 586 587 if (p->sock) 588 for (unsigned i = p->sock->interfaces.size(); i > 0; --i) 589 { 590 Interface *inter = p->sock->interfaces[i - 1]; 591 592 if (inter && inter->owner == m) 593 { 594 inter->OnError(m->name + " being unloaded"); 595 p->sock->interfaces.erase(p->sock->interfaces.begin() + i - 1); 596 } 597 } 598 599 if (p->sub) 600 for (unsigned i = p->sub->interfaces.size(); i > 0; --i) 601 { 602 Interface *inter = p->sub->interfaces[i - 1]; 603 604 if (inter && inter->owner == m) 605 { 606 inter->OnError(m->name + " being unloaded"); 607 p->sub->interfaces.erase(p->sub->interfaces.begin() + i - 1); 608 } 609 } 610 611 for (unsigned i = p->ti.interfaces.size(); i > 0; --i) 612 { 613 Interface *inter = p->ti.interfaces[i - 1]; 614 615 if (inter && inter->owner == m) 616 { 617 inter->OnError(m->name + " being unloaded"); 618 p->ti.interfaces.erase(p->ti.interfaces.begin() + i - 1); 619 } 620 } 621 } 622 } 623 }; 624 625 MODULE_INIT(ModuleRedis)