aboutsummaryrefslogtreecommitdiffstats
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/concerns/configurable.rb26
-rw-r--r--app/controllers/api/v1/netex_imports_controller.rb26
-rw-r--r--app/controllers/import_tasks_controller.rb1
-rw-r--r--app/models/api/v1/api_key.rb22
-rw-r--r--app/models/import.rb3
-rw-r--r--app/models/netex_import.rb3
-rw-r--r--app/models/organisation.rb18
-rw-r--r--app/models/user.rb17
-rw-r--r--app/models/workbench_import.rb2
-rw-r--r--app/services/file_service.rb24
-rw-r--r--app/services/http_service.rb45
-rw-r--r--app/services/retry_service.rb54
-rw-r--r--app/services/zip_service.rb55
-rw-r--r--app/views/api/v1/netex_imports/create.json.rabl3
-rw-r--r--app/workers/workbench_import_worker.rb118
15 files changed, 382 insertions, 35 deletions
diff --git a/app/concerns/configurable.rb b/app/concerns/configurable.rb
new file mode 100644
index 000000000..c7d0f1fd9
--- /dev/null
+++ b/app/concerns/configurable.rb
@@ -0,0 +1,26 @@
+module Configurable
+
+ module ClassMethods
+ def config &blk
+ blk ? blk.(configuration) : configuration
+ end
+
+ private
+ def configuration
+ @__configuration__ ||= Rails::Application::Configuration.new
+ end
+ end
+
+ module InstanceMethods
+ private
+
+ def config
+ self.class.config
+ end
+ end
+
+ def self.included(into)
+ into.extend ClassMethods
+ into.send :include, InstanceMethods
+ end
+end
diff --git a/app/controllers/api/v1/netex_imports_controller.rb b/app/controllers/api/v1/netex_imports_controller.rb
new file mode 100644
index 000000000..d67d121c0
--- /dev/null
+++ b/app/controllers/api/v1/netex_imports_controller.rb
@@ -0,0 +1,26 @@
+module Api
+ module V1
+ class NetexImportsController < ChouetteController
+
+ def create
+ respond_to do | format |
+ format.json do
+ @import = NetexImport.create(netex_import_params)
+ unless @import.valid?
+ render json: {errors: @import.errors}, status: 406
+ end
+ end
+ end
+ end
+
+
+ private
+
+ def netex_import_params
+ params
+ .require('netex_import')
+ .permit(:file, :name, :referential_id, :workbench_id)
+ end
+ end
+ end
+end
diff --git a/app/controllers/import_tasks_controller.rb b/app/controllers/import_tasks_controller.rb
index 0e3ed6445..cb377ec5a 100644
--- a/app/controllers/import_tasks_controller.rb
+++ b/app/controllers/import_tasks_controller.rb
@@ -1,4 +1,3 @@
-# coding: utf-8
class ImportTasksController < ChouetteController
defaults :resource_class => ImportTask
diff --git a/app/models/api/v1/api_key.rb b/app/models/api/v1/api_key.rb
index 7390db232..e1a7ab5a4 100644
--- a/app/models/api/v1/api_key.rb
+++ b/app/models/api/v1/api_key.rb
@@ -3,9 +3,20 @@ module Api
class ApiKey < ::ActiveRecord::Base
before_create :generate_access_token
belongs_to :referential, :class_name => '::Referential'
+ validates_presence_of :referential
- def self.model_name
- ActiveModel::Name.new self, Api::V1, self.name.demodulize
+ class << self
+ def from(referential, name:)
+ find_or_create_by!(name: name, referential: referential)
+ end
+ def model_name
+ ActiveModel::Name.new Api::V1, self.name.demodulize
+ end
+ def referential_from_token(token)
+ array = token.split('-')
+ return nil unless array.size==2
+ ::Referential.find( array.first)
+ end
end
def eql?(other)
@@ -13,16 +24,11 @@ module Api
other.token == self.token
end
- def self.referential_from_token(token)
- array = token.split('-')
- return nil unless array.size==2
- ::Referential.find( array.first)
- end
private
def generate_access_token
begin
- self.token = "#{referential.id}-#{SecureRandom.hex}"
+ self.token = "#{referential_id}-#{SecureRandom.hex}"
end while self.class.exists?(:token => self.token)
end
end
diff --git a/app/models/import.rb b/app/models/import.rb
index d0736ab0b..c932ecdd9 100644
--- a/app/models/import.rb
+++ b/app/models/import.rb
@@ -3,10 +3,13 @@ class Import < ActiveRecord::Base
belongs_to :workbench
belongs_to :referential
+ belongs_to :parent, class_name: to_s
+
extend Enumerize
enumerize :status, in: %i(new pending successful failed running aborted canceled)
validates :file, presence: true
+ validates_presence_of :referential, :workbench
before_create do
self.token_download = SecureRandom.urlsafe_base64
diff --git a/app/models/netex_import.rb b/app/models/netex_import.rb
index de5b84537..575cef816 100644
--- a/app/models/netex_import.rb
+++ b/app/models/netex_import.rb
@@ -2,12 +2,13 @@ require 'net/http'
class NetexImport < Import
after_commit :launch_java_import
+
def launch_java_import
logger.warn "Call iev get #{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}"
begin
Net::HTTP.get(URI("#{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}"))
rescue Exception => e
- logger.error "IEV server error : e.message"
+ logger.error "IEV server error : #{e.message}"
logger.error e.backtrace.inspect
end
end
diff --git a/app/models/organisation.rb b/app/models/organisation.rb
index d0742bda6..f697122aa 100644
--- a/app/models/organisation.rb
+++ b/app/models/organisation.rb
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
class Organisation < ActiveRecord::Base
include DataFormatEnumerations
@@ -26,19 +25,12 @@ class Organisation < ActiveRecord::Base
def self.portail_api_request
conf = Rails.application.config.try(:stif_portail_api)
- raise 'Rails.application.config.stif_portail_api settings is not defined' unless conf
+ raise 'Rails.application.config.stif_portail_api configuration is not defined' unless conf
- conn = Faraday.new(:url => conf[:url]) do |c|
- c.headers['Authorization'] = "Token token=\"#{conf[:key]}\""
- c.adapter Faraday.default_adapter
- end
-
- resp = conn.get '/api/v1/organizations'
- if resp.status == 200
- JSON.parse resp.body
- else
- raise "Error on api request status : #{resp.status} => #{resp.body}"
- end
+ HTTPService.get_json_resource(
+ host: conf[:url],
+ path: '/api/v1/organizations',
+ token: conf[:key])
end
def self.sync_update code, name, scope
diff --git a/app/models/user.rb b/app/models/user.rb
index c2aa14bda..37d35209a 100644
--- a/app/models/user.rb
+++ b/app/models/user.rb
@@ -41,19 +41,12 @@ class User < ActiveRecord::Base
def self.portail_api_request
conf = Rails.application.config.try(:stif_portail_api)
- raise 'Rails.application.config.stif_portail_api settings is not defined' unless conf
+ raise 'Rails.application.config.stif_portail_api configuration is not defined' unless conf
- conn = Faraday.new(:url => conf[:url]) do |c|
- c.headers['Authorization'] = %{Token token="#{conf[:key]}"}
- c.adapter Faraday.default_adapter
- end
-
- resp = conn.get '/api/v1/users'
- if resp.status == 200
- JSON.parse resp.body
- else
- raise "Error on api request status : #{resp.status} => #{resp.body}"
- end
+ HTTPService.get_json_resource(
+ host: conf[:url],
+ path: '/api/v1/users',
+ token: conf[:key])
end
def self.portail_sync
diff --git a/app/models/workbench_import.rb b/app/models/workbench_import.rb
new file mode 100644
index 000000000..9323bd4b5
--- /dev/null
+++ b/app/models/workbench_import.rb
@@ -0,0 +1,2 @@
+class WorkbenchImport < Import
+end
diff --git a/app/services/file_service.rb b/app/services/file_service.rb
new file mode 100644
index 000000000..3b3ff3561
--- /dev/null
+++ b/app/services/file_service.rb
@@ -0,0 +1,24 @@
+# TODO: Delete me after stable implementation of #1726
+# module FileService extend self
+
+# def unique_filename( path, enum_with: with_ints )
+# file_names = enum_with.map( &file_name_maker(path) )
+# file_names
+# .drop_while( &File.method(:exists?) )
+# .next
+# end
+
+# def with_ints(format='%d')
+# (0..Float::INFINITY)
+# .lazy
+# .map{ |n| format % n }
+# end
+
+
+# private
+
+# def file_name_maker path
+# ->(n){ [path, n].join('_') }
+# end
+
+# end
diff --git a/app/services/http_service.rb b/app/services/http_service.rb
new file mode 100644
index 000000000..ae7d0e413
--- /dev/null
+++ b/app/services/http_service.rb
@@ -0,0 +1,45 @@
+module HTTPService extend self
+
+ Timeout = Faraday::TimeoutError
+
+ def get_resource(host:, path:, token: nil, params: {})
+ Faraday.new(url: host) do |c|
+ c.headers['Authorization'] = "Token token=#{token.inspect}" if token
+ c.adapter Faraday.default_adapter
+
+ return c.get path, params
+ end
+ end
+
+ def get_json_resource(host:, path:, token: nil, params: {})
+ # Stupid Ruby!!! (I mean I just **need** Pattern Matching, maybe I need to write it myself :O)
+ resp = get_resource(host: host, path: path, token: token, params: params)
+ if resp.status == 200
+ return JSON.parse(resp.body)
+ else
+ raise "Error on api request status : #{resp.status} => #{resp.body}"
+ end
+ end
+
+ # host: 'http://localhost:3000',
+ # path: '/api/v1/netex_imports.json',
+ # token: '13-74009c36638f587c9eafb1ce46e95585',
+ # params: { netex_import: {referential_id: 13, workbench_id: 1}},
+ # upload: {file: [StringIO.new('howdy'), 'application/zip', 'greeting']})
+ def post_resource(host:, path:, token: nil, params: {}, upload: nil)
+ Faraday.new(url: host) do |c|
+ c.headers['Authorization'] = "Token token=#{token.inspect}" if token
+ c.request :multipart
+ c.request :url_encoded
+ c.adapter Faraday.default_adapter
+
+ if upload
+ name = upload.keys.first
+ value, mime_type, as_name = upload.values.first
+ params.update( name => Faraday::UploadIO.new(value, mime_type, as_name ) )
+ end
+
+ return c.post path, params
+ end
+ end
+end
diff --git a/app/services/retry_service.rb b/app/services/retry_service.rb
new file mode 100644
index 000000000..21b1def36
--- /dev/null
+++ b/app/services/retry_service.rb
@@ -0,0 +1,54 @@
+require 'result'
+
+class RetryService
+
+ Retry = Class.new(RuntimeError)
+
+ # @param@ delays:
+ # An array of delays that are used to retry after a sleep of the indicated
+ # value in case of failed exceutions.
+ # Once this array is exhausted the executen fails permanently
+ #
+ # @param@ rescue_from:
+ # During execution all the excpetions from this array +plus RetryService::Retry+ are rescued from and
+ # trigger just another retry after a `sleep` as indicated above.
+ #
+ # @param@ block:
+ # This optional code is excuted before each retry, it is passed the result of the failed attempt, thus
+ # an `Exception` and the number of execution already tried.
+ def initialize( delays: [], rescue_from: [], &blk )
+ @intervals = delays
+ @registered_exceptions = Array(rescue_from) << Retry
+ @failure_callback = blk
+ end
+
+ # @param@ blk:
+ # The code to be executed it will be retried goverened by the `delay` passed into the initializer
+ # as described there in case it fails with one of the predefined exceptions or `RetryService::Retry`
+ #
+ # Eventually it will return a `Result` object.
+ def execute &blk
+ result = execute_protected blk
+ return result if result.ok?
+ @intervals.each_with_index do | interval, retry_count |
+ sleep interval
+ @failure_callback.try(:call, result.value, retry_count + 1)
+ result = execute_protected blk
+ return result if result.ok?
+ end
+ result
+ end
+
+
+ private
+
+ def execute_protected blk
+ Result.ok(blk.())
+ rescue Exception => e
+ if @registered_exceptions.any?{ |re| e.is_a? re }
+ Result.error(e)
+ else
+ raise
+ end
+ end
+end
diff --git a/app/services/zip_service.rb b/app/services/zip_service.rb
new file mode 100644
index 000000000..778bfd06d
--- /dev/null
+++ b/app/services/zip_service.rb
@@ -0,0 +1,55 @@
+class ZipService
+
+ attr_reader :current_entry, :zip_data
+
+ def initialize data
+ @zip_data = data
+ @current_entry = nil
+ end
+
+ class << self
+ def convert_entries entries
+ -> output_stream do
+ entries.each do |e|
+ output_stream.put_next_entry e.name
+ output_stream.write e.get_input_stream.read
+ end
+ end
+ end
+
+ def entries input_stream
+ Enumerator.new do |enum|
+ loop{ enum << input_stream.get_next_entry }
+ end.lazy.take_while{ |e| e }
+ end
+ end
+
+ def entry_groups
+ self.class.entries(input_stream).group_by(&method(:entry_key))
+ end
+
+ def entry_group_streams
+ entry_groups.map(&method(:make_stream)).to_h
+ end
+
+ def entry_key entry
+ entry.name.split('/', -1)[-2]
+ end
+
+ def make_stream pair
+ name, entries = pair
+ [name, make_stream_from( entries )]
+ end
+
+ def make_stream_from entries
+ Zip::OutputStream.write_buffer(&self.class.convert_entries(entries))
+ end
+
+ def next_entry
+ @current_entry = input_stream.get_next_entry
+ end
+
+ def input_stream
+ @__input_stream__ ||= Zip::InputStream.open(StringIO.new(zip_data))
+ end
+end
diff --git a/app/views/api/v1/netex_imports/create.json.rabl b/app/views/api/v1/netex_imports/create.json.rabl
new file mode 100644
index 000000000..1361cdb80
--- /dev/null
+++ b/app/views/api/v1/netex_imports/create.json.rabl
@@ -0,0 +1,3 @@
+
+object @import
+attributes :id, :type
diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb
new file mode 100644
index 000000000..57c0f5f4e
--- /dev/null
+++ b/app/workers/workbench_import_worker.rb
@@ -0,0 +1,118 @@
+class WorkbenchImportWorker
+ include Sidekiq::Worker
+ include Rails.application.routes.url_helpers
+ include Configurable
+
+ RETRY_DELAYS = [3, 5, 8]
+
+ # Workers
+ # =======
+
+ def perform(import_id)
+ @import = WorkbenchImport.find(import_id)
+ @response = nil
+ @import.update_attributes(status: 'running')
+ downloaded = download
+ zip_service = ZipService.new(downloaded)
+ upload zip_service
+ end
+
+ def download
+ logger.info "HTTP GET #{import_url}"
+ @zipfile_data = HTTPService.get_resource(
+ host: import_host,
+ path: import_path,
+ params: {token: @import.token_download}).body
+ end
+
+ def execute_post eg_name, eg_stream
+ logger.info "HTTP POST #{export_url} (for #{complete_entry_group_name(eg_name)})"
+ HTTPService.post_resource(
+ host: export_host,
+ path: export_path,
+ token: token(eg_name),
+ params: params,
+ upload: {file: [eg_stream, 'application/zip', eg_name]})
+ end
+
+ def log_failure reason, count
+ logger.warn "HTTP POST failed with #{reason}, count = #{count}, response=#{@response}"
+ end
+
+ def try_again
+ raise RetryService::Retry
+ end
+
+ def try_upload_entry_group eg_name, eg_stream
+ result = execute_post eg_name, eg_stream
+ return result if result && result.status < 400
+ @response = result.body
+ try_again
+ end
+
+ def upload zip_service
+ entry_group_streams = zip_service.entry_group_streams
+ @import.update_attributes total_steps: entry_group_streams.size
+ entry_group_streams.each_with_index(&method(:upload_entry_group))
+ rescue StopIteration
+ @import.update_attributes( current_step: entry_group_streams.size, status: 'failed' )
+ end
+
+ def upload_entry_group key_pair, element_count
+ @import.update_attributes( current_step: element_count.succ )
+ retry_service = RetryService.new(
+ delays: RETRY_DELAYS,
+ rescue_from: [HTTPService::Timeout],
+ &method(:log_failure))
+ status = retry_service.execute(&upload_entry_group_proc(key_pair))
+ raise StopIteration unless status.ok?
+ end
+
+ def upload_entry_group_proc key_pair
+ eg_name, eg_stream = key_pair
+ # This should be fn.try_upload_entry_group(eg_name, eg_stream) ;(
+ -> do
+ try_upload_entry_group(eg_name, eg_stream)
+ end
+ end
+
+
+
+ # Queries
+ # =======
+
+ def complete_entry_group_name entry_group_name
+ [@import.name, entry_group_name].join("--")
+ end
+
+ def token entry_group_name
+ Api::V1::ApiKey.from(@import.referential, name: complete_entry_group_name(entry_group_name)).token
+ end
+
+ # Constants
+ # =========
+
+ def export_host
+ Rails.application.config.rails_host
+ end
+ def export_path
+ api_v1_netex_imports_path(format: :json)
+ end
+ def export_url
+ @__export_url__ ||= File.join(export_host, export_path)
+ end
+
+ def import_host
+ Rails.application.config.rails_host
+ end
+ def import_path
+ @__import_path__ ||= download_workbench_import_path(@import.workbench, @import)
+ end
+ def import_url
+ @__import_url__ ||= File.join(import_host, import_path)
+ end
+
+ def params
+ @__params__ ||= { netex_import: { referential_id: @import.referential_id, workbench_id: @import.workbench_id } }
+ end
+end