Line data Source code
1 : #include <stdexcept> 2 : #include <iostream> 3 : 4 : #include "felixbus/felixbus.hpp" 5 : #include "felixbus/felixbus.h" 6 : 7 : #include "felix/felix_fid.h" 8 : 9 : using namespace felixbus; 10 : 11 : #define STREAM_MASK 0xFF 12 : 13 101 : void FelixBus::publish(uint64_t fid, const std::string& filename, FelixBusInfo& info, std::error_code* ecptr) { 14 101 : if (ecptr) { 15 4 : ecptr->clear(); 16 : } 17 : 18 101 : if (info.stream) { 19 40 : fid &= ~STREAM_MASK; 20 : } 21 : 22 101 : struct felix_bus_info bus_info; 23 101 : bus_info.ip = info.ip.c_str(); 24 101 : bus_info.port = info.port; 25 101 : bus_info.unbuffered = info.unbuffered; 26 101 : bus_info.pubsub = info.pubsub; 27 101 : bus_info.raw_tcp = info.raw_tcp; 28 101 : bus_info.stream = info.stream; 29 101 : bus_info.netio_pages = info.netio_pages; 30 101 : bus_info.netio_pagesize = info.netio_pagesize; 31 : 32 101 : felix_bus_set_verbose(verbose); 33 101 : felix_bus_set_cleanup(cleanup); 34 : 35 101 : felix_bus bus = nullptr; 36 303 : char* bus_path = felix_bus_path(bus_path_prefix.c_str(), groupname.c_str(), get_vid(fid), get_did(fid), get_cid(fid), filename.c_str()); 37 101 : if (bus_path) { 38 101 : try { 39 202 : bus = bus_by_name.at(bus_path); 40 42 : } catch(const std::out_of_range&) { 41 42 : bus = felix_bus_open(bus_path); 42 84 : bus_by_name[bus_path] = bus; 43 42 : } 44 : } 45 : 46 101 : if (bus) { 47 100 : int rc = felix_bus_write(bus, fid, &bus_info); 48 100 : if (rc == 0) { 49 100 : free(bus_path); 50 100 : return; 51 : } 52 : } 53 : 54 1 : free(bus_path); 55 1 : auto error_code = std::make_error_code(std::errc::no_such_file_or_directory); 56 1 : if (ecptr) { 57 1 : *ecptr = error_code; 58 1 : return; 59 : } 60 0 : throw std::system_error(error_code); 61 : } 62 : 63 357 : void FelixBus::publish_close() { 64 399 : for (const auto & [name, bus] : bus_by_name) { 65 42 : if (verbose) 66 1 : printf("Closing %s\n", name.c_str()); 67 42 : felix_bus_close(bus); 68 42 : felix_bus_release(bus); 69 : } 70 357 : bus_by_name.clear(); 71 357 : } 72 : 73 26124 : FelixBusInfo FelixBus::get_info(uint64_t fid, std::error_code* ecptr) { 74 : 75 26124 : if (verbose) 76 3959 : printf("Getting info for 0x%lx\n", fid); 77 : 78 26124 : bool looking_for_stream = false; 79 26124 : if ((fid & STREAM_MASK) > 0) { 80 22 : fid &= ~STREAM_MASK; 81 22 : looking_for_stream = true; 82 : } 83 : 84 26124 : if (verbose) 85 3959 : printf("Actually Getting info for 0x%lx\n", fid); 86 : 87 : // uint8_t vid = (fid >> 60) & 0xF; 88 26124 : uint8_t did = (fid >> 52) & 0xFF; 89 26124 : uint32_t cid = (fid >> 36) & 0xFFFF; 90 26124 : std::filesystem::path path = bus_path_prefix; 91 26124 : path /= groupname; 92 52248 : path /= int_to_hex(did, 1); 93 52248 : path /= int_to_hex(cid, 1); 94 : 95 26125 : std::string fid_hex = "0x" + int_to_hex(fid); 96 26124 : FelixBusInfo info; 97 26124 : felix_bus_set_verbose(verbose); 98 : 99 26124 : if (verbose) 100 3959 : printf("Looking for %s\n", path.c_str()); 101 : 102 26124 : if (std::filesystem::exists(path)) { 103 26115 : try{ 104 136855 : for(std::filesystem::directory_entry json_file: std::filesystem::directory_iterator(path)) { 105 58499 : std::error_code ec; 106 58499 : json_file.refresh(ec); 107 58499 : if(ec){ 108 0 : std::cout << "ERROR: " << ec << std::endl; 109 : } 110 58499 : if (verbose) 111 3984 : std::cout << "Looking for " << fid_hex << " in " << json_file.path() << std::endl; 112 58499 : simdjson::dom::element fid_data; 113 58499 : simdjson::dom::element data; 114 : 115 58499 : bool stale = felix_bus_stale(json_file.path().c_str()); 116 58499 : if (verbose) 117 3985 : std::cout << "Stale file " << json_file.path() << (ignore_stale ? " (ignored)" : " (NOT ignored)") << std::endl; 118 58499 : if (ignore_stale && stale) { 119 2 : continue; 120 : } 121 : 122 58497 : simdjson::dom::document_stream entries; 123 116994 : auto error = parser.load_many(json_file.path()).get(entries); 124 58497 : if (error) { 125 0 : if (verbose) 126 0 : std::cerr << "ERROR: " << error << std::endl; 127 0 : continue; 128 : } 129 : // we need the last entry in the file for this fid, 130 : // so we need to look through all of them... 131 58497 : bool found = false; 132 10516741 : for (auto entry : entries) { 133 : //std::cout << "E " << entry << std::endl; 134 : 135 10458244 : auto no_data = entry.get(data); 136 10458244 : if (no_data) { 137 0 : if (verbose) 138 0 : std::cout << no_data << std::endl; 139 10458244 : continue; 140 : } 141 : 142 10458244 : uint64_t entry_fid; 143 10458244 : auto no_fid = data[BUS_FID].get(entry_fid); 144 0 : if (no_fid) { 145 0 : if (verbose) 146 0 : std::cout << no_fid << std::endl; 147 0 : continue; 148 : } 149 : 150 10458244 : if (entry_fid != fid) { 151 10432039 : continue; 152 : } 153 : 154 26205 : if (verbose) 155 4037 : std::cout << fid_hex << ": " << data << std::endl; 156 : 157 : // found one but it may not be the last one 158 26205 : found = true; 159 26205 : info.fid = data[BUS_FID]; 160 26205 : info.hfid = data[BUS_HFID]; 161 26205 : info.ip = data[BUS_IP]; 162 26205 : info.port = data[BUS_PORT]; 163 26205 : info.unbuffered = data[BUS_UNBUFFERED]; 164 26205 : info.pubsub = data[BUS_PUBSUB]; 165 26205 : info.netio_pages = data[BUS_NETIO_PAGES]; 166 26205 : info.netio_pagesize = data[BUS_NETIO_PAGESIZE]; 167 : 168 : // compatibility for field(s) added in 4.3.x 169 26205 : bool raw_tcp = false; 170 26205 : auto no_raw_tcp = data[BUS_RAW_TCP].get(raw_tcp); 171 26205 : info.raw_tcp = no_raw_tcp ? false : raw_tcp; 172 : 173 : // compatibility for field(s) added in 4.2.9 174 26205 : bool stream = false; 175 26205 : auto no_stream = data[BUS_STREAM].get(stream); 176 26205 : info.stream = no_stream ? false : stream; 177 : 178 : // special fields 179 26205 : info.host = data[BUS_HOST]; 180 26205 : uint64_t pid = 0; 181 26205 : auto no_pid = data[BUS_PID].get(pid); 182 0 : if (!no_pid) { 183 26205 : info.pid = (pid_t)pid; 184 : } 185 26205 : info.user = data[BUS_USER]; 186 : 187 : // Check if we are looking for a stream while the one found is a plain FID 188 26205 : if (looking_for_stream && !info.stream) { 189 : found = false; 190 : } 191 : } 192 : 193 58497 : if (found) { 194 26104 : if (ecptr) { 195 26103 : ecptr->clear(); 196 : } 197 26104 : return info; 198 : } 199 : // std::cout << entries.truncated_bytes() << " bytes "<< std::endl; 200 169226 : } 201 0 : } catch(std::bad_alloc& e) { 202 0 : auto error_code = std::make_error_code(std::errc::permission_denied); 203 0 : if (ecptr) { 204 0 : *ecptr = error_code; 205 0 : return info; 206 : } 207 0 : throw; 208 0 : } catch (std::exception& e){ 209 0 : auto error_code = std::make_error_code(std::errc::no_message_available); 210 0 : if (ecptr) { 211 0 : *ecptr = error_code; 212 0 : return info; 213 : } 214 0 : throw; 215 0 : } 216 : } 217 : // std::cout << "FID not found " << fid_hex << std::endl; 218 20 : auto error_code = std::make_error_code(std::errc::no_such_file_or_directory); 219 20 : if (ecptr) { 220 19 : *ecptr = error_code; 221 19 : return info; 222 : } 223 1 : throw std::system_error(error_code); 224 52248 : }