# frozen_string_literal: true

require 'English'
require 'net/http'
require 'uri'
require 'zlib'

require 'bulk_insert'
require 'nokogiri'

# Downloads the official CPE dictionary (gzipped XML) and imports its
# <cpe-item> entries into the postgres database through the Cpe model.
class CpeImporter
  XML_NAMESPACES = {
    'meta' => 'http://scap.nist.gov/schema/cpe-dictionary-metadata/0.2',
    'xsi' => 'http://www.w3.org/2001/XMLSchema-instance',
    '' => 'http://cpe.mitre.org/dictionary/2.0'
  }.freeze

  # TODO: v2.3 is available, see https://cpe.mitre.org/specification/
  URL = 'https://nvd.nist.gov' \
        '/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.2.xml.gz'

  # Location the dictionary is downloaded to and, by default, imported from.
  FILE_PATH = '/data_importer/data/official-cpe-dictionary_v2.2.xml.gz'

  # Fetches the dictionary from URL and streams it to FILE_PATH. The work is
  # wrapped in a 'downloaded.cpe_importer' ActiveSupport notification.
  #
  # Raises StandardError when the response status is outside 200..299.
  def self.download
    ActiveSupport::Notifications.instrument 'downloaded.cpe_importer' do
      uri = URI.parse(URL)
      Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
        http.request(Net::HTTP::Get.new(uri)) do |response|
          unless (200..299).cover?(response.code.to_i)
            raise StandardError, "Bad CPE def request: #{response.code}: #{response.body}"
          end

          read_file_chunks(response)
        end
      end
    end
  end

  # Streams the response body to FILE_PATH chunk by chunk.
  #
  # The payload is gzip data, so the file is opened in binary mode and the
  # chunks are written untouched (no encoding relabelling, no newline
  # translation).
  def self.read_file_chunks(response)
    File.open(FILE_PATH, 'wb') do |io|
      response.read_body { |chunk| io.write(chunk) }
    end
  end

  # Builds a standalone Nokogiri element from the reader's current node.
  def self.transform_node(node)
    Nokogiri::XML(node.outer_xml).root
  end

  # True when the reader is positioned on an opening <cpe-item> element.
  def self.accept_node(node)
    node.name == 'cpe-item' && node.node_type == Nokogiri::XML::Reader::TYPE_ELEMENT
  end

  # Reads the gzipped dictionary at +filepath+ and inserts its items in
  # batches of +bulk_count+.
  #
  # Raises Nokogiri::XML::SyntaxError (message extended with the first 400
  # bytes of the decompressed file) when the XML is invalid, and
  # Zlib::GzipFile::Error (message extended) when decompression fails.
  def self.import(bulk_count = 20_000, filepath = FILE_PATH)
    Zlib::GzipReader.open(filepath) do |file|
      items = []
      Nokogiri::XML::Reader.from_io(file).each do |node|
        next unless accept_node(node)

        # The reader is a cursor: the node has to be materialised now, before
        # the reader advances.
        items << transform_node(node)
        next if items.size < bulk_count

        create_cpes(items)
        items = []
      end
      create_cpes(items) if items.any?
    rescue Nokogiri::XML::SyntaxError => e
      # Re-read the start of the file so the error shows what was parsed.
      file.rewind
      file_content_sample = file.read(400)
      handle_error("Invalid XML in this file: \"#{file_content_sample}\" - original error #{e}", e)
    end
  rescue Zlib::GzipFile::Error => e
    handle_error("Unable to decompress cpe dictionary: #{e}", e)
  end

  # Re-raises +error+ (by default the exception currently being handled) with
  # +error_message+ as its message, keeping the original class and backtrace.
  # Raises a plain StandardError when there is no exception to re-raise.
  def self.handle_error(error_message, error = $ERROR_INFO)
    raise StandardError, error_message.to_s if error.nil?

    raise error, error_message.to_s, error.backtrace
  end

  # Converts every item to an attribute hash, then bulk-inserts them via Cpe.
  def self.create_cpes(items)
    cpes = items.map { |item| cpe_attrs_from_item(item) }
    Cpe.bulk_insert do |worker|
      cpes.each { |attrs| worker.add(attrs) }
    end
  end

  # Extracts the Cpe attributes from a <cpe-item> element.
  #
  # Returns a hash with :references, :modification_date, :status and :nvd_id,
  # plus :title (text of the en-US <title>, the last one if several) and
  # :name when present on the item.
  def self.cpe_attrs_from_item(item)
    cpe_attrs = {}
    item.search('title').each do |title|
      # A <title> without a lang attribute is skipped.
      cpe_attrs[:title] = title.inner_text if title.attribute('lang')&.value == 'en-US'
    end
    # An item without metadata yields nil for the metadata-derived attributes.
    metadata = item.at_xpath('meta:item-metadata', XML_NAMESPACES) || {}
    cpe_attrs[:references] = item.search('reference').map do |reference|
      # Key: the reference text, snake-cased; value: the element's attribute values.
      { reference.text.tr(' ', '_').downcase.to_sym => reference.values }
    end
    cpe_attrs[:name] = item['name'] unless item['name'].nil?
    cpe_attrs[:modification_date] = metadata['modification-date']
    cpe_attrs[:status] = metadata['status']
    cpe_attrs[:nvd_id] = metadata['nvd-id']
    cpe_attrs
  end

  # Creates a single Cpe from +item+ unless a record with the same name
  # already exists. Returns nil when the record exists, otherwise the result
  # of +save+.
  def self.create_cpe(item)
    cpe_attrs = cpe_attrs_from_item(item)
    cpe = Cpe.where(name: cpe_attrs[:name]).first_or_initialize
    return unless cpe.new_record?

    cpe.title = cpe_attrs[:title]
    # NOTE(review): cpe_attrs_from_item never sets :metadata, so this assigns
    # nil - confirm whether the attribute is still needed.
    cpe.metadata = cpe_attrs[:metadata]
    cpe.references = cpe_attrs[:references]
    cpe.modification_date = cpe_attrs[:modification_date]
    cpe.status = cpe_attrs[:status]
    cpe.nvd_id = cpe_attrs[:nvd_id]
    cpe.save
  end

  # Downloads the dictionary, then imports it with the default settings.
  def self.download_and_import
    download
    import
  end
end