Line data Source code
1 : #include <stdexcept> 2 : #include <iostream> 3 : 4 : #include "felixbus/felixbus.hpp" 5 : #include "felixbus/felixbus.h" 6 : 7 : #include "felix/felix_fid.h" 8 : 9 : using namespace felixbus; 10 : 11 30 : void FelixBus::publish(uint64_t fid, const std::string& filename, FelixBusInfo& info, std::error_code* ecptr) { 12 30 : if (ecptr) { 13 8 : ecptr->clear(); 14 : } 15 : 16 30 : struct felix_bus_info bus_info; 17 30 : bus_info.ip = info.ip.c_str(); 18 30 : bus_info.port = info.port; 19 30 : bus_info.unbuffered = info.unbuffered; 20 30 : bus_info.pubsub = info.pubsub; 21 30 : bus_info.netio_pages = info.netio_pages; 22 30 : bus_info.netio_pagesize = info.netio_pagesize; 23 : 24 30 : felix_bus_set_verbose(verbose); 25 30 : felix_bus_set_cleanup(cleanup); 26 : 27 30 : felix_bus bus = nullptr; 28 90 : char* bus_path = felix_bus_path(bus_path_prefix.c_str(), groupname.c_str(), get_vid(fid), get_did(fid), get_cid(fid), filename.c_str()); 29 30 : if (bus_path) { 30 30 : try { 31 60 : bus = bus_by_name.at(bus_path); 32 12 : } catch(const std::out_of_range&) { 33 12 : bus = felix_bus_open(bus_path); 34 24 : bus_by_name[bus_path] = bus; 35 12 : } 36 : } 37 : 38 30 : if (bus) { 39 28 : int rc = felix_bus_write(bus, fid, &bus_info); 40 28 : if (rc == 0) { 41 28 : free(bus_path); 42 28 : return; 43 : } 44 : } 45 : 46 2 : free(bus_path); 47 2 : auto error_code = std::make_error_code(std::errc::no_such_file_or_directory); 48 2 : if (ecptr) { 49 2 : *ecptr = error_code; 50 2 : return; 51 : } 52 0 : throw std::system_error(error_code); 53 : } 54 : 55 575 : void FelixBus::publish_close() { 56 587 : for (const auto & [name, bus] : bus_by_name) { 57 12 : if (verbose) 58 2 : printf("Closing %s\n", name.c_str()); 59 12 : felix_bus_close(bus); 60 12 : felix_bus_release(bus); 61 : } 62 575 : bus_by_name.clear(); 63 575 : } 64 : 65 48719 : FelixBusInfo FelixBus::get_info(uint64_t fid, std::error_code* ecptr) { 66 : 67 48719 : if (verbose) 68 4219 : printf("Getting info for 0x%lx\n", fid); 69 : 70 : // uint8_t vid = (fid >> 60) & 0xF; 71 48719 : uint8_t did = (fid >> 52) & 0xFF; 72 48719 : uint32_t cid = (fid >> 36) & 0xFFFF; 73 48719 : std::filesystem::path path = bus_path_prefix; 74 48719 : path /= groupname; 75 97438 : path /= int_to_hex(did, 1); 76 97438 : path /= int_to_hex(cid, 1); 77 : 78 48721 : std::string fid_hex = "0x" + int_to_hex(fid); 79 48719 : FelixBusInfo info; 80 48719 : felix_bus_set_verbose(verbose); 81 : 82 48719 : if (verbose) 83 4219 : printf("Looking for %s\n", path.c_str()); 84 : 85 48719 : if (std::filesystem::exists(path)) { 86 48699 : try{ 87 260939 : for(std::filesystem::directory_entry json_file: std::filesystem::directory_iterator(path)) { 88 114830 : std::error_code ec; 89 114830 : json_file.refresh(ec); 90 114830 : if(ec){ 91 0 : std::cout << "ERROR: " << ec << std::endl; 92 : } 93 114830 : if (verbose) 94 4261 : std::cout << "Looking for " << fid_hex << " in " << json_file.path() << std::endl; 95 114830 : simdjson::dom::element fid_data; 96 114830 : simdjson::dom::element data; 97 : 98 114830 : bool stale = felix_bus_stale(json_file.path().c_str()); 99 114830 : if (verbose) 100 4261 : std::cout << "Stale file " << json_file.path() << " (ignored)" << std::endl; 101 114830 : if (stale) { 102 4 : continue; 103 : } 104 : 105 114826 : simdjson::dom::document_stream entries; 106 229652 : auto error = parser.load_many(json_file.path()).get(entries); 107 114826 : if (error) { 108 0 : if (verbose) 109 0 : std::cerr << "ERROR: " << error << std::endl; 110 0 : continue; 111 : } 112 : // we need the last entry in the file for this fid, 113 : // so we need to look through all of them... 114 114826 : bool found = false; 115 21003088 : for (auto entry : entries) { 116 : //std::cout << "E " << entry << std::endl; 117 : 118 20888262 : auto no_data = entry.get(data); 119 20888262 : if (no_data) { 120 0 : if (verbose) 121 0 : std::cout << no_data << std::endl; 122 20888262 : continue; 123 : } 124 : 125 20888262 : uint64_t entry_fid; 126 20888262 : auto no_fid = data[BUS_FID].get(entry_fid); 127 0 : if (no_fid) { 128 0 : if (verbose) 129 0 : std::cout << no_fid << std::endl; 130 0 : continue; 131 : } 132 : 133 20888262 : if (entry_fid != fid) { 134 20839413 : continue; 135 : } 136 : 137 48849 : if (verbose) 138 4369 : std::cout << fid_hex << ": " << data << std::endl; 139 : 140 : // found one but it may not be the last one 141 48849 : found = true; 142 48849 : info.fid = data[BUS_FID]; 143 48849 : info.hfid = data[BUS_HFID]; 144 48849 : info.ip = data[BUS_IP]; 145 48849 : info.port = data[BUS_PORT]; 146 48849 : info.unbuffered = data[BUS_UNBUFFERED]; 147 48849 : info.pubsub = data[BUS_PUBSUB]; 148 48849 : info.netio_pages = data[BUS_NETIO_PAGES]; 149 48849 : info.netio_pagesize = data[BUS_NETIO_PAGESIZE]; 150 : 151 48849 : info.host = data[BUS_HOST]; 152 48849 : uint64_t pid = 0; 153 48849 : auto no_pid = data[BUS_PID].get(pid); 154 0 : if (!no_pid) { 155 48849 : info.pid = (pid_t)pid; 156 : } 157 48849 : info.user = data[BUS_USER]; 158 : } 159 : 160 114826 : if (found) { 161 48687 : if (ecptr) { 162 48681 : ecptr->clear(); 163 : } 164 48687 : return info; 165 : } 166 : // std::cout << entries.truncated_bytes() << " bytes "<< std::endl; 167 327054 : } 168 0 : } catch(std::bad_alloc& e) { 169 0 : auto error_code = std::make_error_code(std::errc::permission_denied); 170 0 : if (ecptr) { 171 0 : *ecptr = error_code; 172 0 : return info; 173 : } 174 0 : throw; 175 0 : } catch (std::exception& e){ 176 0 : auto error_code = std::make_error_code(std::errc::no_message_available); 177 0 : if (ecptr) { 178 0 : *ecptr = error_code; 179 0 : return info; 180 : } 181 0 : throw; 182 0 : } 183 : } 184 : // std::cout << "FID not found " << fid_hex << std::endl; 185 32 : auto error_code = std::make_error_code(std::errc::no_such_file_or_directory); 186 32 : if (ecptr) { 187 30 : *ecptr = error_code; 188 30 : return info; 189 : } 190 2 : throw std::system_error(error_code); 191 97438 : }