diff options
Diffstat (limited to 'app')
| -rw-r--r-- | app/controllers/api/v1/netex_imports_controller.rb | 45 | ||||
| -rw-r--r-- | app/controllers/imports_controller.rb | 11 | ||||
| -rw-r--r-- | app/models/concerns/error_format.rb | 29 | ||||
| -rw-r--r-- | app/models/import.rb | 2 | ||||
| -rw-r--r-- | app/models/netex_import.rb | 16 | ||||
| -rw-r--r-- | app/models/workbench_import.rb | 5 | ||||
| -rw-r--r-- | app/services/http_service.rb | 15 | ||||
| -rw-r--r-- | app/services/retry_service.rb | 54 | ||||
| -rw-r--r-- | app/services/zip_service.rb | 79 | ||||
| -rw-r--r-- | app/views/imports/_form.html.slim | 10 | ||||
| -rw-r--r-- | app/workers/workbench_import_worker.rb | 86 |
11 files changed, 181 insertions, 171 deletions
diff --git a/app/controllers/api/v1/netex_imports_controller.rb b/app/controllers/api/v1/netex_imports_controller.rb index 8f7c8e67e..cb863b9fc 100644 --- a/app/controllers/api/v1/netex_imports_controller.rb +++ b/app/controllers/api/v1/netex_imports_controller.rb @@ -3,6 +3,8 @@ module Api class NetexImportsController < ChouetteController include ControlFlow + skip_before_action :authenticate + def create respond_to do | format | format.json(&method(:create_models)) @@ -26,8 +28,20 @@ module Api end def create_netex_import - @netex_import = NetexImport.new(netex_import_params.merge(referential_id: @new_referential.id, creator: 'Webservice')) + attributes = netex_import_params + if @new_referential.persisted? + attributes = attributes.merge referential_id: @new_referential.id, creator: "Webservice" + else + attributes = attributes.merge status: "failed" + end + + @netex_import = NetexImport.new attributes @netex_import.save! + + unless @netex_import.referential + Rails.logger.info "Can't create referential for import #{@netex_import.id}: #{@new_referential.inspect} #{@new_referential.metadatas.inspect} #{@new_referential.errors.full_messages}" + @netex_import.messages.create criticity: :error, message_key: "cant_create_referential" + end rescue ActiveRecord::RecordInvalid render json: {errors: @netex_import.errors}, status: 406 finish_action! @@ -38,17 +52,34 @@ module Api Referential.new( name: netex_import_params['name'], organisation_id: @workbench.organisation_id, - workbench_id: @workbench.id) - @new_referential.save! - rescue ActiveRecord::RecordInvalid - render json: {errors: @new_referential.errors}, status: 406 - finish_action! + workbench_id: @workbench.id, + metadatas: [metadata] + ) + @new_referential.save + end + + def metadata + metadata = ReferentialMetadata.new + + if netex_import_params['file'] + netex_file = STIF::NetexFile.new(netex_import_params['file'].to_io) + frame = netex_file.frames.first + + if frame + metadata.periodes = frame.periods + + line_objectids = frame.line_refs.map { |ref| "STIF:CODIFLIGNE:Line:#{ref}" } + metadata.line_ids = @workbench.lines.where(objectid: line_objectids).pluck(:id) + end + end + + metadata end def netex_import_params params .require('netex_import') - .permit(:file, :name, :workbench_id) + .permit(:file, :name, :workbench_id, :parent_id, :parent_type) end end end diff --git a/app/controllers/imports_controller.rb b/app/controllers/imports_controller.rb index 979c9bfcf..de1975890 100644 --- a/app/controllers/imports_controller.rb +++ b/app/controllers/imports_controller.rb @@ -34,10 +34,6 @@ class ImportsController < BreadcrumbController end end - def create - create! { workbench_import_path(parent, resource) } - end - def download if params[:token] == resource.token_download send_file resource.file.path @@ -60,13 +56,16 @@ class ImportsController < BreadcrumbController private def build_resource - # Manage only NetexImports for the moment - @import ||= NetexImport.new(*resource_params) do |import| + @import ||= WorkbenchImport.new(*resource_params) do |import| import.workbench = parent import.creator = current_user.name end end + def collection + @imports ||= WorkbenchImport.all + end + def import_params params.require(:import).permit( :name, diff --git a/app/models/concerns/error_format.rb b/app/models/concerns/error_format.rb new file mode 100644 index 000000000..158edb6e4 --- /dev/null +++ b/app/models/concerns/error_format.rb @@ -0,0 +1,29 @@ +# TODO: This module should be moved out of concerns to somewhere that makes +# more sense. + +module ErrorFormat extend self + + def details error_object + error_object.errors.messages.inject({}) do |hash, error| + hash.merge(partial(:detail, error_object, error).call) + end + end + + private + + def detail error_object, error + { + error.first => { + error: error.last.first, + value: error_object[error.first] + } + } + end + + def partial name, *partial_args + -> *lazy_args do + send(name, *(partial_args + lazy_args)) + end + end + +end diff --git a/app/models/import.rb b/app/models/import.rb index b34ca2b48..94cb025d6 100644 --- a/app/models/import.rb +++ b/app/models/import.rb @@ -5,6 +5,8 @@ class Import < ActiveRecord::Base belongs_to :parent, polymorphic: true + has_many :messages, class_name: "ImportMessage" + extend Enumerize enumerize :status, in: %i(new pending successful failed running aborted canceled) diff --git a/app/models/netex_import.rb b/app/models/netex_import.rb index 575cef816..1b3eaff18 100644 --- a/app/models/netex_import.rb +++ b/app/models/netex_import.rb @@ -1,15 +1,19 @@ require 'net/http' class NetexImport < Import - after_commit :launch_java_import + after_commit :launch_java_import, on: :create + validates_presence_of :parent def launch_java_import logger.warn "Call iev get #{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}" - begin - Net::HTTP.get(URI("#{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}")) - rescue Exception => e - logger.error "IEV server error : #{e.message}" - logger.error e.backtrace.inspect + + Thread.new do + begin + Net::HTTP.get(URI("#{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}")) + rescue Exception => e + logger.error "IEV server error : #{e.message}" + logger.error e.backtrace.inspect + end end end end diff --git a/app/models/workbench_import.rb b/app/models/workbench_import.rb index 9323bd4b5..27f53a44f 100644 --- a/app/models/workbench_import.rb +++ b/app/models/workbench_import.rb @@ -1,2 +1,7 @@ class WorkbenchImport < Import + after_commit :launch_worker, :on => :create + + def launch_worker + WorkbenchImportWorker.perform_async(id) + end end diff --git a/app/services/http_service.rb b/app/services/http_service.rb index ae7d0e413..d3999f293 100644 --- a/app/services/http_service.rb +++ b/app/services/http_service.rb @@ -20,26 +20,23 @@ module HTTPService extend self raise "Error on api request status : #{resp.status} => #{resp.body}" end end - # host: 'http://localhost:3000', # path: '/api/v1/netex_imports.json', # token: '13-74009c36638f587c9eafb1ce46e95585', # params: { netex_import: {referential_id: 13, workbench_id: 1}}, # upload: {file: [StringIO.new('howdy'), 'application/zip', 'greeting']}) def post_resource(host:, path:, token: nil, params: {}, upload: nil) - Faraday.new(url: host) do |c| + result = Faraday.new(url: host) do |c| c.headers['Authorization'] = "Token token=#{token.inspect}" if token c.request :multipart c.request :url_encoded c.adapter Faraday.default_adapter - - if upload - name = upload.keys.first - value, mime_type, as_name = upload.values.first - params.update( name => Faraday::UploadIO.new(value, mime_type, as_name ) ) - end - return c.post path, params end end + + # Expose this in order to make the service replaceable + def upload(*triple) + Faraday::UploadIO.new(*triple) + end end diff --git a/app/services/retry_service.rb b/app/services/retry_service.rb deleted file mode 100644 index 21b1def36..000000000 --- a/app/services/retry_service.rb +++ /dev/null @@ -1,54 +0,0 @@ -require 'result' - -class RetryService - - Retry = Class.new(RuntimeError) - - # @param@ delays: - # An array of delays that are used to retry after a sleep of the indicated - # value in case of failed exceutions. - # Once this array is exhausted the executen fails permanently - # - # @param@ rescue_from: - # During execution all the excpetions from this array +plus RetryService::Retry+ are rescued from and - # trigger just another retry after a `sleep` as indicated above. - # - # @param@ block: - # This optional code is excuted before each retry, it is passed the result of the failed attempt, thus - # an `Exception` and the number of execution already tried. - def initialize( delays: [], rescue_from: [], &blk ) - @intervals = delays - @registered_exceptions = Array(rescue_from) << Retry - @failure_callback = blk - end - - # @param@ blk: - # The code to be executed it will be retried goverened by the `delay` passed into the initializer - # as described there in case it fails with one of the predefined exceptions or `RetryService::Retry` - # - # Eventually it will return a `Result` object. - def execute &blk - result = execute_protected blk - return result if result.ok? - @intervals.each_with_index do | interval, retry_count | - sleep interval - @failure_callback.try(:call, result.value, retry_count + 1) - result = execute_protected blk - return result if result.ok? - end - result - end - - - private - - def execute_protected blk - Result.ok(blk.()) - rescue Exception => e - if @registered_exceptions.any?{ |re| e.is_a? re } - Result.error(e) - else - raise - end - end -end diff --git a/app/services/zip_service.rb b/app/services/zip_service.rb index 778bfd06d..cab301b01 100644 --- a/app/services/zip_service.rb +++ b/app/services/zip_service.rb @@ -1,55 +1,68 @@ class ZipService + # TODO: Remove me before merge https://github.com/rubyzip/rubyzip - attr_reader :current_entry, :zip_data + class Subdir < Struct.new(:name, :stream) + end + + attr_reader :current_key, :current_output, :yielder def initialize data - @zip_data = data - @current_entry = nil - end - - class << self - def convert_entries entries - -> output_stream do - entries.each do |e| - output_stream.put_next_entry e.name - output_stream.write e.get_input_stream.read - end - end - end + @zip_data = StringIO.new(data) + @current_key = nil + @current_output = nil + end - def entries input_stream - Enumerator.new do |enum| - loop{ enum << input_stream.get_next_entry } - end.lazy.take_while{ |e| e } + def subdirs + Enumerator.new do |yielder| + @yielder = yielder + Zip::File.open_buffer(@zip_data, &(method :_subdirs)) end end - def entry_groups - self.class.entries(input_stream).group_by(&method(:entry_key)) + def _subdirs zip_file + zip_file.each do | entry | + add_entry entry + end + finish_current_output end - def entry_group_streams - entry_groups.map(&method(:make_stream)).to_h + def add_entry entry + key = entry_key entry + unless key == current_key + finish_current_output + open_new_output key + end + add_to_current_output entry end - def entry_key entry - entry.name.split('/', -1)[-2] + def add_to_current_output entry + current_output.put_next_entry entry.name + write_to_current_output entry.get_input_stream end - def make_stream pair - name, entries = pair - [name, make_stream_from( entries )] + def write_to_current_output input_stream + # the condition below is true for directory entries + return if Zip::NullInputStream == input_stream + current_output.write input_stream.read end - def make_stream_from entries - Zip::OutputStream.write_buffer(&self.class.convert_entries(entries)) + def finish_current_output + if current_output + @yielder << Subdir.new( + current_key, + # Second part of the solution, yield the closed stream + current_output.close_buffer) + end end - def next_entry - @current_entry = input_stream.get_next_entry + def open_new_output entry_key + @current_key = entry_key + # First piece of the solution, use internal way to create a Zip::OutputStream + @current_output = Zip::OutputStream.new(StringIO.new(''), true, nil) end - def input_stream - @__input_stream__ ||= Zip::InputStream.open(StringIO.new(zip_data)) + def entry_key entry + # last dir name File.dirname.split("/").last + entry.name.split('/', -1)[-2] end end diff --git a/app/views/imports/_form.html.slim b/app/views/imports/_form.html.slim index 3ec22415f..0fbf578be 100644 --- a/app/views/imports/_form.html.slim +++ b/app/views/imports/_form.html.slim @@ -3,7 +3,7 @@ .row .col-lg-12 = form.input :name - + .row .col-lg-12 .form-group @@ -11,12 +11,4 @@ .col-sm-8.col-xs-7 = form.input_field :file, label: false, class: 'form-control' - .separator - - .row - .col-lg-12 - = form.association :referential, collection: workbench.referentials, input_html: { 'data-select2ed': 'true', 'data-select2ed-placeholder': t('imports.filters.referential') }, label: t('activerecord.attributes.import.references_type'), label_method: :name, wrapper_html: { class: 'select2ed'} - = form.input :type, as: :hidden - - = form.button :submit, t('actions.submit'), class: 'btn btn-default formSubmitr', form: 'wb_import_form' diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb index 7f77b46dc..6818bb84b 100644 --- a/app/workers/workbench_import_worker.rb +++ b/app/workers/workbench_import_worker.rb @@ -3,15 +3,13 @@ class WorkbenchImportWorker include Rails.application.routes.url_helpers include Configurable - RETRY_DELAYS = [3, 5, 8] - # Workers # ======= def perform(import_id) @workbench_import = WorkbenchImport.find(import_id) @response = nil - @workbench_import.update_attributes(status: 'running') + @workbench_import.update(status: 'running') downloaded = download zip_service = ZipService.new(downloaded) upload zip_service @@ -25,59 +23,47 @@ class WorkbenchImportWorker params: {token: @workbench_import.token_download}).body end - def execute_post eg_name, eg_stream + def execute_post eg_name, eg_file logger.info "HTTP POST #{export_url} (for #{complete_entry_group_name(eg_name)})" HTTPService.post_resource( host: export_host, path: export_path, - token: token(eg_name), - params: params, - upload: {file: [eg_stream, 'application/zip', eg_name]}) - end - - def log_failure reason, count - logger.warn "HTTP POST failed with #{reason}, count = #{count}, response=#{@response}" - end - - def try_again - raise RetryService::Retry - end - - def try_upload_entry_group eg_name, eg_stream - result = execute_post eg_name, eg_stream - return result if result && result.status < 400 - @response = result.body - try_again + params: params(eg_file, eg_name)) end def upload zip_service - entry_group_streams = zip_service.entry_group_streams - @workbench_import.update_attributes total_steps: entry_group_streams.size + entry_group_streams = zip_service.subdirs + @workbench_import.update total_steps: entry_group_streams.size entry_group_streams.each_with_index(&method(:upload_entry_group)) - rescue StopIteration - @workbench_import.update_attributes( current_step: entry_group_streams.size, status: 'failed' ) + rescue Exception => e + logger.error e.message + @workbench_import.update( current_step: entry_group_streams.size, status: 'failed' ) + raise end def upload_entry_group entry_pair, element_count - @workbench_import.update_attributes( current_step: element_count.succ ) - retry_service = RetryService.new( - delays: RETRY_DELAYS, - rescue_from: [HTTPService::Timeout], - &method(:log_failure)) - status = retry_service.execute(&upload_entry_group_proc(entry_pair)) - raise StopIteration unless status.ok? - end - - def upload_entry_group_proc entry_pair - eg_name, eg_stream = entry_pair - # This should be fn.try_upload_entry_group(eg_name, eg_stream) ;( - -> do - try_upload_entry_group(eg_name, eg_stream) + @workbench_import.update( current_step: element_count.succ ) + # status = retry_service.execute(&upload_entry_group_proc(entry_pair)) + eg_name = entry_pair.name + eg_stream = entry_pair.stream + eg_file = File.new(Rails.root.join('tmp', "WorkbenchImport_#{eg_name}_#{$$}.zip"), 'wb').tap do |file| + eg_stream.rewind + file.write eg_stream.read end + eg_file.close + eg_file = File.new(Rails.root.join('tmp', "WorkbenchImport_#{eg_name}_#{$$}.zip")) + result = execute_post eg_name, eg_file + if result && result.status < 400 + result + else + raise StopIteration, result.body + end + ensure + eg_file.close rescue nil + eg_file.unlink rescue nil end - # Queries # ======= @@ -85,10 +71,6 @@ class WorkbenchImportWorker [@workbench_import.name, entry_group_name].join("--") end - def token entry_group_name - Api::V1::ApiKey.from(@workbench_import.referential, name: complete_entry_group_name(entry_group_name)).token - end - # Constants # ========= @@ -112,7 +94,17 @@ class WorkbenchImportWorker @__import_url__ ||= File.join(import_host, import_path) end - def params - @__params__ ||= { netex_import: { referential_id: @workbench_import.referential_id, workbench_id: @workbench_import.workbench_id } } + def params file, name + if dest = ENV["DEBUG_TEMPFILE"] + require 'pry' + binding.pry + %x{unzip -oqq #{file.path} -d #{dest}} + end + { netex_import: + { parent_id: @workbench_import.id, + parent_type: @workbench_import.class.name, + workbench_id: @workbench_import.workbench_id, + name: name, + file: HTTPService.upload(file, 'application/zip', "#{name}.zip") } } end end |
