data_importer/lib/importers/cpe_importer.rb

133 lines
3.9 KiB
Ruby

# frozen_string_literal: true
require 'bulk_insert'
require 'nokogiri'
# use this to import CPE data into postgres database
# Downloads the official NVD CPE dictionary (v2.2, gzipped XML) and loads its
# `cpe-item` entries into the database through the `Cpe` model.
class CpeImporter
  XML_NAMESPACES = {
    'meta' => 'http://scap.nist.gov/schema/cpe-dictionary-metadata/0.2',
    'xsi' => 'http://www.w3.org/2001/XMLSchema-instance',
    '' => 'http://cpe.mitre.org/dictionary/2.0'
  }.freeze

  # TODO: v2.3 is available, see https://cpe.mitre.org/specification/
  URL = 'https://nvd.nist.gov' \
        '/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.2.xml.gz'

  # Default on-disk location of the dictionary; the download step writes it
  # and the import step reads it.
  DATA_FILE = '/data_importer/data/official-cpe-dictionary_v2.2.xml.gz'

  # Streams the gzipped dictionary from URL to disk, instrumented as
  # 'downloaded.cpe_importer'.
  #
  # @param filepath [String] where to store the downloaded archive
  # @raise [StandardError] when the server answers with a non-2xx status
  def self.download(filepath = DATA_FILE)
    ActiveSupport::Notifications.instrument 'downloaded.cpe_importer' do
      uri = URI.parse(URL)
      Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
        # Block form of #request so the body is streamed instead of buffered.
        http.request(Net::HTTP::Get.new(uri)) do |response|
          unless (200..299).cover?(response.code.to_i)
            raise StandardError, "Bad CPE def request: #{response.code}: #{response.body}"
          end

          read_file_chunks(response, filepath)
        end
      end
    end
  end

  # Writes the response body to disk chunk by chunk.
  #
  # The payload is gzip data, so the file is opened in binary mode and the
  # chunks are written untouched; no encoding is forced onto them.
  #
  # @param response [#read_body] object yielding body chunks to a block
  # @param filepath [String] destination file
  def self.read_file_chunks(response, filepath = DATA_FILE)
    File.open(filepath, 'wb') do |io|
      response.read_body { |chunk| io.write(chunk) }
    end
  end

  # Re-parses the reader's current node into a standalone document and returns
  # its root element, so it can be queried with search/at_xpath later.
  def self.transform_node(node)
    Nokogiri::XML(node.outer_xml).root
  end

  # True for the opening element of a `cpe-item` (other node types with the
  # same name are rejected).
  def self.accept_node(node)
    node.name == 'cpe-item' && node.node_type == Nokogiri::XML::Reader::TYPE_ELEMENT
  end

  # Reads the gzipped dictionary with a streaming XML reader and upserts the
  # `cpe-item` entries in batches.
  #
  # @param bulk_count [Integer] number of items collected before each upsert
  # @param filepath [String] path of the gzipped dictionary
  # @raise [Nokogiri::XML::SyntaxError] on malformed XML; the message carries
  #   the first 400 bytes of the decompressed file
  # @raise [Zlib::GzipFile::Error] when the file cannot be decompressed
  def self.import(bulk_count = 20_000, filepath = DATA_FILE)
    puts 'Now importing Cpes.'
    Zlib::GzipReader.open(filepath) do |file|
      batch = []
      Nokogiri::XML::Reader.from_io(file).each do |node|
        next unless accept_node(node)

        batch << transform_node(node)
        # Only check the batch size after appending, so no empty batch is
        # ever flushed.
        next if batch.size < bulk_count

        create_cpes(batch)
        batch = []
      end
      create_cpes(batch) if batch.any?
    rescue Nokogiri::XML::SyntaxError => e
      # Add a sample of the offending content to the error before re-raising.
      file.rewind
      file_content_sample = file.read(400)
      handle_error("Invalid XML in this file: \"#{file_content_sample}\" - original error #{e}", e)
    end
  rescue Zlib::GzipFile::Error => e
    handle_error("Unable to decompress cpe dictionary: #{e}", e)
  end

  # Re-raises an exception of the same class with a more descriptive message,
  # keeping the original backtrace.
  #
  # @param error_message [#to_s] replacement message
  # @param error [Exception, nil] exception to re-raise; defaults to the one
  #   currently being rescued. `$!` is used rather than `$ERROR_INFO` because
  #   the latter is nil unless the 'English' library has been required.
  # @raise [Exception] always
  def self.handle_error(error_message, error = $!) # rubocop:disable Style/SpecialGlobalVars
    raise error_message.to_s if error.nil?

    raise error, error_message.to_s, error.backtrace
  end

  # Upserts one batch of parsed `cpe-item` nodes with a single
  # `Cpe.upsert_all` call (this replaced an earlier bulk_insert path).
  # Does nothing for an empty batch.
  #
  # @param items [Array] nodes as returned by transform_node
  def self.create_cpes(items)
    return if items.empty?

    Cpe.upsert_all(items.map { |item| cpe_attrs_from_item(item) })
  end

  # Extracts the attribute hash for one `cpe-item` node.
  #
  # Every key is always present (nil when the source data is missing) so that
  # all rows of a batch have the same shape.
  #
  # @param item [#search, #at_xpath, #[]] a parsed `cpe-item` element
  # @return [Hash] :title, :references, :name, :modification_date, :status
  #   and :nvd_id
  def self.cpe_attrs_from_item(item)
    # Titles without a lang attribute are skipped; when several en-US titles
    # exist the last one wins.
    english_titles = item.search('title').select do |title|
      title.attribute('lang')&.value == 'en-US'
    end
    metadata = item.at_xpath('meta:item-metadata', XML_NAMESPACES)
    # One single-entry hash per reference: the key is the reference text
    # lower-cased with spaces replaced by underscores, the value is the
    # node's attribute values.
    references = item.search('reference').map do |ref|
      { ref.text.tr(' ', '_').downcase.to_sym => ref.values }
    end

    {
      title: english_titles.last&.inner_text,
      references: references,
      name: item['name'],
      modification_date: metadata && metadata['modification-date'],
      status: metadata && metadata['status'],
      nvd_id: metadata && metadata['nvd-id']
    }
  end

  # Creates a single Cpe record for the item unless one with the same name
  # already exists (existing records are left untouched).
  #
  # @return [Boolean, nil] result of `save`, or nil when the record exists
  def self.create_cpe(item)
    cpe_attrs = cpe_attrs_from_item(item)
    cpe = Cpe.where(name: cpe_attrs[:name]).first_or_initialize
    return unless cpe.new_record?

    cpe.title = cpe_attrs[:title]
    # NOTE(review): cpe_attrs_from_item never sets :metadata, so this always
    # assigns nil - confirm whether the column is still needed.
    cpe.metadata = cpe_attrs[:metadata]
    cpe.references = cpe_attrs[:references]
    cpe.modification_date = cpe_attrs[:modification_date]
    cpe.status = cpe_attrs[:status]
    cpe.nvd_id = cpe_attrs[:nvd_id]
    cpe.save
  end

  # Convenience entry point: download the dictionary, then import it, both
  # using the default file location.
  def self.download_and_import
    download
    import
  end
end