aboutsummaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
authorAlban Peignier2017-08-28 00:27:29 +0200
committerAlban Peignier2017-08-28 00:27:29 +0200
commitd01ec772b1e65e5d00f56bcd288bf710d1b55712 (patch)
tree5a251e3c08a4050fba629003bfbc33a6d62b919c /app
parentb240f38d661e6e11428e3f09258ba29bcbf228bb (diff)
parentd79f84398849e9c32fdf41582d299dd914fb8452 (diff)
downloadchouette-core-d01ec772b1e65e5d00f56bcd288bf710d1b55712.tar.bz2
Merge branch '4273_http_service_broken'
Diffstat (limited to 'app')
-rw-r--r--app/controllers/api/v1/netex_imports_controller.rb45
-rw-r--r--app/controllers/imports_controller.rb11
-rw-r--r--app/models/concerns/error_format.rb29
-rw-r--r--app/models/import.rb2
-rw-r--r--app/models/netex_import.rb16
-rw-r--r--app/models/workbench_import.rb5
-rw-r--r--app/services/http_service.rb15
-rw-r--r--app/services/retry_service.rb54
-rw-r--r--app/services/zip_service.rb79
-rw-r--r--app/views/imports/_form.html.slim10
-rw-r--r--app/workers/workbench_import_worker.rb86
11 files changed, 181 insertions, 171 deletions
diff --git a/app/controllers/api/v1/netex_imports_controller.rb b/app/controllers/api/v1/netex_imports_controller.rb
index 8f7c8e67e..cb863b9fc 100644
--- a/app/controllers/api/v1/netex_imports_controller.rb
+++ b/app/controllers/api/v1/netex_imports_controller.rb
@@ -3,6 +3,8 @@ module Api
class NetexImportsController < ChouetteController
include ControlFlow
+ skip_before_action :authenticate
+
def create
respond_to do | format |
format.json(&method(:create_models))
@@ -26,8 +28,20 @@ module Api
end
def create_netex_import
- @netex_import = NetexImport.new(netex_import_params.merge(referential_id: @new_referential.id, creator: 'Webservice'))
+ attributes = netex_import_params
+ if @new_referential.persisted?
+ attributes = attributes.merge referential_id: @new_referential.id, creator: "Webservice"
+ else
+ attributes = attributes.merge status: "failed"
+ end
+
+ @netex_import = NetexImport.new attributes
@netex_import.save!
+
+ unless @netex_import.referential
+ Rails.logger.info "Can't create referential for import #{@netex_import.id}: #{@new_referential.inspect} #{@new_referential.metadatas.inspect} #{@new_referential.errors.full_messages}"
+ @netex_import.messages.create criticity: :error, message_key: "cant_create_referential"
+ end
rescue ActiveRecord::RecordInvalid
render json: {errors: @netex_import.errors}, status: 406
finish_action!
@@ -38,17 +52,34 @@ module Api
Referential.new(
name: netex_import_params['name'],
organisation_id: @workbench.organisation_id,
- workbench_id: @workbench.id)
- @new_referential.save!
- rescue ActiveRecord::RecordInvalid
- render json: {errors: @new_referential.errors}, status: 406
- finish_action!
+ workbench_id: @workbench.id,
+ metadatas: [metadata]
+ )
+ @new_referential.save
+ end
+
+ def metadata
+ metadata = ReferentialMetadata.new
+
+ if netex_import_params['file']
+ netex_file = STIF::NetexFile.new(netex_import_params['file'].to_io)
+ frame = netex_file.frames.first
+
+ if frame
+ metadata.periodes = frame.periods
+
+ line_objectids = frame.line_refs.map { |ref| "STIF:CODIFLIGNE:Line:#{ref}" }
+ metadata.line_ids = @workbench.lines.where(objectid: line_objectids).pluck(:id)
+ end
+ end
+
+ metadata
end
def netex_import_params
params
.require('netex_import')
- .permit(:file, :name, :workbench_id)
+ .permit(:file, :name, :workbench_id, :parent_id, :parent_type)
end
end
end
diff --git a/app/controllers/imports_controller.rb b/app/controllers/imports_controller.rb
index 979c9bfcf..de1975890 100644
--- a/app/controllers/imports_controller.rb
+++ b/app/controllers/imports_controller.rb
@@ -34,10 +34,6 @@ class ImportsController < BreadcrumbController
end
end
- def create
- create! { workbench_import_path(parent, resource) }
- end
-
def download
if params[:token] == resource.token_download
send_file resource.file.path
@@ -60,13 +56,16 @@ class ImportsController < BreadcrumbController
private
def build_resource
- # Manage only NetexImports for the moment
- @import ||= NetexImport.new(*resource_params) do |import|
+ @import ||= WorkbenchImport.new(*resource_params) do |import|
import.workbench = parent
import.creator = current_user.name
end
end
+ def collection
+ @imports ||= WorkbenchImport.all
+ end
+
def import_params
params.require(:import).permit(
:name,
diff --git a/app/models/concerns/error_format.rb b/app/models/concerns/error_format.rb
new file mode 100644
index 000000000..158edb6e4
--- /dev/null
+++ b/app/models/concerns/error_format.rb
@@ -0,0 +1,29 @@
+# TODO: This module should be moved out of concerns to somewhere that makes
+# more sense.
+
+module ErrorFormat extend self
+
+ def details error_object
+ error_object.errors.messages.inject({}) do |hash, error|
+ hash.merge(partial(:detail, error_object, error).call)
+ end
+ end
+
+ private
+
+ def detail error_object, error
+ {
+ error.first => {
+ error: error.last.first,
+ value: error_object[error.first]
+ }
+ }
+ end
+
+ def partial name, *partial_args
+ -> *lazy_args do
+ send(name, *(partial_args + lazy_args))
+ end
+ end
+
+end
diff --git a/app/models/import.rb b/app/models/import.rb
index b34ca2b48..94cb025d6 100644
--- a/app/models/import.rb
+++ b/app/models/import.rb
@@ -5,6 +5,8 @@ class Import < ActiveRecord::Base
belongs_to :parent, polymorphic: true
+ has_many :messages, class_name: "ImportMessage"
+
extend Enumerize
enumerize :status, in: %i(new pending successful failed running aborted canceled)
diff --git a/app/models/netex_import.rb b/app/models/netex_import.rb
index 575cef816..1b3eaff18 100644
--- a/app/models/netex_import.rb
+++ b/app/models/netex_import.rb
@@ -1,15 +1,19 @@
require 'net/http'
class NetexImport < Import
- after_commit :launch_java_import
+ after_commit :launch_java_import, on: :create
+ validates_presence_of :parent
def launch_java_import
logger.warn "Call iev get #{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}"
- begin
- Net::HTTP.get(URI("#{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}"))
- rescue Exception => e
- logger.error "IEV server error : #{e.message}"
- logger.error e.backtrace.inspect
+
+ Thread.new do
+ begin
+ Net::HTTP.get(URI("#{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}"))
+ rescue Exception => e
+ logger.error "IEV server error : #{e.message}"
+ logger.error e.backtrace.inspect
+ end
end
end
end
diff --git a/app/models/workbench_import.rb b/app/models/workbench_import.rb
index 9323bd4b5..27f53a44f 100644
--- a/app/models/workbench_import.rb
+++ b/app/models/workbench_import.rb
@@ -1,2 +1,7 @@
class WorkbenchImport < Import
+ after_commit :launch_worker, :on => :create
+
+ def launch_worker
+ WorkbenchImportWorker.perform_async(id)
+ end
end
diff --git a/app/services/http_service.rb b/app/services/http_service.rb
index ae7d0e413..d3999f293 100644
--- a/app/services/http_service.rb
+++ b/app/services/http_service.rb
@@ -20,26 +20,23 @@ module HTTPService extend self
raise "Error on api request status : #{resp.status} => #{resp.body}"
end
end
-
# host: 'http://localhost:3000',
# path: '/api/v1/netex_imports.json',
# token: '13-74009c36638f587c9eafb1ce46e95585',
# params: { netex_import: {referential_id: 13, workbench_id: 1}},
# upload: {file: [StringIO.new('howdy'), 'application/zip', 'greeting']})
def post_resource(host:, path:, token: nil, params: {}, upload: nil)
- Faraday.new(url: host) do |c|
+ result = Faraday.new(url: host) do |c|
c.headers['Authorization'] = "Token token=#{token.inspect}" if token
c.request :multipart
c.request :url_encoded
c.adapter Faraday.default_adapter
-
- if upload
- name = upload.keys.first
- value, mime_type, as_name = upload.values.first
- params.update( name => Faraday::UploadIO.new(value, mime_type, as_name ) )
- end
-
return c.post path, params
end
end
+
+ # Expose this in order to make the service replaceable
+ def upload(*triple)
+ Faraday::UploadIO.new(*triple)
+ end
end
diff --git a/app/services/retry_service.rb b/app/services/retry_service.rb
deleted file mode 100644
index 21b1def36..000000000
--- a/app/services/retry_service.rb
+++ /dev/null
@@ -1,54 +0,0 @@
-require 'result'
-
-class RetryService
-
- Retry = Class.new(RuntimeError)
-
- # @param@ delays:
- # An array of delays that are used to retry after a sleep of the indicated
- # value in case of failed exceutions.
- # Once this array is exhausted the executen fails permanently
- #
- # @param@ rescue_from:
- # During execution all the excpetions from this array +plus RetryService::Retry+ are rescued from and
- # trigger just another retry after a `sleep` as indicated above.
- #
- # @param@ block:
- # This optional code is excuted before each retry, it is passed the result of the failed attempt, thus
- # an `Exception` and the number of execution already tried.
- def initialize( delays: [], rescue_from: [], &blk )
- @intervals = delays
- @registered_exceptions = Array(rescue_from) << Retry
- @failure_callback = blk
- end
-
- # @param@ blk:
- # The code to be executed it will be retried goverened by the `delay` passed into the initializer
- # as described there in case it fails with one of the predefined exceptions or `RetryService::Retry`
- #
- # Eventually it will return a `Result` object.
- def execute &blk
- result = execute_protected blk
- return result if result.ok?
- @intervals.each_with_index do | interval, retry_count |
- sleep interval
- @failure_callback.try(:call, result.value, retry_count + 1)
- result = execute_protected blk
- return result if result.ok?
- end
- result
- end
-
-
- private
-
- def execute_protected blk
- Result.ok(blk.())
- rescue Exception => e
- if @registered_exceptions.any?{ |re| e.is_a? re }
- Result.error(e)
- else
- raise
- end
- end
-end
diff --git a/app/services/zip_service.rb b/app/services/zip_service.rb
index 778bfd06d..cab301b01 100644
--- a/app/services/zip_service.rb
+++ b/app/services/zip_service.rb
@@ -1,55 +1,68 @@
class ZipService
+ # TODO: Remove me before merge https://github.com/rubyzip/rubyzip
- attr_reader :current_entry, :zip_data
+ class Subdir < Struct.new(:name, :stream)
+ end
+
+ attr_reader :current_key, :current_output, :yielder
def initialize data
- @zip_data = data
- @current_entry = nil
- end
-
- class << self
- def convert_entries entries
- -> output_stream do
- entries.each do |e|
- output_stream.put_next_entry e.name
- output_stream.write e.get_input_stream.read
- end
- end
- end
+ @zip_data = StringIO.new(data)
+ @current_key = nil
+ @current_output = nil
+ end
- def entries input_stream
- Enumerator.new do |enum|
- loop{ enum << input_stream.get_next_entry }
- end.lazy.take_while{ |e| e }
+ def subdirs
+ Enumerator.new do |yielder|
+ @yielder = yielder
+ Zip::File.open_buffer(@zip_data, &(method :_subdirs))
end
end
- def entry_groups
- self.class.entries(input_stream).group_by(&method(:entry_key))
+ def _subdirs zip_file
+ zip_file.each do | entry |
+ add_entry entry
+ end
+ finish_current_output
end
- def entry_group_streams
- entry_groups.map(&method(:make_stream)).to_h
+ def add_entry entry
+ key = entry_key entry
+ unless key == current_key
+ finish_current_output
+ open_new_output key
+ end
+ add_to_current_output entry
end
- def entry_key entry
- entry.name.split('/', -1)[-2]
+ def add_to_current_output entry
+ current_output.put_next_entry entry.name
+ write_to_current_output entry.get_input_stream
end
- def make_stream pair
- name, entries = pair
- [name, make_stream_from( entries )]
+ def write_to_current_output input_stream
+ # the condition below is true for directory entries
+ return if Zip::NullInputStream == input_stream
+ current_output.write input_stream.read
end
- def make_stream_from entries
- Zip::OutputStream.write_buffer(&self.class.convert_entries(entries))
+ def finish_current_output
+ if current_output
+ @yielder << Subdir.new(
+ current_key,
+ # Second part of the solution, yield the closed stream
+ current_output.close_buffer)
+ end
end
- def next_entry
- @current_entry = input_stream.get_next_entry
+ def open_new_output entry_key
+ @current_key = entry_key
+ # First piece of the solution, use internal way to create a Zip::OutputStream
+ @current_output = Zip::OutputStream.new(StringIO.new(''), true, nil)
end
- def input_stream
- @__input_stream__ ||= Zip::InputStream.open(StringIO.new(zip_data))
+ def entry_key entry
+ # last dir name File.dirname.split("/").last
+ entry.name.split('/', -1)[-2]
end
end
diff --git a/app/views/imports/_form.html.slim b/app/views/imports/_form.html.slim
index 3ec22415f..0fbf578be 100644
--- a/app/views/imports/_form.html.slim
+++ b/app/views/imports/_form.html.slim
@@ -3,7 +3,7 @@
.row
.col-lg-12
= form.input :name
-
+
.row
.col-lg-12
.form-group
@@ -11,12 +11,4 @@
.col-sm-8.col-xs-7
= form.input_field :file, label: false, class: 'form-control'
- .separator
-
- .row
- .col-lg-12
- = form.association :referential, collection: workbench.referentials, input_html: { 'data-select2ed': 'true', 'data-select2ed-placeholder': t('imports.filters.referential') }, label: t('activerecord.attributes.import.references_type'), label_method: :name, wrapper_html: { class: 'select2ed'}
- = form.input :type, as: :hidden
-
-
= form.button :submit, t('actions.submit'), class: 'btn btn-default formSubmitr', form: 'wb_import_form'
diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb
index 7f77b46dc..6818bb84b 100644
--- a/app/workers/workbench_import_worker.rb
+++ b/app/workers/workbench_import_worker.rb
@@ -3,15 +3,13 @@ class WorkbenchImportWorker
include Rails.application.routes.url_helpers
include Configurable
- RETRY_DELAYS = [3, 5, 8]
-
# Workers
# =======
def perform(import_id)
@workbench_import = WorkbenchImport.find(import_id)
@response = nil
- @workbench_import.update_attributes(status: 'running')
+ @workbench_import.update(status: 'running')
downloaded = download
zip_service = ZipService.new(downloaded)
upload zip_service
@@ -25,59 +23,47 @@ class WorkbenchImportWorker
params: {token: @workbench_import.token_download}).body
end
- def execute_post eg_name, eg_stream
+ def execute_post eg_name, eg_file
logger.info "HTTP POST #{export_url} (for #{complete_entry_group_name(eg_name)})"
HTTPService.post_resource(
host: export_host,
path: export_path,
- token: token(eg_name),
- params: params,
- upload: {file: [eg_stream, 'application/zip', eg_name]})
- end
-
- def log_failure reason, count
- logger.warn "HTTP POST failed with #{reason}, count = #{count}, response=#{@response}"
- end
-
- def try_again
- raise RetryService::Retry
- end
-
- def try_upload_entry_group eg_name, eg_stream
- result = execute_post eg_name, eg_stream
- return result if result && result.status < 400
- @response = result.body
- try_again
+ params: params(eg_file, eg_name))
end
def upload zip_service
- entry_group_streams = zip_service.entry_group_streams
- @workbench_import.update_attributes total_steps: entry_group_streams.size
+ entry_group_streams = zip_service.subdirs
+ @workbench_import.update total_steps: entry_group_streams.size
entry_group_streams.each_with_index(&method(:upload_entry_group))
- rescue StopIteration
- @workbench_import.update_attributes( current_step: entry_group_streams.size, status: 'failed' )
+ rescue Exception => e
+ logger.error e.message
+ @workbench_import.update( current_step: entry_group_streams.size, status: 'failed' )
+ raise
end
def upload_entry_group entry_pair, element_count
- @workbench_import.update_attributes( current_step: element_count.succ )
- retry_service = RetryService.new(
- delays: RETRY_DELAYS,
- rescue_from: [HTTPService::Timeout],
- &method(:log_failure))
- status = retry_service.execute(&upload_entry_group_proc(entry_pair))
- raise StopIteration unless status.ok?
- end
-
- def upload_entry_group_proc entry_pair
- eg_name, eg_stream = entry_pair
- # This should be fn.try_upload_entry_group(eg_name, eg_stream) ;(
- -> do
- try_upload_entry_group(eg_name, eg_stream)
+ @workbench_import.update( current_step: element_count.succ )
+ # status = retry_service.execute(&upload_entry_group_proc(entry_pair))
+ eg_name = entry_pair.name
+ eg_stream = entry_pair.stream
+ eg_file = File.new(Rails.root.join('tmp', "WorkbenchImport_#{eg_name}_#{$$}.zip"), 'wb').tap do |file|
+ eg_stream.rewind
+ file.write eg_stream.read
end
+ eg_file.close
+ eg_file = File.new(Rails.root.join('tmp', "WorkbenchImport_#{eg_name}_#{$$}.zip"))
+ result = execute_post eg_name, eg_file
+ if result && result.status < 400
+ result
+ else
+ raise StopIteration, result.body
+ end
+ ensure
+ eg_file.close rescue nil
+ eg_file.unlink rescue nil
end
-
# Queries
# =======
@@ -85,10 +71,6 @@ class WorkbenchImportWorker
[@workbench_import.name, entry_group_name].join("--")
end
- def token entry_group_name
- Api::V1::ApiKey.from(@workbench_import.referential, name: complete_entry_group_name(entry_group_name)).token
- end
-
# Constants
# =========
@@ -112,7 +94,17 @@ class WorkbenchImportWorker
@__import_url__ ||= File.join(import_host, import_path)
end
- def params
- @__params__ ||= { netex_import: { referential_id: @workbench_import.referential_id, workbench_id: @workbench_import.workbench_id } }
+ def params file, name
+ if dest = ENV["DEBUG_TEMPFILE"]
+ require 'pry'
+ binding.pry
+ %x{unzip -oqq #{file.path} -d #{dest}}
+ end
+ { netex_import:
+ { parent_id: @workbench_import.id,
+ parent_type: @workbench_import.class.name,
+ workbench_id: @workbench_import.workbench_id,
+ name: name,
+ file: HTTPService.upload(file, 'application/zip', "#{name}.zip") } }
end
end