diff options
Diffstat (limited to 'app')
| -rw-r--r-- | app/concerns/configurable.rb | 26 | ||||
| -rw-r--r-- | app/controllers/api/v1/netex_imports_controller.rb | 26 | ||||
| -rw-r--r-- | app/controllers/import_tasks_controller.rb | 1 | ||||
| -rw-r--r-- | app/models/api/v1/api_key.rb | 22 | ||||
| -rw-r--r-- | app/models/import.rb | 3 | ||||
| -rw-r--r-- | app/models/netex_import.rb | 3 | ||||
| -rw-r--r-- | app/models/organisation.rb | 18 | ||||
| -rw-r--r-- | app/models/user.rb | 17 | ||||
| -rw-r--r-- | app/models/workbench_import.rb | 2 | ||||
| -rw-r--r-- | app/services/file_service.rb | 24 | ||||
| -rw-r--r-- | app/services/http_service.rb | 45 | ||||
| -rw-r--r-- | app/services/retry_service.rb | 54 | ||||
| -rw-r--r-- | app/services/zip_service.rb | 55 | ||||
| -rw-r--r-- | app/views/api/v1/netex_imports/create.json.rabl | 3 | ||||
| -rw-r--r-- | app/workers/workbench_import_worker.rb | 118 | 
15 files changed, 382 insertions, 35 deletions
| diff --git a/app/concerns/configurable.rb b/app/concerns/configurable.rb new file mode 100644 index 000000000..c7d0f1fd9 --- /dev/null +++ b/app/concerns/configurable.rb @@ -0,0 +1,26 @@ +module Configurable +   +  module ClassMethods +    def config &blk +      blk ? blk.(configuration) : configuration +    end + +    private +    def configuration +      @__configuration__ ||= Rails::Application::Configuration.new +    end +  end + +  module InstanceMethods +    private + +    def config +      self.class.config +    end +  end + +  def self.included(into) +    into.extend ClassMethods +    into.send :include, InstanceMethods +  end +end diff --git a/app/controllers/api/v1/netex_imports_controller.rb b/app/controllers/api/v1/netex_imports_controller.rb new file mode 100644 index 000000000..d67d121c0 --- /dev/null +++ b/app/controllers/api/v1/netex_imports_controller.rb @@ -0,0 +1,26 @@ +module Api +  module V1 +    class NetexImportsController < ChouetteController + +      def create +        respond_to do | format | +          format.json do  +            @import = NetexImport.create(netex_import_params) +            unless @import.valid? +              render json: {errors: @import.errors}, status: 406 +            end +          end +        end +      end + + +      private + +      def netex_import_params +        params +          .require('netex_import') +          .permit(:file, :name, :referential_id, :workbench_id) +      end +    end +  end +end diff --git a/app/controllers/import_tasks_controller.rb b/app/controllers/import_tasks_controller.rb index 0e3ed6445..cb377ec5a 100644 --- a/app/controllers/import_tasks_controller.rb +++ b/app/controllers/import_tasks_controller.rb @@ -1,4 +1,3 @@ -# coding: utf-8  class ImportTasksController < ChouetteController    defaults :resource_class => ImportTask diff --git a/app/models/api/v1/api_key.rb b/app/models/api/v1/api_key.rb index 7390db232..e1a7ab5a4 100644 --- a/app/models/api/v1/api_key.rb +++ b/app/models/api/v1/api_key.rb @@ -3,9 +3,20 @@ module Api      class ApiKey < ::ActiveRecord::Base        before_create :generate_access_token        belongs_to :referential, :class_name => '::Referential' +      validates_presence_of :referential -      def self.model_name -        ActiveModel::Name.new self, Api::V1, self.name.demodulize +      class << self +        def from(referential, name:) +          find_or_create_by!(name: name, referential: referential) +        end +        def model_name +          ActiveModel::Name.new  Api::V1, self.name.demodulize +        end +        def referential_from_token(token) +          array = token.split('-') +          return nil unless array.size==2 +          ::Referential.find( array.first) +        end        end        def eql?(other) @@ -13,16 +24,11 @@ module Api          other.token == self.token        end -      def self.referential_from_token(token) -        array = token.split('-') -        return nil unless array.size==2 -        ::Referential.find( array.first) -      end      private        def generate_access_token          begin -          self.token = "#{referential.id}-#{SecureRandom.hex}" +          self.token = "#{referential_id}-#{SecureRandom.hex}"          end while self.class.exists?(:token => self.token)        end      end diff --git a/app/models/import.rb b/app/models/import.rb index d0736ab0b..c932ecdd9 100644 --- a/app/models/import.rb +++ b/app/models/import.rb @@ -3,10 +3,13 @@ class Import < ActiveRecord::Base    belongs_to :workbench    belongs_to :referential +  belongs_to :parent, class_name: to_s +    extend Enumerize    enumerize :status, in: %i(new pending successful failed running aborted canceled)    validates :file, presence: true +  validates_presence_of :referential, :workbench    before_create do      self.token_download = SecureRandom.urlsafe_base64 diff --git a/app/models/netex_import.rb b/app/models/netex_import.rb index de5b84537..575cef816 100644 --- a/app/models/netex_import.rb +++ b/app/models/netex_import.rb @@ -2,12 +2,13 @@ require 'net/http'  class NetexImport < Import    after_commit :launch_java_import +    def launch_java_import      logger.warn  "Call iev get #{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}"      begin        Net::HTTP.get(URI("#{Rails.configuration.iev_url}/boiv_iev/referentials/importer/new?id=#{id}"))      rescue Exception => e -      logger.error "IEV server error : e.message" +      logger.error "IEV server error : #{e.message}"        logger.error e.backtrace.inspect      end    end diff --git a/app/models/organisation.rb b/app/models/organisation.rb index d0742bda6..f697122aa 100644 --- a/app/models/organisation.rb +++ b/app/models/organisation.rb @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*-  class Organisation < ActiveRecord::Base    include DataFormatEnumerations @@ -26,19 +25,12 @@ class Organisation < ActiveRecord::Base    def self.portail_api_request      conf = Rails.application.config.try(:stif_portail_api) -    raise 'Rails.application.config.stif_portail_api settings is not defined' unless conf +    raise 'Rails.application.config.stif_portail_api configuration is not defined' unless conf -    conn = Faraday.new(:url => conf[:url]) do |c| -      c.headers['Authorization'] = "Token token=\"#{conf[:key]}\"" -      c.adapter  Faraday.default_adapter -    end - -    resp = conn.get '/api/v1/organizations' -    if resp.status == 200 -      JSON.parse resp.body -    else -      raise "Error on api request status : #{resp.status} => #{resp.body}" -    end +    HTTPService.get_json_resource( +      host: conf[:url], +      path: '/api/v1/organizations', +      token: conf[:key])    end    def self.sync_update code, name, scope diff --git a/app/models/user.rb b/app/models/user.rb index c2aa14bda..37d35209a 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -41,19 +41,12 @@ class User < ActiveRecord::Base    def self.portail_api_request      conf = Rails.application.config.try(:stif_portail_api) -    raise 'Rails.application.config.stif_portail_api settings is not defined' unless conf +    raise 'Rails.application.config.stif_portail_api configuration is not defined' unless conf -    conn = Faraday.new(:url => conf[:url]) do |c| -      c.headers['Authorization'] = %{Token token="#{conf[:key]}"} -      c.adapter  Faraday.default_adapter -    end - -    resp = conn.get '/api/v1/users' -    if resp.status == 200 -      JSON.parse resp.body -    else -      raise "Error on api request status : #{resp.status} => #{resp.body}" -    end +    HTTPService.get_json_resource( +      host: conf[:url], +      path: '/api/v1/users', +      token: conf[:key])    end    def self.portail_sync diff --git a/app/models/workbench_import.rb b/app/models/workbench_import.rb new file mode 100644 index 000000000..9323bd4b5 --- /dev/null +++ b/app/models/workbench_import.rb @@ -0,0 +1,2 @@ +class WorkbenchImport < Import +end diff --git a/app/services/file_service.rb b/app/services/file_service.rb new file mode 100644 index 000000000..3b3ff3561 --- /dev/null +++ b/app/services/file_service.rb @@ -0,0 +1,24 @@ +# TODO: Delete me after stable implementation of #1726 +# module FileService extend self + +#   def unique_filename( path, enum_with: with_ints ) +#     file_names = enum_with.map( &file_name_maker(path) ) +#     file_names +#       .drop_while( &File.method(:exists?) ) +#       .next +#   end + +#   def with_ints(format='%d') +#     (0..Float::INFINITY) +#       .lazy +#       .map{ |n| format % n } +#   end +   + +#   private + +#   def file_name_maker path +#     ->(n){ [path, n].join('_') } +#   end + +# end diff --git a/app/services/http_service.rb b/app/services/http_service.rb new file mode 100644 index 000000000..ae7d0e413 --- /dev/null +++ b/app/services/http_service.rb @@ -0,0 +1,45 @@ +module HTTPService extend self + +  Timeout = Faraday::TimeoutError + +  def get_resource(host:, path:, token: nil, params: {}) +    Faraday.new(url: host) do |c| +      c.headers['Authorization'] = "Token token=#{token.inspect}" if token +      c.adapter Faraday.default_adapter + +      return c.get path, params +    end +  end + +  def get_json_resource(host:, path:, token: nil, params: {}) +    # Stupid Ruby!!! (I mean I just **need** Pattern Matching, maybe I need to write it myself :O) +    resp = get_resource(host: host, path: path, token: token, params: params)  +    if resp.status == 200 +      return JSON.parse(resp.body) +    else +      raise "Error on api request status : #{resp.status} => #{resp.body}" +    end +  end + +  # host: 'http://localhost:3000', +  # path: '/api/v1/netex_imports.json', +  # token: '13-74009c36638f587c9eafb1ce46e95585', +  # params: { netex_import: {referential_id: 13, workbench_id: 1}}, +  # upload: {file: [StringIO.new('howdy'), 'application/zip', 'greeting']}) +  def post_resource(host:, path:, token: nil, params: {}, upload: nil) +    Faraday.new(url: host) do |c| +      c.headers['Authorization'] = "Token token=#{token.inspect}" if token +      c.request :multipart +      c.request :url_encoded +      c.adapter Faraday.default_adapter + +      if upload +        name = upload.keys.first +        value, mime_type, as_name = upload.values.first +        params.update( name => Faraday::UploadIO.new(value, mime_type, as_name ) ) +      end + +      return c.post path, params +    end +  end +end diff --git a/app/services/retry_service.rb b/app/services/retry_service.rb new file mode 100644 index 000000000..21b1def36 --- /dev/null +++ b/app/services/retry_service.rb @@ -0,0 +1,54 @@ +require 'result' + +class RetryService + +  Retry = Class.new(RuntimeError) + +  # @param@ delays: +  # An array of delays that are used to retry after a sleep of the indicated +  # value in case of failed exceutions. +  # Once this array is exhausted the executen fails permanently +  # +  # @param@ rescue_from: +  # During execution all the excpetions from this array +plus RetryService::Retry+ are rescued from and +  # trigger just another retry after a `sleep` as indicated above. +  # +  # @param@ block: +  # This optional code is excuted before each retry, it is passed the result of the failed attempt, thus +  # an `Exception` and the number of execution already tried. +  def initialize( delays: [], rescue_from: [], &blk ) +    @intervals             = delays +    @registered_exceptions = Array(rescue_from) << Retry +    @failure_callback      = blk +  end + +  # @param@ blk: +  # The code to be executed it will be retried goverened by the `delay` passed into the initializer +  # as described there in case it fails with one of the predefined exceptions or `RetryService::Retry` +  # +  # Eventually it will return a `Result` object. +  def execute &blk +    result = execute_protected blk +    return result if result.ok? +    @intervals.each_with_index do | interval, retry_count | +      sleep interval +      @failure_callback.try(:call, result.value, retry_count + 1) +      result = execute_protected blk +      return result if result.ok? +    end +    result +  end + + +  private + +  def execute_protected blk +    Result.ok(blk.()) +  rescue Exception => e +    if @registered_exceptions.any?{ |re| e.is_a? re } +      Result.error(e) +    else +      raise +    end +  end +end diff --git a/app/services/zip_service.rb b/app/services/zip_service.rb new file mode 100644 index 000000000..778bfd06d --- /dev/null +++ b/app/services/zip_service.rb @@ -0,0 +1,55 @@ +class ZipService + +  attr_reader :current_entry, :zip_data + +  def initialize data +    @zip_data = data +    @current_entry = nil +  end + +  class << self +    def convert_entries entries +      -> output_stream do +        entries.each do |e| +          output_stream.put_next_entry e.name +          output_stream.write e.get_input_stream.read +        end +      end +    end + +    def entries input_stream +      Enumerator.new do |enum| +        loop{ enum << input_stream.get_next_entry } +      end.lazy.take_while{ |e| e } +    end +  end + +  def entry_groups +    self.class.entries(input_stream).group_by(&method(:entry_key)) +  end + +  def entry_group_streams +    entry_groups.map(&method(:make_stream)).to_h +  end + +  def entry_key entry +    entry.name.split('/', -1)[-2] +  end + +  def make_stream pair +    name, entries = pair  +    [name,  make_stream_from( entries )] +  end + +  def make_stream_from entries +    Zip::OutputStream.write_buffer(&self.class.convert_entries(entries)) +  end + +  def next_entry +    @current_entry = input_stream.get_next_entry +  end + +  def input_stream +    @__input_stream__ ||= Zip::InputStream.open(StringIO.new(zip_data)) +  end +end diff --git a/app/views/api/v1/netex_imports/create.json.rabl b/app/views/api/v1/netex_imports/create.json.rabl new file mode 100644 index 000000000..1361cdb80 --- /dev/null +++ b/app/views/api/v1/netex_imports/create.json.rabl @@ -0,0 +1,3 @@ + +object @import +attributes :id, :type diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb new file mode 100644 index 000000000..57c0f5f4e --- /dev/null +++ b/app/workers/workbench_import_worker.rb @@ -0,0 +1,118 @@ +class WorkbenchImportWorker +  include Sidekiq::Worker +  include Rails.application.routes.url_helpers +  include Configurable + +  RETRY_DELAYS = [3, 5, 8] + +  # Workers +  # ======= + +  def perform(import_id) +    @import     = WorkbenchImport.find(import_id) +    @response   = nil +    @import.update_attributes(status: 'running') +    downloaded  = download +    zip_service = ZipService.new(downloaded) +    upload zip_service +  end + +  def download +    logger.info  "HTTP GET #{import_url}" +    @zipfile_data = HTTPService.get_resource( +      host: import_host, +      path: import_path, +      params: {token: @import.token_download}).body +  end + +  def execute_post eg_name, eg_stream +    logger.info  "HTTP POST #{export_url} (for #{complete_entry_group_name(eg_name)})" +    HTTPService.post_resource( +      host: export_host, +      path: export_path, +      token: token(eg_name), +      params: params, +      upload: {file: [eg_stream, 'application/zip', eg_name]}) +  end + +  def log_failure reason, count +    logger.warn "HTTP POST failed with #{reason}, count = #{count}, response=#{@response}" +  end + +  def try_again +    raise RetryService::Retry +  end + +  def try_upload_entry_group eg_name, eg_stream +    result = execute_post eg_name, eg_stream +    return result if result && result.status < 400 +    @response = result.body +    try_again +  end + +  def upload zip_service +    entry_group_streams = zip_service.entry_group_streams +    @import.update_attributes total_steps: entry_group_streams.size +    entry_group_streams.each_with_index(&method(:upload_entry_group)) +  rescue StopIteration +    @import.update_attributes( current_step: entry_group_streams.size, status: 'failed' ) +  end + +  def upload_entry_group key_pair, element_count +    @import.update_attributes( current_step: element_count.succ ) +    retry_service = RetryService.new( +      delays: RETRY_DELAYS, +      rescue_from: [HTTPService::Timeout], +      &method(:log_failure))  +    status = retry_service.execute(&upload_entry_group_proc(key_pair)) +    raise StopIteration unless status.ok? +  end + +  def upload_entry_group_proc key_pair +    eg_name, eg_stream = key_pair +    # This should be fn.try_upload_entry_group(eg_name, eg_stream) ;( +    -> do +      try_upload_entry_group(eg_name, eg_stream) +    end +  end + + + +  # Queries +  # ======= + +  def complete_entry_group_name entry_group_name +    [@import.name, entry_group_name].join("--") +  end + +  def token entry_group_name +    Api::V1::ApiKey.from(@import.referential, name: complete_entry_group_name(entry_group_name)).token +  end + +  # Constants +  # ========= + +  def export_host +    Rails.application.config.rails_host +  end +  def export_path +    api_v1_netex_imports_path(format: :json) +  end +  def export_url +    @__export_url__ ||= File.join(export_host, export_path) +  end + +  def import_host +    Rails.application.config.rails_host +  end +  def import_path +    @__import_path__ ||= download_workbench_import_path(@import.workbench, @import) +  end +  def import_url +    @__import_url__ ||= File.join(import_host, import_path) +  end + +  def params +    @__params__ ||= { netex_import: { referential_id: @import.referential_id, workbench_id: @import.workbench_id } } +  end +end | 
