anope- supernets anope source code & configuration |
git clone git://git.acid.vegas/anope.git |
Log | Files | Refs | Archive | README |
m_mysql.cpp (14964B)
1 /* 2 * 3 * (C) 2010-2022 Anope Team 4 * Contact us at team@anope.org 5 * 6 * Please read COPYING and README for further details. 7 */ 8 9 /* RequiredLibraries: mysqlclient */ 10 /* RequiredWindowsLibraries: libmysql */ 11 12 #include "module.h" 13 #include "modules/sql.h" 14 #define NO_CLIENT_LONG_LONG 15 #ifdef WIN32 16 # include <mysql.h> 17 #else 18 # include <mysql/mysql.h> 19 #endif 20 21 using namespace SQL; 22 23 /** Non blocking threaded MySQL API, based loosely from InspIRCd's m_mysql.cpp 24 * 25 * This module spawns a single thread that is used to execute blocking MySQL queries. 26 * When a module requests a query to be executed it is added to a list for the thread 27 * (which never stops looping and sleeping) to pick up and execute, the result of which 28 * is inserted in to another queue to be picked up by the main thread. The main thread 29 * uses Pipe to become notified through the socket engine when there are results waiting 30 * to be sent back to the modules requesting the query 31 */ 32 33 class MySQLService; 34 35 /** A query request 36 */ 37 struct QueryRequest 38 { 39 /* The connection to the database */ 40 MySQLService *service; 41 /* The interface to use once we have the result to send the data back */ 42 Interface *sqlinterface; 43 /* The actual query */ 44 Query query; 45 46 QueryRequest(MySQLService *s, Interface *i, const Query &q) : service(s), sqlinterface(i), query(q) { } 47 }; 48 49 /** A query result */ 50 struct QueryResult 51 { 52 /* The interface to send the data back on */ 53 Interface *sqlinterface; 54 /* The result */ 55 Result result; 56 57 QueryResult(Interface *i, Result &r) : sqlinterface(i), result(r) { } 58 }; 59 60 /** A MySQL result 61 */ 62 class MySQLResult : public Result 63 { 64 MYSQL_RES *res; 65 66 public: 67 MySQLResult(unsigned int i, const Query &q, const Anope::string &fq, MYSQL_RES *r) : Result(i, q, fq), res(r) 68 { 69 unsigned num_fields = res ? mysql_num_fields(res) : 0; 70 71 /* It is not thread safe to log anything here using Log(this->owner) now :( */ 72 73 if (!num_fields) 74 return; 75 76 for (MYSQL_ROW row; (row = mysql_fetch_row(res));) 77 { 78 MYSQL_FIELD *fields = mysql_fetch_fields(res); 79 80 if (fields) 81 { 82 std::map<Anope::string, Anope::string> items; 83 84 for (unsigned field_count = 0; field_count < num_fields; ++field_count) 85 { 86 Anope::string column = (fields[field_count].name ? fields[field_count].name : ""); 87 Anope::string data = (row[field_count] ? row[field_count] : ""); 88 89 items[column] = data; 90 } 91 92 this->entries.push_back(items); 93 } 94 } 95 } 96 97 MySQLResult(const Query &q, const Anope::string &fq, const Anope::string &err) : Result(0, q, fq, err), res(NULL) 98 { 99 } 100 101 ~MySQLResult() 102 { 103 if (this->res) 104 mysql_free_result(this->res); 105 } 106 }; 107 108 /** A MySQL connection, there can be multiple 109 */ 110 class MySQLService : public Provider 111 { 112 std::map<Anope::string, std::set<Anope::string> > active_schema; 113 114 Anope::string database; 115 Anope::string server; 116 Anope::string user; 117 Anope::string password; 118 int port; 119 120 MYSQL *sql; 121 122 /** Escape a query. 123 * Note the mutex must be held! 124 */ 125 Anope::string Escape(const Anope::string &query); 126 127 public: 128 /* Locked by the SQL thread when a query is pending on this database, 129 * prevents us from deleting a connection while a query is executing 130 * in the thread 131 */ 132 Mutex Lock; 133 134 MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po); 135 136 ~MySQLService(); 137 138 void Run(Interface *i, const Query &query) anope_override; 139 140 Result RunQuery(const Query &query) anope_override; 141 142 std::vector<Query> CreateTable(const Anope::string &table, const Data &data) anope_override; 143 144 Query BuildInsert(const Anope::string &table, unsigned int id, Data &data) anope_override; 145 146 Query GetTables(const Anope::string &prefix) anope_override; 147 148 void Connect(); 149 150 bool CheckConnection(); 151 152 Anope::string BuildQuery(const Query &q); 153 154 Anope::string FromUnixtime(time_t); 155 }; 156 157 /** The SQL thread used to execute queries 158 */ 159 class DispatcherThread : public Thread, public Condition 160 { 161 public: 162 DispatcherThread() : Thread() { } 163 164 void Run() anope_override; 165 }; 166 167 class ModuleSQL; 168 static ModuleSQL *me; 169 class ModuleSQL : public Module, public Pipe 170 { 171 /* SQL connections */ 172 std::map<Anope::string, MySQLService *> MySQLServices; 173 public: 174 /* Pending query requests */ 175 std::deque<QueryRequest> QueryRequests; 176 /* Pending finished requests with results */ 177 std::deque<QueryResult> FinishedRequests; 178 /* The thread used to execute queries */ 179 DispatcherThread *DThread; 180 181 ModuleSQL(const Anope::string &modname, const Anope::string &creator) : Module(modname, creator, EXTRA | VENDOR) 182 { 183 me = this; 184 185 186 DThread = new DispatcherThread(); 187 DThread->Start(); 188 } 189 190 ~ModuleSQL() 191 { 192 for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end(); ++it) 193 delete it->second; 194 MySQLServices.clear(); 195 196 DThread->SetExitState(); 197 DThread->Wakeup(); 198 DThread->Join(); 199 delete DThread; 200 } 201 202 void OnReload(Configuration::Conf *conf) anope_override 203 { 204 Configuration::Block *config = conf->GetModule(this); 205 206 for (std::map<Anope::string, MySQLService *>::iterator it = this->MySQLServices.begin(); it != this->MySQLServices.end();) 207 { 208 const Anope::string &cname = it->first; 209 MySQLService *s = it->second; 210 int i; 211 212 ++it; 213 214 for (i = 0; i < config->CountBlock("mysql"); ++i) 215 if (config->GetBlock("mysql", i)->Get<const Anope::string>("name", "mysql/main") == cname) 216 break; 217 218 if (i == config->CountBlock("mysql")) 219 { 220 Log(LOG_NORMAL, "mysql") << "MySQL: Removing server connection " << cname; 221 222 delete s; 223 this->MySQLServices.erase(cname); 224 } 225 } 226 227 for (int i = 0; i < config->CountBlock("mysql"); ++i) 228 { 229 Configuration::Block *block = config->GetBlock("mysql", i); 230 const Anope::string &connname = block->Get<const Anope::string>("name", "mysql/main"); 231 232 if (this->MySQLServices.find(connname) == this->MySQLServices.end()) 233 { 234 const Anope::string &database = block->Get<const Anope::string>("database", "anope"); 235 const Anope::string &server = block->Get<const Anope::string>("server", "127.0.0.1"); 236 const Anope::string &user = block->Get<const Anope::string>("username", "anope"); 237 const Anope::string &password = block->Get<const Anope::string>("password"); 238 int port = block->Get<int>("port", "3306"); 239 240 try 241 { 242 MySQLService *ss = new MySQLService(this, connname, database, server, user, password, port); 243 this->MySQLServices.insert(std::make_pair(connname, ss)); 244 245 Log(LOG_NORMAL, "mysql") << "MySQL: Successfully connected to server " << connname << " (" << server << ")"; 246 } 247 catch (const SQL::Exception &ex) 248 { 249 Log(LOG_NORMAL, "mysql") << "MySQL: " << ex.GetReason(); 250 } 251 } 252 } 253 } 254 255 void OnModuleUnload(User *, Module *m) anope_override 256 { 257 this->DThread->Lock(); 258 259 for (unsigned i = this->QueryRequests.size(); i > 0; --i) 260 { 261 QueryRequest &r = this->QueryRequests[i - 1]; 262 263 if (r.sqlinterface && r.sqlinterface->owner == m) 264 { 265 if (i == 1) 266 { 267 r.service->Lock.Lock(); 268 r.service->Lock.Unlock(); 269 } 270 271 this->QueryRequests.erase(this->QueryRequests.begin() + i - 1); 272 } 273 } 274 275 this->DThread->Unlock(); 276 277 this->OnNotify(); 278 } 279 280 void OnNotify() anope_override 281 { 282 this->DThread->Lock(); 283 std::deque<QueryResult> finishedRequests = this->FinishedRequests; 284 this->FinishedRequests.clear(); 285 this->DThread->Unlock(); 286 287 for (std::deque<QueryResult>::const_iterator it = finishedRequests.begin(), it_end = finishedRequests.end(); it != it_end; ++it) 288 { 289 const QueryResult &qr = *it; 290 291 if (!qr.sqlinterface) 292 throw SQL::Exception("NULL qr.sqlinterface in MySQLPipe::OnNotify() ?"); 293 294 if (qr.result.GetError().empty()) 295 qr.sqlinterface->OnResult(qr.result); 296 else 297 qr.sqlinterface->OnError(qr.result); 298 } 299 } 300 }; 301 302 MySQLService::MySQLService(Module *o, const Anope::string &n, const Anope::string &d, const Anope::string &s, const Anope::string &u, const Anope::string &p, int po) 303 : Provider(o, n), database(d), server(s), user(u), password(p), port(po), sql(NULL) 304 { 305 Connect(); 306 } 307 308 MySQLService::~MySQLService() 309 { 310 me->DThread->Lock(); 311 this->Lock.Lock(); 312 mysql_close(this->sql); 313 this->sql = NULL; 314 315 for (unsigned i = me->QueryRequests.size(); i > 0; --i) 316 { 317 QueryRequest &r = me->QueryRequests[i - 1]; 318 319 if (r.service == this) 320 { 321 if (r.sqlinterface) 322 r.sqlinterface->OnError(Result(0, r.query, "SQL Interface is going away")); 323 me->QueryRequests.erase(me->QueryRequests.begin() + i - 1); 324 } 325 } 326 this->Lock.Unlock(); 327 me->DThread->Unlock(); 328 } 329 330 void MySQLService::Run(Interface *i, const Query &query) 331 { 332 me->DThread->Lock(); 333 me->QueryRequests.push_back(QueryRequest(this, i, query)); 334 me->DThread->Unlock(); 335 me->DThread->Wakeup(); 336 } 337 338 Result MySQLService::RunQuery(const Query &query) 339 { 340 this->Lock.Lock(); 341 342 Anope::string real_query = this->BuildQuery(query); 343 344 if (this->CheckConnection() && !mysql_real_query(this->sql, real_query.c_str(), real_query.length())) 345 { 346 MYSQL_RES *res = mysql_store_result(this->sql); 347 unsigned int id = mysql_insert_id(this->sql); 348 349 /* because we enabled CLIENT_MULTI_RESULTS in our options 350 * a multiple statement or a procedure call can return 351 * multiple result sets. 352 * we must process them all before the next query. 353 */ 354 355 while (!mysql_next_result(this->sql)) 356 mysql_free_result(mysql_store_result(this->sql)); 357 358 this->Lock.Unlock(); 359 return MySQLResult(id, query, real_query, res); 360 } 361 else 362 { 363 Anope::string error = mysql_error(this->sql); 364 this->Lock.Unlock(); 365 return MySQLResult(query, real_query, error); 366 } 367 } 368 369 std::vector<Query> MySQLService::CreateTable(const Anope::string &table, const Data &data) 370 { 371 std::vector<Query> queries; 372 std::set<Anope::string> &known_cols = this->active_schema[table]; 373 374 if (known_cols.empty()) 375 { 376 Log(LOG_DEBUG) << "m_mysql: Fetching columns for " << table; 377 378 Result columns = this->RunQuery("SHOW COLUMNS FROM `" + table + "`"); 379 for (int i = 0; i < columns.Rows(); ++i) 380 { 381 const Anope::string &column = columns.Get(i, "Field"); 382 383 Log(LOG_DEBUG) << "m_mysql: Column #" << i << " for " << table << ": " << column; 384 known_cols.insert(column); 385 } 386 } 387 388 if (known_cols.empty()) 389 { 390 Anope::string query_text = "CREATE TABLE `" + table + "` (`id` int(10) unsigned NOT NULL AUTO_INCREMENT," 391 " `timestamp` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"; 392 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 393 { 394 known_cols.insert(it->first); 395 396 query_text += ", `" + it->first + "` "; 397 if (data.GetType(it->first) == Serialize::Data::DT_INT) 398 query_text += "int(11)"; 399 else 400 query_text += "text"; 401 } 402 query_text += ", PRIMARY KEY (`id`), KEY `timestamp_idx` (`timestamp`))"; 403 queries.push_back(query_text); 404 } 405 else 406 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 407 { 408 if (known_cols.count(it->first) > 0) 409 continue; 410 411 known_cols.insert(it->first); 412 413 Anope::string query_text = "ALTER TABLE `" + table + "` ADD `" + it->first + "` "; 414 if (data.GetType(it->first) == Serialize::Data::DT_INT) 415 query_text += "int(11)"; 416 else 417 query_text += "text"; 418 419 queries.push_back(query_text); 420 } 421 422 return queries; 423 } 424 425 Query MySQLService::BuildInsert(const Anope::string &table, unsigned int id, Data &data) 426 { 427 /* Empty columns not present in the data set */ 428 const std::set<Anope::string> &known_cols = this->active_schema[table]; 429 for (std::set<Anope::string>::iterator it = known_cols.begin(), it_end = known_cols.end(); it != it_end; ++it) 430 if (*it != "id" && *it != "timestamp" && data.data.count(*it) == 0) 431 data[*it] << ""; 432 433 Anope::string query_text = "INSERT INTO `" + table + "` (`id`"; 434 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 435 query_text += ",`" + it->first + "`"; 436 query_text += ") VALUES (" + stringify(id); 437 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 438 query_text += ",@" + it->first + "@"; 439 query_text += ") ON DUPLICATE KEY UPDATE "; 440 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 441 query_text += "`" + it->first + "`=VALUES(`" + it->first + "`),"; 442 query_text.erase(query_text.end() - 1); 443 444 Query query(query_text); 445 for (Data::Map::const_iterator it = data.data.begin(), it_end = data.data.end(); it != it_end; ++it) 446 { 447 Anope::string buf; 448 *it->second >> buf; 449 450 bool escape = true; 451 if (buf.empty()) 452 { 453 buf = "NULL"; 454 escape = false; 455 } 456 457 query.SetValue(it->first, buf, escape); 458 } 459 460 return query; 461 } 462 463 Query MySQLService::GetTables(const Anope::string &prefix) 464 { 465 return Query("SHOW TABLES LIKE '" + prefix + "%';"); 466 } 467 468 void MySQLService::Connect() 469 { 470 this->sql = mysql_init(this->sql); 471 472 const unsigned int timeout = 1; 473 mysql_options(this->sql, MYSQL_OPT_CONNECT_TIMEOUT, reinterpret_cast<const char *>(&timeout)); 474 475 bool connect = mysql_real_connect(this->sql, this->server.c_str(), this->user.c_str(), this->password.c_str(), this->database.c_str(), this->port, NULL, CLIENT_MULTI_RESULTS); 476 477 if (!connect) 478 throw SQL::Exception("Unable to connect to MySQL service " + this->name + ": " + mysql_error(this->sql)); 479 480 Log(LOG_DEBUG) << "Successfully connected to MySQL service " << this->name << " at " << this->server << ":" << this->port; 481 } 482 483 484 bool MySQLService::CheckConnection() 485 { 486 if (!this->sql || mysql_ping(this->sql)) 487 { 488 try 489 { 490 this->Connect(); 491 } 492 catch (const SQL::Exception &) 493 { 494 return false; 495 } 496 } 497 498 return true; 499 } 500 501 Anope::string MySQLService::Escape(const Anope::string &query) 502 { 503 std::vector<char> buffer(query.length() * 2 + 1); 504 mysql_real_escape_string(this->sql, &buffer[0], query.c_str(), query.length()); 505 return &buffer[0]; 506 } 507 508 Anope::string MySQLService::BuildQuery(const Query &q) 509 { 510 Anope::string real_query = q.query; 511 512 for (std::map<Anope::string, QueryData>::const_iterator it = q.parameters.begin(), it_end = q.parameters.end(); it != it_end; ++it) 513 real_query = real_query.replace_all_cs("@" + it->first + "@", (it->second.escape ? ("'" + this->Escape(it->second.data) + "'") : it->second.data)); 514 515 return real_query; 516 } 517 518 Anope::string MySQLService::FromUnixtime(time_t t) 519 { 520 return "FROM_UNIXTIME(" + stringify(t) + ")"; 521 } 522 523 void DispatcherThread::Run() 524 { 525 this->Lock(); 526 527 while (!this->GetExitState()) 528 { 529 if (!me->QueryRequests.empty()) 530 { 531 QueryRequest &r = me->QueryRequests.front(); 532 this->Unlock(); 533 534 Result sresult = r.service->RunQuery(r.query); 535 536 this->Lock(); 537 if (!me->QueryRequests.empty() && me->QueryRequests.front().query == r.query) 538 { 539 if (r.sqlinterface) 540 me->FinishedRequests.push_back(QueryResult(r.sqlinterface, sresult)); 541 me->QueryRequests.pop_front(); 542 } 543 } 544 else 545 { 546 if (!me->FinishedRequests.empty()) 547 me->Notify(); 548 this->Wait(); 549 } 550 } 551 552 this->Unlock(); 553 } 554 555 MODULE_INIT(ModuleSQL)