diff options
| -rw-r--r-- | app/workers/workbench_import_worker.rb | 93 | ||||
| -rw-r--r-- | app/workers/workbench_import_worker/object_state_updater.rb | 36 | ||||
| -rw-r--r-- | lib/stif/my_workbench_scopes.rb | 3 | ||||
| -rw-r--r-- | spec/requests/api/v1/netex_import_spec.rb | 47 | ||||
| -rw-r--r-- | spec/workers/workbench_import/workbench_import_with_corrupt_zip_spec.rb | 47 | ||||
| -rw-r--r-- | spec/workers/workbench_import/workbench_import_worker_spec.rb | 169 | ||||
| -rw-r--r-- | spec/workers/workbench_import_worker_spec.rb | 102 | 
7 files changed, 214 insertions, 283 deletions
| diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb index de51efded..6a3eda2f9 100644 --- a/app/workers/workbench_import_worker.rb +++ b/app/workers/workbench_import_worker.rb @@ -3,29 +3,25 @@ class WorkbenchImportWorker    include Rails.application.routes.url_helpers    include Configurable +  include ObjectStateUpdater + +  attr_reader :entries, :workbench_import +    # Workers    # =======    def perform(import_id) -    @workbench_import = WorkbenchImport.find(import_id) -    @response         = nil -    @workbench_import.update(status: 'running', started_at: Time.now) -    downloaded  = download -    zip_service = ZipService.new(downloaded) +    @entries = 0 +    @workbench_import ||= WorkbenchImport.find(import_id) + +    workbench_import.update(status: 'running', started_at: Time.now) +    zip_service = ZipService.new(downloaded, allowed_lines)      upload zip_service -    @workbench_import.update(ended_at: Time.now) +    workbench_import.update(ended_at: Time.now)    rescue Zip::Error      handle_corrupt_zip_file    end -  def download -    logger.info  "HTTP GET #{import_url}" -    HTTPService.get_resource( -      host: import_host, -      path: import_path, -      params: {token: @workbench_import.token_download}).body -  end -    def execute_post eg_name, eg_file      logger.info  "HTTP POST #{export_url} (for #{complete_entry_group_name(eg_name)})"      HTTPService.post_resource( @@ -35,48 +31,44 @@ class WorkbenchImportWorker    end    def handle_corrupt_zip_file -    @workbench_import.messages.create(criticity: :error, message_key: 'corrupt_zip_file', message_attributes: {source_filename: @workbench_import.file.file.file}) +    workbench_import.messages.create(criticity: :error, message_key: 'corrupt_zip_file', message_attributes: {source_filename: workbench_import.file.file.file})    end    def upload zip_service      entry_group_streams = zip_service.subdirs -    @workbench_import.update total_steps: entry_group_streams.size      entry_group_streams.each_with_index(&method(:upload_entry_group)) +    workbench_import.update total_steps: @entries    rescue Exception => e      logger.error e.message -    @workbench_import.update( current_step: entry_group_streams.size, status: 'failed' ) +    workbench_import.update( current_step: @entries, status: 'failed' )      raise    end -  def update_object_state entry, count -    @workbench_import.update( current_step: count ) -    unless entry.spurious.empty? -      @workbench_import.messages.create( -        criticity: :warning, -        message_key: 'inconsistent_zip_file', -        message_attributes: { -          'source_filename' => @workbench_import.file.file.file, -          'spurious_dirs'   => entry.spurious.join(', ') -        })  -    end -  end    def upload_entry_group entry, element_count      update_object_state entry, element_count.succ +    return unless entry.ok?      # status = retry_service.execute(&upload_entry_group_proc(entry)) -    eg_name = entry.name -    eg_stream = entry.stream +    upload_entry_group_stream entry.name, entry.stream +  end +  def upload_entry_group_stream eg_name, eg_stream      FileUtils.mkdir_p(Rails.root.join('tmp', 'imports')) -    eg_file = File.new(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip"), 'wb').tap do |file| +    File.new(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip"), 'wb').tap do |file|        eg_stream.rewind        file.write eg_stream.read +      file.close      end -    eg_file.close -    eg_file = File.new(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip")) + +    upload_entry_group_tmpfile eg_name, File.new(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip")) +  end +     +  def upload_entry_group_tmpfile eg_name, eg_file      result = execute_post eg_name, eg_file      if result && result.status < 400 +      @entries += 1 +      workbench_import.update( current_step: @entries )        result      else        raise StopIteration, result.body @@ -91,7 +83,7 @@ class WorkbenchImportWorker    # =======    def complete_entry_group_name entry_group_name -    [@workbench_import.name, entry_group_name].join("--") +    [workbench_import.name, entry_group_name].join("--")    end    # Constants @@ -111,7 +103,7 @@ class WorkbenchImportWorker      Rails.application.config.rails_host    end    def import_path -    @__import_path__ ||= download_workbench_import_path(@workbench_import.workbench, @workbench_import) +    @__import_path__ ||= download_workbench_import_path(workbench_import.workbench, workbench_import)    end    def import_url      @__import_url__ ||= File.join(import_host, import_path) @@ -119,10 +111,29 @@ class WorkbenchImportWorker    def params file, name      { netex_import: -        { parent_id: @workbench_import.id, -          parent_type: @workbench_import.class.name, -          workbench_id: @workbench_import.workbench_id, -          name: name, -          file: HTTPService.upload(file, 'application/zip', "#{name}.zip") } } +      { parent_id: workbench_import.id, +        parent_type: workbench_import.class.name, +        workbench_id: workbench_import.workbench_id, +        name: name, +        file: HTTPService.upload(file, 'application/zip', "#{name}.zip") } } +  end + +  # Lazy Values +  # =========== + +  def allowed_lines +    @__allowed_lines__ ||= workbench_import.workbench.organisation.lines_set    end +  def downloaded +    @__downloaded__ ||= download_response.body +  end +  def download_response +    @__download_response__ ||= HTTPService.get_resource( +      host: import_host, +      path: import_path, +      params: {token: workbench_import.token_download}).tap do +        logger.info  "HTTP GET #{import_url}" +      end +  end +  end diff --git a/app/workers/workbench_import_worker/object_state_updater.rb b/app/workers/workbench_import_worker/object_state_updater.rb new file mode 100644 index 000000000..e9cc081b7 --- /dev/null +++ b/app/workers/workbench_import_worker/object_state_updater.rb @@ -0,0 +1,36 @@ + +class WorkbenchImportWorker +  module ObjectStateUpdater + +    def update_object_state entry, count +      workbench_import.update( total_steps: count ) +      update_spurious entry +      update_foreign_lines entry +    end + + +    private + +    def update_foreign_lines entry +      return if entry.foreign_lines.empty? +      workbench_import.messages.create( +        criticity: :warning, +        message_key: 'foreign_lines_in_referential', +        message_attributes: { +          'source_filename' => workbench_import.file.file.file, +          'foreign_lines'   => entry.foreign_lines.join(', ') +        })  +    end + +    def update_spurious entry +      return if entry.spurious.empty? +      workbench_import.messages.create( +        criticity: :warning, +        message_key: 'inconsistent_zip_file', +        message_attributes: { +          'source_filename' => workbench_import.file.file.file, +          'spurious_dirs'   => entry.spurious.join(', ') +        })  +    end +  end +end diff --git a/lib/stif/my_workbench_scopes.rb b/lib/stif/my_workbench_scopes.rb index 89c4e659c..04bc93089 100644 --- a/lib/stif/my_workbench_scopes.rb +++ b/lib/stif/my_workbench_scopes.rb @@ -2,12 +2,13 @@ module Stif    class MyWorkbenchScopes      attr_accessor :workbench +          def initialize(workbench)        @workbench = workbench      end      def line_scope(initial_scope) -      ids = self.parse_functional_scope +      ids = parse_functional_scope        ids ? initial_scope.where(objectid: ids) : initial_scope      end diff --git a/spec/requests/api/v1/netex_import_spec.rb b/spec/requests/api/v1/netex_import_spec.rb index a90e51e5b..8597c1d32 100644 --- a/spec/requests/api/v1/netex_import_spec.rb +++ b/spec/requests/api/v1/netex_import_spec.rb @@ -32,18 +32,18 @@ RSpec.describe "NetexImport", type: :request do        let( :authorization ){ authorization_token_header( get_api_key.token ) }        #TODO Check why referential_id is nil        it 'succeeds' do -        skip "Problem with referential_id" do -          create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00108', line_referential: workbench.line_referential) -          create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00109', line_referential: workbench.line_referential) - -          post_request.(netex_import: legal_attributes) -          expect( response ).to be_success -          expect( json_response_body ).to eq( -            'id'             => NetexImport.last.id, -            'referential_id' => Referential.last.id, -            'workbench_id'   => workbench.id -          ) -        end +        # skip "Problem with referential_id" do +        create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00108', line_referential: workbench.line_referential) +        create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00109', line_referential: workbench.line_referential) + +        post_request.(netex_import: legal_attributes) +        expect( response ).to be_success +        expect( json_response_body ).to eq( +          'id'             => NetexImport.last.id, +          'referential_id' => Referential.last.id, +          'workbench_id'   => workbench.id +        ) +        # end        end @@ -54,24 +54,21 @@ RSpec.describe "NetexImport", type: :request do          expect{ post_request.(netex_import: legal_attributes) }.to change{NetexImport.count}.by(1)        end -      #TODO Check why Referential count does not change -      it 'creates a correct Referential' do -        skip "Referential count does not change" do -          create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00108', line_referential: workbench.line_referential) -          create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00109', line_referential: workbench.line_referential) - -          legal_attributes # force object creation for correct to change behavior -          expect{post_request.(netex_import: legal_attributes)}.to change{Referential.count}.by(1) -          Referential.last.tap do | ref | -            expect( ref.workbench_id ).to eq(workbench.id) -            expect( ref.organisation_id ).to eq(workbench.organisation_id) -          end +      it 'creates a correct Referential', pending: 'see #5073' do +        create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00108', line_referential: workbench.line_referential) +        create(:line, objectid: 'STIF:CODIFLIGNE:Line:C00109', line_referential: workbench.line_referential) + +        legal_attributes # force object creation for correct to change behavior +        expect{post_request.(netex_import: legal_attributes)}.to change{Referential.count}.by(1) +        Referential.last.tap do | ref | +          expect( ref.workbench_id ).to eq(workbench.id) +          expect( ref.organisation_id ).to eq(workbench.organisation_id)          end        end      end -    context 'with incorrect credentials and correct request', pending: "see #4311" do +    context 'with incorrect credentials and correct request', pending: "see #4311 & #5072" do        let( :authorization ){ authorization_token_header( "#{referential.id}-incorrect_token") }        it 'does not create any DB object and does not succeed' do diff --git a/spec/workers/workbench_import/workbench_import_with_corrupt_zip_spec.rb b/spec/workers/workbench_import/workbench_import_with_corrupt_zip_spec.rb deleted file mode 100644 index 47626f5a1..000000000 --- a/spec/workers/workbench_import/workbench_import_with_corrupt_zip_spec.rb +++ /dev/null @@ -1,47 +0,0 @@ -RSpec.describe WorkbenchImportWorker do - -  shared_examples_for 'corrupt zipfile data' do -    subject { described_class.new } -    let( :workbench_import ){ create :workbench_import, status: :pending } - -    before do -      # Let us make sure that the name Enterprise will never be forgotten by history, -      # ahem, I meant, that nothing is uploaded, by forbidding any message to be sent -      # to HTTPService -      expect_it.to receive(:download).and_return(downloaded) -    end - -    it 'does not upload' do -      stub_const 'HTTPService', double('HTTPService') -      subject.perform(workbench_import.id) -    end - -    it 'does create a message' do -      expect{ subject.perform(workbench_import.id) }.to change{ workbench_import.messages.count }.by(1) - -      message = workbench_import.messages.last -      expect( message.criticity ).to eq('error') -      expect( message.message_key ).to eq('corrupt_zip_file') -      expect( message.message_attributes ).to eq( 'source_filename' => workbench_import.file.file.file ) -    end - -    it 'does not change current step' do -      expect{ subject.perform(workbench_import.id) }.not_to change{ workbench_import.current_step } -    end - -    it "sets the workbench_import.status to failed" do -      subject.perform(workbench_import.id) -      expect( workbench_import.reload.status ).to eq('failed') -    end -  end - -  context 'empty zip file' do  -    let( :downloaded ){ '' } -    it_should_behave_like 'corrupt zipfile data' -  end - -  context 'corrupt data' do  -    let( :downloaded ){ very_random } -    it_should_behave_like 'corrupt zipfile data' -  end -end diff --git a/spec/workers/workbench_import/workbench_import_worker_spec.rb b/spec/workers/workbench_import/workbench_import_worker_spec.rb deleted file mode 100644 index 47ca2b4ff..000000000 --- a/spec/workers/workbench_import/workbench_import_worker_spec.rb +++ /dev/null @@ -1,169 +0,0 @@ -RSpec.describe WorkbenchImportWorker, type: [:worker, :request] do - -  let( :worker ) { described_class.new } -  let( :import ){ build_stubbed :import, token_download: download_token, file: zip_file } - -  let( :workbench ){ import.workbench } -  let( :referential ){ import.referential } -  let( :api_key ){ build_stubbed :api_key, referential: referential, token: "#{referential.id}-#{random_hex}" } - -  # http://www.example.com/workbenches/:workbench_id/imports/:id/download -  let( :host ){ Rails.configuration.rails_host } -  let( :path ){ download_workbench_import_path(workbench, import) } - -  let( :downloaded_zip ){ double("downloaded zip") } -  let( :download_zip_response ){ OpenStruct.new( body: downloaded_zip ) } -  let( :download_token ){ random_string } - -  let( :upload_path ) { api_v1_netex_imports_path(format: :json) } - -  let( :spurious ){ [[], [], []] } -  let( :subdirs ) do -    entry_count.times.map do |i| -      ZipService::Subdir.new( -        "subdir #{i}", -        double("subdir #{i}", rewind: 0, read: ''), -        spurious[i] -      ) -    end -  end - -  let( :zip_service ){ double("zip service") } -  let( :zip_file ){ open_fixture('multiple_references_import.zip') } - -  let( :post_response_ok ){ double(status: 201, body: "{}") } - -  before do -    Timecop.freeze(Time.now) - -    # Silence Logger -    allow_any_instance_of(Logger).to receive(:info) -    allow_any_instance_of(Logger).to receive(:warn) - -    # That should be `build_stubbed's` job, no? -    allow(Import).to receive(:find).with(import.id).and_return(import) - -    allow(Api::V1::ApiKey).to receive(:from).and_return(api_key) -    allow(ZipService).to receive(:new).with(downloaded_zip).and_return zip_service -    expect(zip_service).to receive(:subdirs).and_return(subdirs) -    expect( import ).to receive(:update).with( -      status: 'running', -      started_at: Time.now -    ) -  end - -  after do -    Timecop.return -  end - - -  context 'multireferential zipfile, no errors' do -    let( :entry_count ){ 2 } - -    it 'downloads a zip file, cuts it, and uploads all pieces' do - -      expect(HTTPService).to receive(:get_resource) -        .with(host: host, path: path, params: {token: download_token}) -        .and_return( download_zip_response ) - -      subdirs.each do |subdir| -        mock_post subdir, post_response_ok -      end - -      expect( import ).to receive(:update).with(total_steps: 2) -      expect( import ).to receive(:update).with(current_step: 1) -      expect( import ).to receive(:update).with(current_step: 2) -      expect( import ).to receive(:update).with(ended_at: Time.now) - -      worker.perform import.id - -    end -  end - -  context 'multireferential zipfile with error' do -    let( :entry_count ){ 3 } -    let( :post_response_failure ){ double(status: 406, body: {error: 'What was you thinking'}) } - -    it 'downloads a zip file, cuts it, and uploads some pieces' do -      expect(HTTPService).to receive(:get_resource) -        .with(host: host, path: path, params: {token: download_token}) -        .and_return( download_zip_response ) - -      # First subdir succeeds -      subdirs[0..0].each do |subdir| -        mock_post subdir, post_response_ok -      end - -      # Second subdir fails (M I S E R A B L Y) -      subdirs[1..1].each do |subdir| -        mock_post subdir, post_response_failure -      end - -      expect( import ).to receive(:update).with(total_steps: 3) -      expect( import ).to receive(:update).with(current_step: 1) -      expect( import ).to receive(:update).with(current_step: 2) -      expect( import ).to receive(:update).with(current_step: 3, status: 'failed') - -      expect { worker.perform import.id }.to raise_error(StopIteration) -    end -  end - -  context 'multireferential zipfile with spurious directories' do  -    let( :entry_count ){ 2 } -    let( :spurious1 ){ [random_string] } -    let( :spurious2 ){ [random_string, random_string] } -    let( :spurious ){ [spurious1, spurious2] } -    let( :messages ){ double('messages') } -    let( :message_attributes ){{criticity: :warning, message_key: 'inconsistent_zip_file'}} -    let( :message1_attributes ){ message_attributes.merge(message_attributes: {'source_filename' => import.file.file.file, 'spurious_dirs' => spurious1.join(', ')}) } -    let( :message2_attributes ){ message_attributes.merge(message_attributes: {'source_filename' => import.file.file.file, 'spurious_dirs' => spurious2.join(', ')}) } - -    before do -      allow(import).to receive(:messages).and_return(messages) -    end - -    it 'downloads a zip file, cuts it, and uploads all pieces and adds messages' do - -      expect(HTTPService).to receive(:get_resource) -        .with(host: host, path: path, params: {token: download_token}) -        .and_return( download_zip_response ) - -      subdirs.each do |subdir| -        mock_post subdir, post_response_ok -      end - -      expect( import ).to receive(:update).with(total_steps: 2) -      expect( import ).to receive(:update).with(current_step: 1) -      expect( messages ).to receive(:create).with(message1_attributes) -      expect( import ).to receive(:update).with(current_step: 2) -      expect( messages ).to receive(:create).with(message2_attributes) -      expect( import ).to receive(:update).with(ended_at: Time.now) - -      worker.perform import.id - -    end -     -  end - -  def mock_post subdir, response -    allow(HTTPService).to receive(:upload) -    expect( HTTPService ).to receive(:post_resource) -      .with( -        host: host, -        path: upload_path, -        params: { -          netex_import: { -            parent_id: import.id, -            parent_type: import.class.name, -            workbench_id: workbench.id, -            name: subdir.name, -            file: HTTPService.upload( -              subdir.stream, -              'application/zip', -              "#{subdir.name}.zip" -            ) -          } -        } -      ).and_return(response) -  end -end diff --git a/spec/workers/workbench_import_worker_spec.rb b/spec/workers/workbench_import_worker_spec.rb new file mode 100644 index 000000000..310693e1e --- /dev/null +++ b/spec/workers/workbench_import_worker_spec.rb @@ -0,0 +1,102 @@ +RSpec.describe WorkbenchImportWorker, type: [:worker, :request, :zip] do + +  def self.expect_upload_with *entry_names, &blk +    let(:expected_upload_names){ Set.new(entry_names.flatten) } + +    it "uploads the following entries: #{entry_names.flatten.inspect}" do +      allow( HTTPService ).to receive(:post_resource) +        .with(host: host, path: upload_path, params: anything) { |params| +        name =  params[:params][:netex_import][:name] +        raise RuntimeError, "unexpected upload of entry #{name}" unless expected_upload_names.delete?(name) +        OpenStruct.new(status: 201) +      } +      instance_eval(&blk) +      expect( expected_upload_names ).to be_empty, "the following expected uploads were not executed: #{expected_upload_names.to_a.inspect}" +    end +  end + +  let( :lines ){ %w{*:C00109 *:C00108}.to_json } +  let!( :organisation ){ workbench.organisation.update sso_attributes: {'functional_scope' => lines}} + +  let( :worker ) { described_class.new } +  let( :workbench_import ){ create :workbench_import, token_download: download_token } + +  let( :workbench ){ workbench_import.workbench } + +  # http://www.example.com/workbenches/:workbench_id/imports/:id/download +  let( :host ){ Rails.configuration.rails_host } +  let( :path ){ download_workbench_import_path(workbench, workbench_import) } +  let( :upload_path ){ api_v1_netex_imports_path(format: :json) } + +  let( :downloaded_zip_archive ){ make_zip_from_tree zip_data_dir } +  let( :downloaded_zip_data ){ downloaded_zip_archive.data } +  let( :download_token ){ random_string } + +  before do +    stub_request(:get, "#{ host }#{ path }?token=#{ workbench_import.token_download }").  +      to_return(body: downloaded_zip_data, status: :success) +  end + +  context 'correct workbench_import' do +    let( :zip_data_dir ){ fixtures_path 'two_referentials_ok' } + +    expect_upload_with %w{ OFFRE_TRANSDEV_20170301122517 OFFRE_TRANSDEV_20170301122519 } do +      expect{ worker.perform( workbench_import.id ) }.not_to change{ workbench_import.messages.count } +      expect( workbench_import.reload.attributes.values_at(*%w{current_step total_steps}) ) +        .to eq([2, 2]) +      expect( workbench_import.reload.status ).to eq('running') +    end + +  end + +  context 'correct but spurious directories' do +    let( :zip_data_dir ){ fixtures_path 'extra_file_nok' } + +    expect_upload_with [] do +      expect{ worker.perform( workbench_import.id ) }.to change{ workbench_import.messages.count }.by(1) +      expect( workbench_import.reload.attributes.values_at(*%w{current_step total_steps}) ) +        .to eq([0, 0]) +      expect( workbench_import.messages.last.message_key ).to eq('inconsistent_zip_file') +      expect( workbench_import.reload.status ).to eq('running') +    end +  end + +  context 'foreign lines' do  +    let( :zip_data_dir ){ fixtures_path 'some_foreign_mixed' } + +    expect_upload_with %w{ OFFRE_TRANSDEV_20170301122517 OFFRE_TRANSDEV_20170301122519 } do +      expect{ worker.perform( workbench_import.id ) }.to change{ workbench_import.messages.count }.by(1) +      expect( workbench_import.reload.attributes.values_at(*%w{current_step total_steps}) ) +        .to eq([2, 2]) +      expect( workbench_import.messages.last.message_key ).to eq('foreign_lines_in_referential') +      expect( workbench_import.reload.status ).to eq('running') +    end +     +  end + +  context 'foreign and spurious' do +    let( :zip_data_dir ){ fixtures_path 'foreign_and_spurious' } + +    expect_upload_with %w{ OFFRE_TRANSDEV_20170301122517 OFFRE_TRANSDEV_20170301122519 } do +      expect{ worker.perform( workbench_import.id ) }.to change{ workbench_import.messages.count }.by(2) +      expect( workbench_import.reload.attributes.values_at(*%w{current_step total_steps}) ) +        .to eq([2, 2]) +      expect( workbench_import.messages.last(2).map(&:message_key).sort ) +        .to eq(%w{foreign_lines_in_referential inconsistent_zip_file}) +      expect( workbench_import.reload.status ).to eq('running') +    end +  end + +  context 'corrupt zip file' do  +    let( :downloaded_zip_archive ){ OpenStruct.new(data: '') } + +    it 'will not upload anything' do +      expect(HTTPService).not_to receive(:post_resource) +      expect{ worker.perform( workbench_import.id ) }.to change{ workbench_import.messages.count }.by(1) +      expect( workbench_import.messages.last.message_key ).to eq('corrupt_zip_file') +      expect( workbench_import.reload.status ).to eq('failed') +    end +     +  end + +end | 
